Use a Reconnecting TCP Client Protocol

Source is https://stackoverflow.com/a/49452683/1504003
with small adaptations (reset delay and retries when we get back a
connection)

Instead of creating the connection in the "main" method, loop and reopen
a new event loop on failures, let the Protocol class handle connections
and retries on failures.

Reduce keepalive interval and store values in attributes. As many
timeouts in systems are 60 seconds, a keepalive of 50 seconds is less
likely to be stopped.

Improve logs: the docstring of connection_lost is:

      The argument is either an exception object or None. The latter means a
      regular EOF is received, or the connection was aborted or closed by this
      side of the connection.

When it receives no exception, it should not raise an error log.
This commit is contained in:
Guewen Baconnier
2020-12-16 16:08:52 +01:00
committed by Hai Lang
parent d4ef72c150
commit 6303714df5

View File

@@ -3,6 +3,7 @@ import argparse
import asyncio import asyncio
import logging import logging
import os import os
import random
import ssl import ssl
import sys import sys
import time import time
@@ -13,8 +14,8 @@ _logger = logging.getLogger(__name__)
class KardexProxyProtocol(asyncio.Protocol): class KardexProxyProtocol(asyncio.Protocol):
def __init__(self, loop, queue, args): def __init__(self, queue, loop, args):
_logger.info("Proxy created") _logger.info("Proxy: created")
self.transport = None self.transport = None
self.buffer = b"" self.buffer = b""
self.queue = queue self.queue = queue
@@ -22,7 +23,7 @@ class KardexProxyProtocol(asyncio.Protocol):
self.args = args self.args = args
def connection_made(self, transport): def connection_made(self, transport):
_logger.info("Proxy incoming cnx") _logger.info("Proxy: incoming cnx made")
self.transport = transport self.transport = transport
self.buffer = b"" self.buffer = b""
@@ -47,29 +48,116 @@ class KardexProxyProtocol(asyncio.Protocol):
self.buffer = b"" self.buffer = b""
def connection_lost(self, exc): def connection_lost(self, exc):
if exc:
_logger.error("Proxy: incoming cnx lost: %s", exc)
else:
_logger.info("Proxy: incoming cnx closed")
self.transport = None self.transport = None
self.buffer = b"" self.buffer = b""
class KardexClientProtocol(asyncio.Protocol): class ReconnectingTCPClientProtocol(asyncio.Protocol):
def __init__(self, loop, queue, args): # source: https://stackoverflow.com/a/49452683/1504003
max_delay = 3600
initial_delay = 1.0
factor = 2.7182818284590451
jitter = 0.119626565582
max_retries = None
def __init__(self, *args, loop=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._args = args
self._kwargs = kwargs
self._retries = 0
self._delay = self.initial_delay
self._continue_trying = True
self._call_handle = None
self._connector = None
def connection_lost(self, exc):
if self._continue_trying:
self.retry()
def connection_failed(self, exc):
if self._continue_trying:
self.retry()
def retry(self):
if not self._continue_trying:
return
self._retries += 1
if self.max_retries is not None and (self._retries > self.max_retries):
self.stop_trying()
return
self._delay = min(self._delay * self.factor, self.max_delay)
if self.jitter:
self._delay = random.normalvariate(self._delay, self._delay * self.jitter)
_logger.info("%s: will retry connection after %ss", self, self._delay)
self._call_handle = self._loop.call_later(self._delay, self.connect)
def connect(self):
if self._connector is None:
self._connector = self._loop.create_task(self._connect())
async def _connect(self):
try:
await self._loop.create_connection(
lambda: self, *self._args, **self._kwargs
)
except Exception as exc:
self._loop.call_soon(self.connection_failed, exc)
else:
self._delay = self.initial_delay
self._retries = 0
finally:
self._connector = None
def stop_trying(self):
if self._call_handle:
self._call_handle.cancel()
self._call_handle = None
self._continue_trying = False
if self._connector is not None:
self._connector.cancel()
self._connector = None
class KardexClientProtocol(ReconnectingTCPClientProtocol):
max_delay = 15
initial_delay = 0.5
factor = 1.7182818284590451
jitter = 0.119626565582
# if we set a number of retries, after N failed
# retries, it will stop the event loop and exit
max_retries = None
initial_keepalive_delay = 20
keepalive_delay = 50
def __init__(self, queue, args, loop, **kwargs):
super().__init__(loop=loop, **kwargs)
_logger.info("started kardex client") _logger.info("started kardex client")
self.loop = loop
self.queue = queue self.queue = queue
self.args = args
self.transport = None self.transport = None
self.buffer = b"" self.buffer = b""
self.args = args
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
_logger.info("connected to kardex server %r", transport) _logger.info("connected to kardex server %r", transport)
async def keepalive(self): async def keepalive(self):
await asyncio.sleep(self.initial_keepalive_delay)
while True: while True:
t = int(time.time()) t = int(time.time())
msg = "61|ping%d|SH1-1|0|0||||||||\r\n" % t msg = "61|ping%d|SH1-1|0|0||||||||\r\n" % t
await self.send_message(msg) await self.send_message(msg)
await asyncio.sleep(60) await asyncio.sleep(self.keepalive_delay)
async def send_message(self, message): async def send_message(self, message):
_logger.info("SEND %r", message) _logger.info("SEND %r", message)
@@ -93,11 +181,19 @@ class KardexClientProtocol(asyncio.Protocol):
_logger.info("ping ok") _logger.info("ping ok")
else: else:
_logger.info("notify odoo: %s", msg) _logger.info("notify odoo: %s", msg)
self.loop.create_task(self.notify_odoo(msg)) self._loop.create_task(self.notify_odoo(msg))
def connection_lost(self, exc): def connection_lost(self, exc):
_logger.error("Connection lost: %s", exc) _logger.error("Kardex client: connection lost: %s", exc)
self.loop.stop() super().connection_lost(exc)
def connection_failed(self, exc):
_logger.error("Kardex client: failed to open connection: %s", exc)
super().connection_failed(exc)
def stop_trying(self):
super().stop_trying()
self._loop.stop()
async def notify_odoo(self, msg): async def notify_odoo(self, msg):
url = self.args.odoo_url + "/vertical-lift" url = self.args.odoo_url + "/vertical-lift"
@@ -105,18 +201,26 @@ class KardexClientProtocol(asyncio.Protocol):
params = {"answer": msg, "secret": self.args.secret} params = {"answer": msg, "secret": self.args.secret}
async with session.post(url, data=params) as resp: async with session.post(url, data=params) as resp:
resp_text = await resp.text() resp_text = await resp.text()
_logger.info("Response from Odoo: %s %s", resp.status, resp_text) _logger.info("Reponse from Odoo: %s %s", resp.status, resp_text)
def main(args, ssl_context=None): def main(args, ssl_context=None):
logging.basicConfig( logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
) )
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
if args.debug:
loop.set_debug(True)
queue = asyncio.Queue(loop=loop) queue = asyncio.Queue(loop=loop)
# create the main server # create the main server
coro = loop.create_server( coro = loop.create_server(
lambda: KardexProxyProtocol(loop, queue, args), host=args.host, port=args.port, lambda: KardexProxyProtocol(queue, loop, args),
host=args.host,
port=args.port,
) )
loop.run_until_complete(coro) loop.run_until_complete(coro)
@@ -126,21 +230,18 @@ def main(args, ssl_context=None):
ssl_context = ssl.create_default_context() ssl_context = ssl.create_default_context()
else: else:
ssl_context = None ssl_context = None
coro = loop.create_connection( client = KardexClientProtocol(
lambda: KardexClientProtocol(loop, queue, args), queue,
args,
loop,
host=args.kardex_host, host=args.kardex_host,
port=args.kardex_port, port=args.kardex_port,
ssl=ssl_context, ssl=ssl_context,
) )
transport, client = loop.run_until_complete(coro) client.connect()
loop.create_task(client.keepalive()) loop.create_task(client.keepalive())
loop.create_task(client.process_queue()) loop.create_task(client.process_queue())
try: loop.run_forever()
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
return 0
def make_parser(): def make_parser():
@@ -153,9 +254,12 @@ def make_parser():
kardex_port = int(os.environ.get("KARDEX_PORT", "9600")) kardex_port = int(os.environ.get("KARDEX_PORT", "9600"))
kardex_use_tls = ( kardex_use_tls = (
False False
if os.environ.get("KARDEX_TLS", "") in ("", "0", "False", "FALSE") if os.environ.get("KARDEX_TLS", "") in ("", "0", "false", "False", "FALSE")
else True else True
) )
debug = (
True if os.environ.get("DEBUG", "") in ("1", "true", "True", "TRUE") else False
)
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
arguments = [ arguments = [
("--host", listen_address, str), ("--host", listen_address, str),
@@ -166,6 +270,7 @@ def make_parser():
("--kardex-host", kardex_host, str), ("--kardex-host", kardex_host, str),
("--kardex-port", kardex_port, str), ("--kardex-port", kardex_port, str),
("--kardex-use-tls", kardex_use_tls, bool), ("--kardex-use-tls", kardex_use_tls, bool),
("--debug", debug, bool),
] ]
for name, default, type_ in arguments: for name, default, type_ in arguments:
parser.add_argument(name, default=default, action="store", type=type_) parser.add_argument(name, default=default, action="store", type=type_)