Проект

Общее

Профиль

5. Организация балансировки вычислительных нод Tegu

Организация балансировки вычислительных нод Tegu потребуется вам в случае использования редакции Tegu Enterprise в мультисерверном (кластерном) исполнении. Такая балансировка не имеет прямого отношения к дистрибутиву Tegu т.к. выполняется сетевым оборудованием пользователя.

Однако, нет ничего страшного в случае, если у пользователя нет сетевого оборудования, способного выполнять балансировку трафика т.к. подобное решение можно реализовать стандартными средствами Linux и дополнительным ПО, отслеживающим падение нод.

Одно из таких решений Tiar мы предлагаем в данном разделе. Данный скрипт написан нами на Python для работы с nftables и может быть полезен как сам по себе, так и для понимания принципа балансировки трафика с помощью собственного оборудования пользователя.

#!/usr/bin/env python3
import signal
import socket
import nftables
import time
from systemd.journal import JournalHandler
import logging

log = logging.getLogger('nft_lb')
log.addHandler(JournalHandler())
log.setLevel(logging.INFO)

class ServiceDestination:
    def __init__(self, name, dip):
        self.name = name
        self.dip = dip
        self.online = True

class LbService:
    def __init__(self, name, vip, vport):
        self.name = name
        self.vip = vip
        self.vport = vport
        self.dest = {}

class Event:
    def __init__(self):
        self.shutdown = False

ev = Event()

lb_map = {}

def create_rules():
    nft = nftables.Nftables()
    code, _, __ = nft.cmd('flush table ip lb')
    if code != 0:
        code, _, __ = nft.cmd('create table ip lb')
    nft.cmd('add chain ip lb prerouting { type nat hook prerouting priority 0 ; }')
    nft.cmd('add chain ip lb forward { type filter hook forward priority -10 ; }')
    for svc_name in lb_map.keys():
        nft.cmd(f'add chain ip lb dnat_{svc_name}')
        nft.cmd(f'add rule ip lb prerouting ip daddr {lb_map[svc_name].vip} tcp dport {lb_map[svc_name].vport} counter jump dnat_{svc_name}')
        dest_count = len(lb_map[svc_name].dest)
        dest_list = []
        for dest_num, dest_name in enumerate(lb_map[svc_name].dest.keys()):
            dest_list.append(f'{dest_num} : {lb_map[svc_name].dest[dest_name].dip}')
            nft.cmd(f'add rule ip lb forward ip daddr {lb_map[svc_name].dest[dest_name].dip} tcp dport {lb_map[svc_name].vport} counter mark set 333444555 accept comment "[{svc_name}] -> {dest_name}"')
        nft.cmd(f'add rule ip lb dnat_{svc_name} counter dnat to numgen inc mod {dest_count} map {{ {", ".join(dest_list)} }}')

def recreate_service(svc_name):
    nft = nftables.Nftables()
    dest_list = []
    online_exists = False
    for dest_name in lb_map[svc_name].dest.keys():
        if lb_map[svc_name].dest[dest_name].online:
            dest_list.append(lb_map[svc_name].dest[dest_name].dip)
            online_exists = True
    dest_count = len(dest_list)
    dest_map_list = []
    for num, dest in enumerate(dest_list):
      dest_map_list.append(f'{num} : {dest}')
    nft.cmd(f'flush chain ip lb dnat_{svc_name}')
    if online_exists:
        nft.cmd(f'add rule ip lb dnat_{svc_name} counter dnat to numgen inc mod {dest_count} map {{ {", ".join(dest_map_list)} }}')

def shutdown_lb(sig_num, frame):
    nft = nftables.Nftables()
    nft.cmd('flush table ip lb')
    nft.cmd('delete table ip lb')
    ev.shutdown = True

def check_backends():
    for svc_name in lb_map.keys():
        need_recreate = False
        for dest_name in lb_map[svc_name].dest.keys():
            a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            a_socket.settimeout(1)
            host = lb_map[svc_name].dest[dest_name].dip
            port = lb_map[svc_name].vport
            if a_socket.connect_ex((host, port)) != 0:
                if lb_map[svc_name].dest[dest_name].online:
                    log.info(f'[{svc_name}] Node "{dest_name}" offline')
                    lb_map[svc_name].dest[dest_name].online = False
                    need_recreate = True
            else:
                if not lb_map[svc_name].dest[dest_name].online:
                    log.info(f'[{svc_name}] Node "{dest_name}" online')
                    lb_map[svc_name].dest[dest_name].online = True
                    need_recreate = True
            a_socket.close()
        if need_recreate:
            recreate_service(svc_name)

def process_config():
    with open('/opt/nft_lb_rules', 'r') as f:
        rules_str = f.read()
    for rule_line in rules_str.split('\n'):
        if rule_line.strip() == '': continue
        svc_name, vip, vport, dest_name, dip = rule_line.split('|')
        if svc_name not in lb_map:
            lb_map[svc_name] = LbService(svc_name, vip, int(vport))
        lb_map[svc_name].dest[dest_name] = ServiceDestination(dest_name, dip)
        create_rules()
if __name__ == '__main__':
    signal.signal(signal.SIGINT, shutdown_lb)
    signal.signal(signal.SIGTERM, shutdown_lb)
    signal.signal(signal.SIGHUP, shutdown_lb)
    process_config()
    tik_count = 0
    while not ev.shutdown:
        if tik_count >= 20:
            check_backends()
            tik_count = 0
            continue
        time.sleep(1)
        tik_count += 1

Данный скрипт использует список правил из файла /opt/nft_lb_rules в следующем формате:

smtp|1.2.3.4|25|node1|10.1.1.11
smtp|1.2.3.4|25|node2|10.1.1.12
smtp|1.2.3.4|25|node3|10.1.1.13
imap|1.2.3.4|993|node1|10.1.1.11
imap|1.2.3.4|993|node2|10.1.1.12
imap|1.2.3.4|993|node3|10.1.1.13
smtps|1.2.3.4|465|node1|10.1.1.11
smtps|1.2.3.4|465|node2|10.1.1.12
smtps|1.2.3.4|465|node3|10.1.1.13
webadm|1.2.3.4|9999|node1|10.1.1.11
webadm|1.2.3.4|9999|node2|10.1.1.12
webadm|1.2.3.4|9999|node3|10.1.1.13

Если в nftables в основной цепоче forward последним правилом настроено отбрасывание всех внешних пакетов, то необходимо исключить из этого правила пакеты с меткой 333444555:
iifname "eth1" mark != 333444555 counter drop

Для настройки автозапуска скрипта балансировщика, надо создать, включить и запустить сервис systemd:

/etc/systemd/system/nft_lb.service

[Unit]
Description=Nftables python balancer
After=multi-user.target

[Service]
Type=simple
Restart=always
ExecStart=/usr/bin/python3 /opt/nft_lb.py

[Install]
WantedBy=multi-user.target