diff --git a/stock_vertical_lift_kardex/proxy/kardex-proxy.py b/stock_vertical_lift_kardex/proxy/kardex-proxy.py index 321819892..86c24806f 100755 --- a/stock_vertical_lift_kardex/proxy/kardex-proxy.py +++ b/stock_vertical_lift_kardex/proxy/kardex-proxy.py @@ -3,6 +3,7 @@ import argparse import asyncio import logging import os +import random import ssl import sys import time @@ -13,8 +14,8 @@ _logger = logging.getLogger(__name__) class KardexProxyProtocol(asyncio.Protocol): - def __init__(self, loop, queue, args): - _logger.info("Proxy created") + def __init__(self, queue, loop, args): + _logger.info("Proxy: created") self.transport = None self.buffer = b"" self.queue = queue @@ -22,7 +23,7 @@ class KardexProxyProtocol(asyncio.Protocol): self.args = args def connection_made(self, transport): - _logger.info("Proxy incoming cnx") + _logger.info("Proxy: incoming cnx made") self.transport = transport self.buffer = b"" @@ -47,29 +48,116 @@ class KardexProxyProtocol(asyncio.Protocol): self.buffer = b"" def connection_lost(self, exc): + if exc: + _logger.error("Proxy: incoming cnx lost: %s", exc) + else: + _logger.info("Proxy: incoming cnx closed") self.transport = None self.buffer = b"" -class KardexClientProtocol(asyncio.Protocol): - def __init__(self, loop, queue, args): +class ReconnectingTCPClientProtocol(asyncio.Protocol): + # source: https://stackoverflow.com/a/49452683/1504003 + max_delay = 3600 + initial_delay = 1.0 + factor = 2.7182818284590451 + jitter = 0.119626565582 + max_retries = None + + def __init__(self, *args, loop=None, **kwargs): + if loop is None: + loop = asyncio.get_event_loop() + self._loop = loop + self._args = args + self._kwargs = kwargs + self._retries = 0 + self._delay = self.initial_delay + self._continue_trying = True + self._call_handle = None + self._connector = None + + def connection_lost(self, exc): + if self._continue_trying: + self.retry() + + def connection_failed(self, exc): + if self._continue_trying: + self.retry() + + def retry(self): + if not self._continue_trying: + return + + self._retries += 1 + if self.max_retries is not None and (self._retries > self.max_retries): + self.stop_trying() + return + + self._delay = min(self._delay * self.factor, self.max_delay) + if self.jitter: + self._delay = random.normalvariate(self._delay, self._delay * self.jitter) + _logger.info("%s: will retry connection after %ss", self, self._delay) + self._call_handle = self._loop.call_later(self._delay, self.connect) + + def connect(self): + if self._connector is None: + self._connector = self._loop.create_task(self._connect()) + + async def _connect(self): + try: + await self._loop.create_connection( + lambda: self, *self._args, **self._kwargs + ) + except Exception as exc: + self._loop.call_soon(self.connection_failed, exc) + else: + self._delay = self.initial_delay + self._retries = 0 + finally: + self._connector = None + + def stop_trying(self): + if self._call_handle: + self._call_handle.cancel() + self._call_handle = None + self._continue_trying = False + if self._connector is not None: + self._connector.cancel() + self._connector = None + + +class KardexClientProtocol(ReconnectingTCPClientProtocol): + + max_delay = 15 + initial_delay = 0.5 + factor = 1.7182818284590451 + jitter = 0.119626565582 + # if we set a number of retries, after N failed + # retries, it will stop the event loop and exit + max_retries = None + + initial_keepalive_delay = 20 + keepalive_delay = 50 + + def __init__(self, queue, args, loop, **kwargs): + super().__init__(loop=loop, **kwargs) _logger.info("started kardex client") - self.loop = loop self.queue = queue - self.args = args self.transport = None self.buffer = b"" + self.args = args def connection_made(self, transport): self.transport = transport _logger.info("connected to kardex server %r", transport) async def keepalive(self): + await asyncio.sleep(self.initial_keepalive_delay) while True: t = int(time.time()) msg = "61|ping%d|SH1-1|0|0||||||||\r\n" % t await self.send_message(msg) - await asyncio.sleep(60) + await asyncio.sleep(self.keepalive_delay) async def send_message(self, message): _logger.info("SEND %r", message) @@ -93,11 +181,19 @@ class KardexClientProtocol(asyncio.Protocol): _logger.info("ping ok") else: _logger.info("notify odoo: %s", msg) - self.loop.create_task(self.notify_odoo(msg)) + self._loop.create_task(self.notify_odoo(msg)) def connection_lost(self, exc): - _logger.error("Connection lost: %s", exc) - self.loop.stop() + _logger.error("Kardex client: connection lost: %s", exc) + super().connection_lost(exc) + + def connection_failed(self, exc): + _logger.error("Kardex client: failed to open connection: %s", exc) + super().connection_failed(exc) + + def stop_trying(self): + super().stop_trying() + self._loop.stop() async def notify_odoo(self, msg): url = self.args.odoo_url + "/vertical-lift" @@ -105,18 +201,26 @@ class KardexClientProtocol(asyncio.Protocol): params = {"answer": msg, "secret": self.args.secret} async with session.post(url, data=params) as resp: resp_text = await resp.text() - _logger.info("Response from Odoo: %s %s", resp.status, resp_text) + _logger.info("Reponse from Odoo: %s %s", resp.status, resp_text) def main(args, ssl_context=None): logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + level=logging.DEBUG if args.debug else logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", ) + loop = asyncio.get_event_loop() + + if args.debug: + loop.set_debug(True) + queue = asyncio.Queue(loop=loop) # create the main server coro = loop.create_server( - lambda: KardexProxyProtocol(loop, queue, args), host=args.host, port=args.port, + lambda: KardexProxyProtocol(queue, loop, args), + host=args.host, + port=args.port, ) loop.run_until_complete(coro) @@ -126,21 +230,18 @@ def main(args, ssl_context=None): ssl_context = ssl.create_default_context() else: ssl_context = None - coro = loop.create_connection( - lambda: KardexClientProtocol(loop, queue, args), + client = KardexClientProtocol( + queue, + args, + loop, host=args.kardex_host, port=args.kardex_port, ssl=ssl_context, ) - transport, client = loop.run_until_complete(coro) + client.connect() loop.create_task(client.keepalive()) loop.create_task(client.process_queue()) - try: - loop.run_forever() - finally: - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() - return 0 + loop.run_forever() def make_parser(): @@ -153,9 +254,12 @@ def make_parser(): kardex_port = int(os.environ.get("KARDEX_PORT", "9600")) kardex_use_tls = ( False - if os.environ.get("KARDEX_TLS", "") in ("", "0", "False", "FALSE") + if os.environ.get("KARDEX_TLS", "") in ("", "0", "false", "False", "FALSE") else True ) + debug = ( + True if os.environ.get("DEBUG", "") in ("1", "true", "True", "TRUE") else False + ) parser = argparse.ArgumentParser() arguments = [ ("--host", listen_address, str), @@ -166,6 +270,7 @@ def make_parser(): ("--kardex-host", kardex_host, str), ("--kardex-port", kardex_port, str), ("--kardex-use-tls", kardex_use_tls, bool), + ("--debug", debug, bool), ] for name, default, type_ in arguments: parser.add_argument(name, default=default, action="store", type=type_)