Merge PR #1316 into 14.0

Signed-off-by sebalix
This commit is contained in:
OCA-git-bot
2021-11-30 12:53:10 +00:00
2 changed files with 138 additions and 19 deletions

View File

@@ -0,0 +1,7 @@
Kardex device proxy.
This is meant to run as a proxy between odoo and the JMIF server
Using a proxy is required to make the synchronous JMIF protocol asynchronous for Odoo, by using ping backs.
The proxy requires python >= 3.7.

View File

@@ -3,17 +3,19 @@ import argparse
import asyncio
import logging
import os
import random
import ssl
import sys
import time
import aiohttp
import aiohttp # pylint: disable=missing-manifest-dependency
_logger = logging.getLogger(__name__)
class KardexProxyProtocol(asyncio.Protocol):
def __init__(self, loop, queue, args):
_logger.info("Proxy created")
def __init__(self, queue, loop, args):
_logger.info("Proxy: created")
self.transport = None
self.buffer = b""
self.queue = queue
@@ -21,7 +23,7 @@ class KardexProxyProtocol(asyncio.Protocol):
self.args = args
def connection_made(self, transport):
_logger.info("Proxy incoming cnx")
_logger.info("Proxy: incoming cnx made")
self.transport = transport
self.buffer = b""
@@ -46,29 +48,116 @@ class KardexProxyProtocol(asyncio.Protocol):
self.buffer = b""
def connection_lost(self, exc):
if exc:
_logger.error("Proxy: incoming cnx lost: %s", exc)
else:
_logger.info("Proxy: incoming cnx closed")
self.transport = None
self.buffer = b""
class KardexClientProtocol(asyncio.Protocol):
def __init__(self, loop, queue, args):
class ReconnectingTCPClientProtocol(asyncio.Protocol):
# source: https://stackoverflow.com/a/49452683/1504003
max_delay = 3600
initial_delay = 1.0
factor = 2.7182818284590451
jitter = 0.119626565582
max_retries = None
def __init__(self, *args, loop=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._args = args
self._kwargs = kwargs
self._retries = 0
self._delay = self.initial_delay
self._continue_trying = True
self._call_handle = None
self._connector = None
def connection_lost(self, exc):
if self._continue_trying:
self.retry()
def connection_failed(self, exc):
if self._continue_trying:
self.retry()
def retry(self):
if not self._continue_trying:
return
self._retries += 1
if self.max_retries is not None and (self._retries > self.max_retries):
self.stop_trying()
return
self._delay = min(self._delay * self.factor, self.max_delay)
if self.jitter:
self._delay = random.normalvariate(self._delay, self._delay * self.jitter)
_logger.info("%s: will retry connection after %ss", self, self._delay)
self._call_handle = self._loop.call_later(self._delay, self.connect)
def connect(self):
if self._connector is None:
self._connector = self._loop.create_task(self._connect())
async def _connect(self):
try:
await self._loop.create_connection(
lambda: self, *self._args, **self._kwargs
)
except Exception as exc:
self._loop.call_soon(self.connection_failed, exc)
else:
self._delay = self.initial_delay
self._retries = 0
finally:
self._connector = None
def stop_trying(self):
if self._call_handle:
self._call_handle.cancel()
self._call_handle = None
self._continue_trying = False
if self._connector is not None:
self._connector.cancel()
self._connector = None
class KardexClientProtocol(ReconnectingTCPClientProtocol):
max_delay = 15
initial_delay = 0.5
factor = 1.7182818284590451
jitter = 0.119626565582
# if we set a number of retries, after N failed
# retries, it will stop the event loop and exit
max_retries = None
initial_keepalive_delay = 20
keepalive_delay = 50
def __init__(self, queue, args, loop, **kwargs):
super().__init__(loop=loop, **kwargs)
_logger.info("started kardex client")
self.loop = loop
self.queue = queue
self.args = args
self.transport = None
self.buffer = b""
self.args = args
def connection_made(self, transport):
self.transport = transport
_logger.info("connected to kardex server %r", transport)
async def keepalive(self):
await asyncio.sleep(self.initial_keepalive_delay)
while True:
t = int(time.time())
msg = "61|ping%d|SH1-1|0|0||||||||\r\n" % t
await self.send_message(msg)
await asyncio.sleep(20)
await asyncio.sleep(self.keepalive_delay)
async def send_message(self, message):
_logger.info("SEND %r", message)
@@ -92,10 +181,19 @@ class KardexClientProtocol(asyncio.Protocol):
_logger.info("ping ok")
else:
_logger.info("notify odoo: %s", msg)
self.loop.create_task(self.notify_odoo(msg))
self._loop.create_task(self.notify_odoo(msg))
def connection_lost(self, exc):
self.loop.stop()
_logger.error("Kardex client: connection lost: %s", exc)
super().connection_lost(exc)
def connection_failed(self, exc):
_logger.error("Kardex client: failed to open connection: %s", exc)
super().connection_failed(exc)
def stop_trying(self):
super().stop_trying()
self._loop.stop()
async def notify_odoo(self, msg):
url = self.args.odoo_url + "/vertical-lift"
@@ -108,13 +206,21 @@ class KardexClientProtocol(asyncio.Protocol):
def main(args, ssl_context=None):
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
loop = asyncio.get_event_loop()
if args.debug:
loop.set_debug(True)
queue = asyncio.Queue(loop=loop)
# create the main server
coro = loop.create_server(
lambda: KardexProxyProtocol(loop, queue, args), host=args.host, port=args.port
lambda: KardexProxyProtocol(queue, loop, args),
host=args.host,
port=args.port,
)
loop.run_until_complete(coro)
@@ -124,17 +230,18 @@ def main(args, ssl_context=None):
ssl_context = ssl.create_default_context()
else:
ssl_context = None
coro = loop.create_connection(
lambda: KardexClientProtocol(loop, queue, args),
client = KardexClientProtocol(
queue,
args,
loop,
host=args.kardex_host,
port=args.kardex_port,
ssl=ssl_context,
)
transport, client = loop.run_until_complete(coro)
client.connect()
loop.create_task(client.keepalive())
loop.create_task(client.process_queue())
loop.run_forever()
loop.close()
def make_parser():
@@ -147,9 +254,12 @@ def make_parser():
kardex_port = int(os.environ.get("KARDEX_PORT", "9600"))
kardex_use_tls = (
False
if os.environ.get("KARDEX_TLS", "") in ("", "0", "False", "FALSE")
if os.environ.get("KARDEX_TLS", "") in ("", "0", "false", "False", "FALSE")
else True
)
debug = (
True if os.environ.get("DEBUG", "") in ("1", "true", "True", "TRUE") else False
)
parser = argparse.ArgumentParser()
arguments = [
("--host", listen_address, str),
@@ -160,6 +270,7 @@ def make_parser():
("--kardex-host", kardex_host, str),
("--kardex-port", kardex_port, str),
("--kardex-use-tls", kardex_use_tls, bool),
("--debug", debug, bool),
]
for name, default, type_ in arguments:
parser.add_argument(name, default=default, action="store", type=type_)
@@ -169,4 +280,5 @@ def make_parser():
if __name__ == "__main__":
parser = make_parser()
args = parser.parse_args()
main(args)
res = main(args)
sys.exit(res)