5. Организация балансировки вычислительных нод Tegu¶
Организация балансировки вычислительных нод Tegu потребуется вам в случае использования редакции Tegu Enterprise в мультисерверном (кластерном) исполнении. Такая балансировка не имеет прямого отношения к дистрибутиву Tegu т.к. выполняется сетевым оборудованием пользователя.
Однако, нет ничего страшного в случае, если у пользователя нет сетевого оборудования, способного выполнять балансировку трафика т.к. подобное решение можно реализовать стандартными средствами Linux и дополнительным ПО, отслеживающим падение нод.
Одно из таких решений Tiar мы предлагаем в данном разделе. Данный скрипт написан нами на Python для работы с nftables и может быть полезен как сам по себе, так и для понимания принципа балансировки трафика с помощью собственного оборудования пользователя.
#!/usr/bin/env python3
import signal
import socket
import nftables
import time
from systemd.journal import JournalHandler
import logging
log = logging.getLogger('nft_lb')
log.addHandler(JournalHandler())
log.setLevel(logging.INFO)
class ServiceDestination:
def __init__(self, name, dip):
self.name = name
self.dip = dip
self.online = True
class LbService:
def __init__(self, name, vip, vport):
self.name = name
self.vip = vip
self.vport = vport
self.dest = {}
class Event:
def __init__(self):
self.shutdown = False
ev = Event()
lb_map = {}
def create_rules():
nft = nftables.Nftables()
code, _, __ = nft.cmd('flush table ip lb')
if code != 0:
code, _, __ = nft.cmd('create table ip lb')
nft.cmd('add chain ip lb prerouting { type nat hook prerouting priority 0 ; }')
nft.cmd('add chain ip lb forward { type filter hook forward priority -10 ; }')
for svc_name in lb_map.keys():
nft.cmd(f'add chain ip lb dnat_{svc_name}')
nft.cmd(f'add rule ip lb prerouting ip daddr {lb_map[svc_name].vip} tcp dport {lb_map[svc_name].vport} counter jump dnat_{svc_name}')
dest_count = len(lb_map[svc_name].dest)
dest_list = []
for dest_num, dest_name in enumerate(lb_map[svc_name].dest.keys()):
dest_list.append(f'{dest_num} : {lb_map[svc_name].dest[dest_name].dip}')
nft.cmd(f'add rule ip lb forward ip daddr {lb_map[svc_name].dest[dest_name].dip} tcp dport {lb_map[svc_name].vport} counter mark set 333444555 accept comment "[{svc_name}] -> {dest_name}"')
nft.cmd(f'add rule ip lb dnat_{svc_name} counter dnat to numgen inc mod {dest_count} map {{ {", ".join(dest_list)} }}')
def recreate_service(svc_name):
nft = nftables.Nftables()
dest_list = []
online_exists = False
for dest_name in lb_map[svc_name].dest.keys():
if lb_map[svc_name].dest[dest_name].online:
dest_list.append(lb_map[svc_name].dest[dest_name].dip)
online_exists = True
dest_count = len(dest_list)
dest_map_list = []
for num, dest in enumerate(dest_list):
dest_map_list.append(f'{num} : {dest}')
nft.cmd(f'flush chain ip lb dnat_{svc_name}')
if online_exists:
nft.cmd(f'add rule ip lb dnat_{svc_name} counter dnat to numgen inc mod {dest_count} map {{ {", ".join(dest_map_list)} }}')
def shutdown_lb(sig_num, frame):
nft = nftables.Nftables()
nft.cmd('flush table ip lb')
nft.cmd('delete table ip lb')
ev.shutdown = True
def check_backends():
for svc_name in lb_map.keys():
need_recreate = False
for dest_name in lb_map[svc_name].dest.keys():
a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
a_socket.settimeout(1)
host = lb_map[svc_name].dest[dest_name].dip
port = lb_map[svc_name].vport
if a_socket.connect_ex((host, port)) != 0:
if lb_map[svc_name].dest[dest_name].online:
log.info(f'[{svc_name}] Node "{dest_name}" offline')
lb_map[svc_name].dest[dest_name].online = False
need_recreate = True
else:
if not lb_map[svc_name].dest[dest_name].online:
log.info(f'[{svc_name}] Node "{dest_name}" online')
lb_map[svc_name].dest[dest_name].online = True
need_recreate = True
a_socket.close()
if need_recreate:
recreate_service(svc_name)
def process_config():
with open('/opt/nft_lb_rules', 'r') as f:
rules_str = f.read()
for rule_line in rules_str.split('\n'):
if rule_line.strip() == '': continue
svc_name, vip, vport, dest_name, dip = rule_line.split('|')
if svc_name not in lb_map:
lb_map[svc_name] = LbService(svc_name, vip, int(vport))
lb_map[svc_name].dest[dest_name] = ServiceDestination(dest_name, dip)
create_rules()
if __name__ == '__main__':
signal.signal(signal.SIGINT, shutdown_lb)
signal.signal(signal.SIGTERM, shutdown_lb)
signal.signal(signal.SIGHUP, shutdown_lb)
process_config()
tik_count = 0
while not ev.shutdown:
if tik_count >= 20:
check_backends()
tik_count = 0
continue
time.sleep(1)
tik_count += 1
Данный скрипт использует список правил из файла /opt/nft_lb_rules в следующем формате:
smtp|1.2.3.4|25|node1|10.1.1.11 smtp|1.2.3.4|25|node2|10.1.1.12 smtp|1.2.3.4|25|node3|10.1.1.13 imap|1.2.3.4|993|node1|10.1.1.11 imap|1.2.3.4|993|node2|10.1.1.12 imap|1.2.3.4|993|node3|10.1.1.13 smtps|1.2.3.4|465|node1|10.1.1.11 smtps|1.2.3.4|465|node2|10.1.1.12 smtps|1.2.3.4|465|node3|10.1.1.13 webadm|1.2.3.4|9999|node1|10.1.1.11 webadm|1.2.3.4|9999|node2|10.1.1.12 webadm|1.2.3.4|9999|node3|10.1.1.13
Если в nftables в основной цепоче forward последним правилом настроено отбрасывание всех внешних пакетов, то необходимо исключить из этого правила пакеты с меткой 333444555:
iifname "eth1" mark != 333444555 counter drop
Для настройки автозапуска скрипта балансировщика, надо создать, включить и запустить сервис systemd:
/etc/systemd/system/nft_lb.service
[Unit] Description=Nftables python balancer After=multi-user.target [Service] Type=simple Restart=always ExecStart=/usr/bin/python3 /opt/nft_lb.py [Install] WantedBy=multi-user.target