From 35b71715bacd88d1c2e9aee8c017cf0ee3537352 Mon Sep 17 00:00:00 2001 From: Alexandre Fayolle Date: Mon, 16 Nov 2020 09:59:25 +0100 Subject: [PATCH 1/2] [FIX] stock_vertical_lift_kardex proxy The proxy would not exit properly in case of a lost connection with the JMIF server. The change allows the proxy to exit, giving a chance to an external monitoring system to restart the service. --- stock_vertical_lift_kardex/proxy/README.rst | 6 ++++++ .../proxy/kardex-proxy.py | 19 +++++++++++++------ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/stock_vertical_lift_kardex/proxy/README.rst b/stock_vertical_lift_kardex/proxy/README.rst index 00dcb4d59..a73d4a981 100644 --- a/stock_vertical_lift_kardex/proxy/README.rst +++ b/stock_vertical_lift_kardex/proxy/README.rst @@ -1 +1,7 @@ Kardex device proxy. + +This is meant to run as a proxy between odoo and the JMIF server + +Using a proxy is required to make the synchronous JMIF protocol asynchronous for Odoo, by using ping backs. + +The proxy requires python >= 3.7. diff --git a/stock_vertical_lift_kardex/proxy/kardex-proxy.py b/stock_vertical_lift_kardex/proxy/kardex-proxy.py index 75609cbc1..321819892 100755 --- a/stock_vertical_lift_kardex/proxy/kardex-proxy.py +++ b/stock_vertical_lift_kardex/proxy/kardex-proxy.py @@ -4,6 +4,7 @@ import asyncio import logging import os import ssl +import sys import time import aiohttp # pylint: disable=missing-manifest-dependency @@ -68,7 +69,7 @@ class KardexClientProtocol(asyncio.Protocol): t = int(time.time()) msg = "61|ping%d|SH1-1|0|0||||||||\r\n" % t await self.send_message(msg) - await asyncio.sleep(20) + await asyncio.sleep(60) async def send_message(self, message): _logger.info("SEND %r", message) @@ -95,6 +96,7 @@ class KardexClientProtocol(asyncio.Protocol): self.loop.create_task(self.notify_odoo(msg)) def connection_lost(self, exc): + _logger.error("Connection lost: %s", exc) self.loop.stop() async def notify_odoo(self, msg): @@ -103,7 +105,7 @@ class KardexClientProtocol(asyncio.Protocol): params = {"answer": msg, "secret": self.args.secret} async with session.post(url, data=params) as resp: resp_text = await resp.text() - _logger.info("Reponse from Odoo: %s %s", resp.status, resp_text) + _logger.info("Response from Odoo: %s %s", resp.status, resp_text) def main(args, ssl_context=None): @@ -114,7 +116,7 @@ def main(args, ssl_context=None): queue = asyncio.Queue(loop=loop) # create the main server coro = loop.create_server( - lambda: KardexProxyProtocol(loop, queue, args), host=args.host, port=args.port + lambda: KardexProxyProtocol(loop, queue, args), host=args.host, port=args.port, ) loop.run_until_complete(coro) @@ -133,8 +135,12 @@ def main(args, ssl_context=None): transport, client = loop.run_until_complete(coro) loop.create_task(client.keepalive()) loop.create_task(client.process_queue()) - loop.run_forever() - loop.close() + try: + loop.run_forever() + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + return 0 def make_parser(): @@ -169,4 +175,5 @@ def make_parser(): if __name__ == "__main__": parser = make_parser() args = parser.parse_args() - main(args) + res = main(args) + sys.exit(res) From 34eb5897520e4254986c2d580b1d1c363916fd68 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 16 Dec 2020 16:08:52 +0100 Subject: [PATCH 2/2] Use a Reconnecting TCP Client Protocol Source is https://stackoverflow.com/a/49452683/1504003 with small adaptations (reset delay and retries when we get back a connection) Instead of creating the connection in the "main" method, loop and reopen a new event loop on failures, let the Protocol class handle connections and retries on failures. Reduce keepalive interval and store values in attributes. As many timeouts in systems are 60 seconds, a keepalive of 50 seconds is less likely to be stopped. Improve logs: the docstring of connection_lost is: The argument is either an exception object or None. The latter means a regular EOF is received, or the connection was aborted or closed by this side of the connection. When it receives no exception, it should not raise an error log. --- .../proxy/kardex-proxy.py | 151 +++++++++++++++--- 1 file changed, 127 insertions(+), 24 deletions(-) diff --git a/stock_vertical_lift_kardex/proxy/kardex-proxy.py b/stock_vertical_lift_kardex/proxy/kardex-proxy.py index 321819892..0d71655ab 100755 --- a/stock_vertical_lift_kardex/proxy/kardex-proxy.py +++ b/stock_vertical_lift_kardex/proxy/kardex-proxy.py @@ -3,6 +3,7 @@ import argparse import asyncio import logging import os +import random import ssl import sys import time @@ -13,8 +14,8 @@ _logger = logging.getLogger(__name__) class KardexProxyProtocol(asyncio.Protocol): - def __init__(self, loop, queue, args): - _logger.info("Proxy created") + def __init__(self, queue, loop, args): + _logger.info("Proxy: created") self.transport = None self.buffer = b"" self.queue = queue @@ -22,7 +23,7 @@ class KardexProxyProtocol(asyncio.Protocol): self.args = args def connection_made(self, transport): - _logger.info("Proxy incoming cnx") + _logger.info("Proxy: incoming cnx made") self.transport = transport self.buffer = b"" @@ -47,29 +48,116 @@ class KardexProxyProtocol(asyncio.Protocol): self.buffer = b"" def connection_lost(self, exc): + if exc: + _logger.error("Proxy: incoming cnx lost: %s", exc) + else: + _logger.info("Proxy: incoming cnx closed") self.transport = None self.buffer = b"" -class KardexClientProtocol(asyncio.Protocol): - def __init__(self, loop, queue, args): +class ReconnectingTCPClientProtocol(asyncio.Protocol): + # source: https://stackoverflow.com/a/49452683/1504003 + max_delay = 3600 + initial_delay = 1.0 + factor = 2.7182818284590451 + jitter = 0.119626565582 + max_retries = None + + def __init__(self, *args, loop=None, **kwargs): + if loop is None: + loop = asyncio.get_event_loop() + self._loop = loop + self._args = args + self._kwargs = kwargs + self._retries = 0 + self._delay = self.initial_delay + self._continue_trying = True + self._call_handle = None + self._connector = None + + def connection_lost(self, exc): + if self._continue_trying: + self.retry() + + def connection_failed(self, exc): + if self._continue_trying: + self.retry() + + def retry(self): + if not self._continue_trying: + return + + self._retries += 1 + if self.max_retries is not None and (self._retries > self.max_retries): + self.stop_trying() + return + + self._delay = min(self._delay * self.factor, self.max_delay) + if self.jitter: + self._delay = random.normalvariate(self._delay, self._delay * self.jitter) + _logger.info("%s: will retry connection after %ss", self, self._delay) + self._call_handle = self._loop.call_later(self._delay, self.connect) + + def connect(self): + if self._connector is None: + self._connector = self._loop.create_task(self._connect()) + + async def _connect(self): + try: + await self._loop.create_connection( + lambda: self, *self._args, **self._kwargs + ) + except Exception as exc: + self._loop.call_soon(self.connection_failed, exc) + else: + self._delay = self.initial_delay + self._retries = 0 + finally: + self._connector = None + + def stop_trying(self): + if self._call_handle: + self._call_handle.cancel() + self._call_handle = None + self._continue_trying = False + if self._connector is not None: + self._connector.cancel() + self._connector = None + + +class KardexClientProtocol(ReconnectingTCPClientProtocol): + + max_delay = 15 + initial_delay = 0.5 + factor = 1.7182818284590451 + jitter = 0.119626565582 + # if we set a number of retries, after N failed + # retries, it will stop the event loop and exit + max_retries = None + + initial_keepalive_delay = 20 + keepalive_delay = 50 + + def __init__(self, queue, args, loop, **kwargs): + super().__init__(loop=loop, **kwargs) _logger.info("started kardex client") - self.loop = loop self.queue = queue - self.args = args self.transport = None self.buffer = b"" + self.args = args def connection_made(self, transport): self.transport = transport _logger.info("connected to kardex server %r", transport) async def keepalive(self): + await asyncio.sleep(self.initial_keepalive_delay) while True: t = int(time.time()) msg = "61|ping%d|SH1-1|0|0||||||||\r\n" % t await self.send_message(msg) - await asyncio.sleep(60) + await asyncio.sleep(self.keepalive_delay) async def send_message(self, message): _logger.info("SEND %r", message) @@ -93,11 +181,19 @@ class KardexClientProtocol(asyncio.Protocol): _logger.info("ping ok") else: _logger.info("notify odoo: %s", msg) - self.loop.create_task(self.notify_odoo(msg)) + self._loop.create_task(self.notify_odoo(msg)) def connection_lost(self, exc): - _logger.error("Connection lost: %s", exc) - self.loop.stop() + _logger.error("Kardex client: connection lost: %s", exc) + super().connection_lost(exc) + + def connection_failed(self, exc): + _logger.error("Kardex client: failed to open connection: %s", exc) + super().connection_failed(exc) + + def stop_trying(self): + super().stop_trying() + self._loop.stop() async def notify_odoo(self, msg): url = self.args.odoo_url + "/vertical-lift" @@ -105,18 +201,24 @@ class KardexClientProtocol(asyncio.Protocol): params = {"answer": msg, "secret": self.args.secret} async with session.post(url, data=params) as resp: resp_text = await resp.text() - _logger.info("Response from Odoo: %s %s", resp.status, resp_text) + _logger.info("Reponse from Odoo: %s %s", resp.status, resp_text) def main(args, ssl_context=None): logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + level=logging.DEBUG if args.debug else logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", ) + loop = asyncio.get_event_loop() + + if args.debug: + loop.set_debug(True) + queue = asyncio.Queue(loop=loop) # create the main server coro = loop.create_server( - lambda: KardexProxyProtocol(loop, queue, args), host=args.host, port=args.port, + lambda: KardexProxyProtocol(queue, loop, args), host=args.host, port=args.port, ) loop.run_until_complete(coro) @@ -126,21 +228,18 @@ def main(args, ssl_context=None): ssl_context = ssl.create_default_context() else: ssl_context = None - coro = loop.create_connection( - lambda: KardexClientProtocol(loop, queue, args), + client = KardexClientProtocol( + queue, + args, + loop, host=args.kardex_host, port=args.kardex_port, ssl=ssl_context, ) - transport, client = loop.run_until_complete(coro) + client.connect() loop.create_task(client.keepalive()) loop.create_task(client.process_queue()) - try: - loop.run_forever() - finally: - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() - return 0 + loop.run_forever() def make_parser(): @@ -153,9 +252,12 @@ def make_parser(): kardex_port = int(os.environ.get("KARDEX_PORT", "9600")) kardex_use_tls = ( False - if os.environ.get("KARDEX_TLS", "") in ("", "0", "False", "FALSE") + if os.environ.get("KARDEX_TLS", "") in ("", "0", "false", "False", "FALSE") else True ) + debug = ( + True if os.environ.get("DEBUG", "") in ("1", "true", "True", "TRUE") else False + ) parser = argparse.ArgumentParser() arguments = [ ("--host", listen_address, str), @@ -166,6 +268,7 @@ def make_parser(): ("--kardex-host", kardex_host, str), ("--kardex-port", kardex_port, str), ("--kardex-use-tls", kardex_use_tls, bool), + ("--debug", debug, bool), ] for name, default, type_ in arguments: parser.add_argument(name, default=default, action="store", type=type_)