Source code for tecoroute._cli

from argparse import SUPPRESS, ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace, FileType
from asyncio import CancelledError, get_event_loop, run, set_event_loop_policy
from logging import DEBUG, INFO, basicConfig
from signal import SIGINT, SIGTERM
from sys import exit

from ._misc import (
    APPLICATION,
    HOST,
    LATENCY_COUNT,
    LATENCY_THRESHOLD,
    PORT_CONNECTOR,
    TECOROUTE_HOST,
    TECOROUTE_PORT,
    dist,
)
from .connector import UdpConnector

try:
    from uvloop import EventLoopPolicy
except ImportError:
    pass
else:
    set_event_loop_policy(EventLoopPolicy())


async def _main(args: Namespace) -> int:
    try:
        password = args.password
    except AttributeError:
        password = args.password_file.read()
    basicConfig(level=DEBUG if args.verbose else INFO)
    connector = UdpConnector(
        host=args.host,
        port=args.port,
        username=args.username,
        password=password,
        plc=args.plc,
        application=args.application,
        tecoroute_host=args.tecoroute_host,
        tecoroute_port=args.tecoroute_port,
        latency_threshold=args.latency_threshold,
        latency_count=args.latency_count,
    )
    runner = connector.run()
    closer = connector.close
    for signum in (SIGINT, SIGTERM):
        get_event_loop().add_signal_handler(signum, closer)
    try:
        await runner
    except Exception:
        return 1
    except CancelledError:
        pass
    return 0


[docs]def cli() -> None: """Run the command-line interface.""" parser = ArgumentParser( prog=dist.entry_points[0].name, description=dist.metadata["Summary"] ) parser.add_argument( "-v", "--verbose", action="store_true", help="verbose mode", ) parser.add_argument( "--version", action="version", version=f"TecoRoute {dist.version}", ) subparsers = parser.add_subparsers(dest="command", required=True, metavar="COMMAND") parser_connector = subparsers.add_parser( "connector", formatter_class=ArgumentDefaultsHelpFormatter, argument_default=SUPPRESS, ) parser_connector.add_argument( "-m", "--mode", choices=("udp",), required=True, help="mode of operation", ) parser_connector.add_argument( "-H", "--host", default=HOST, help="host to listen on", ) parser_connector.add_argument( "-p", "--port", default=PORT_CONNECTOR, type=int, help="port to listen on", ) parser_connector.add_argument( "-u", "--username", required=True, help="TecoRoute username", ) parser_password = parser_connector.add_mutually_exclusive_group(required=True) parser_password.add_argument( "-s", "--password", help="TecoRoute password", ) parser_password.add_argument( "-S", "--password-file", type=FileType(), help="TecoRoute password from file", ) parser_connector.add_argument( "-l", "--plc", required=True, help="PLC to connect", ) parser_connector.add_argument( "-a", "--application", default=APPLICATION, help=( "TecoRoute application name; only if you have assigned your own " "application name from Teco a.s." ), ) parser_connector.add_argument( "--tecoroute-host", default=TECOROUTE_HOST, help="TecoRoute service host", ) parser_connector.add_argument( "--tecoroute-port", default=TECOROUTE_PORT, type=int, help="TecoRoute service port", ) parser_connector.add_argument( "--latency-threshold", default=LATENCY_THRESHOLD, type=float, help="maximum latency in seconds", ) parser_connector.add_argument( "--latency-count", default=LATENCY_COUNT, type=int, help="maximum number of latency errors", ) exit(run(_main(parser.parse_args())))