# cli.py (5.93 KiB) -- authored by Bernhard Mallinger
from os.path import join, dirname
import logging.config
import click
import structlog
import structlog.contextvars
import yaml
import jsonschema
from .registrar import (
register_item,
register_path,
deregister_item,
deregister_identifier,
deregister_path,
)
from .daemon import run_daemon
from .config import load_config
from .utils import import_by_path
def setup_logging(debug=False):
    """Configure stdlib logging and structlog to emit through one console handler.

    Args:
        debug: when true, the threshold is ``DEBUG`` and records are rendered
            with structlog's ``ConsoleRenderer``; otherwise the threshold is
            ``INFO`` and records are rendered with ``JSONRenderer``.
    """
    level_name = "DEBUG" if debug else "INFO"
    if debug:
        renderer = structlog.dev.ConsoleRenderer()
    else:
        renderer = structlog.processors.JSONRenderer()

    # The formatter keeps the name "json" in both modes; only its renderer changes.
    formatter_config = {
        "()": structlog.stdlib.ProcessorFormatter,
        "processor": renderer,
    }
    console_handler = {
        "class": "logging.StreamHandler",
        "level": level_name,
        "formatter": "json",
    }
    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {"json": formatter_config},
            "handlers": {"console": console_handler},
            "root": {"handlers": ["console"], "level": level_name},
        }
    )

    # The last processor hands the event dict over to the ProcessorFormatter
    # configured above, which performs the final rendering.
    processor_chain = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.filter_by_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ]
    # NOTE(review): structlog.threadlocal appears to be deprecated in newer
    # structlog releases -- confirm against the pinned version before upgrading.
    structlog.configure(
        processors=processor_chain,
        context_class=structlog.threadlocal.wrap_dict(dict),
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
def validate_config(config):
    """Validate ``config`` against the ``config-schema.yaml`` shipped next to this module.

    Args:
        config: the loaded configuration object to check.

    Raises:
        jsonschema.ValidationError: if ``config`` does not conform to the schema.
    """
    schema_path = join(dirname(__file__), "config-schema.yaml")
    with open(schema_path, encoding="utf-8") as schema_file:
        # safe_load instead of yaml.load: calling yaml.load() without an
        # explicit Loader raises TypeError on PyYAML >= 6 and could construct
        # arbitrary objects on older releases.
        schema = yaml.safe_load(schema_file)
    jsonschema.validate(config, schema)
@click.group()
def cli():
    # Root command group; the sub-commands in this module attach to it via
    # ``@cli.command``. Documented with a comment rather than a docstring
    # because click would pick a docstring up as the group's help text.
    return None
@cli.command(help="Run the registrar daemon, attaching to a Redis queue")
@click.option("--config-file", type=click.File("r"))
@click.option("--validate", is_flag=True)
@click.option("--replace", is_flag=True)
@click.option("--host", type=str)
@click.option("--port", type=int)
@click.option("--register-queue", "--listen-queue", type=str)
@click.option("--register-path-queue", "--listen-path-queue", type=str)
@click.option("--deregister-queue", type=str)
@click.option("--deregister-path-queue", type=str)
@click.option("--progress-set", type=str)
@click.option("--failure-set", type=str)
@click.option("--success-set", type=str)
@click.option("-e", "--extra", type=str, multiple=True, default=[])
@click.option("--debug", is_flag=True)
def daemon(
    config_file=None,
    validate=False,
    replace=False,
    host=None,
    port=None,
    register_queue=None,
    register_path_queue=None,
    progress_set=None,
    failure_set=None,
    success_set=None,
    deregister_queue=None,
    deregister_path_queue=None,
    extra=None,
    debug=False,
):
    r"""Run the registrar daemon to listen on the given queues
    and execute the (de-)registration commands.

    Each queue option that is set maps that queue name to the matching
    registrar function; every ``--extra QUEUE=PATH`` adds a queue whose handler
    is resolved with ``import_by_path``. The resulting mapping is handed to
    ``run_daemon`` together with the connection and result-set options.

    Raises:
        click.BadParameter: if an ``--extra`` value is not of the form
            ``QUEUE=PATH`` with both parts non-empty.

    Examples:

        registrar daemon --config-file config.yaml \
            --validate --replace \
            --host redis --port 6379 \
            --listen-queue register \
            --deregister-queue deregister \
            --progress-set register_progress \
            --failure-set register_failure \
            --success-set register_success \
            --extra some_queue=path.to.handler \
            --extra some_other_queue=path.to.another.handler \
            --debug
    """
    setup_logging(debug)
    config = load_config(config_file)
    if validate:
        validate_config(config)

    # Built-in queues in their original registration order, so that a later
    # entry still wins when two options name the same queue.
    builtin_handlers = (
        (register_queue, register_item),
        (register_path_queue, register_path),
        (deregister_queue, deregister_item),
        (deregister_path_queue, deregister_path),
    )
    handlers = {queue: handler for queue, handler in builtin_handlers if queue}

    # ``extra`` is None when the function is called without the option parsed.
    for extra_handler in extra or ():
        queue, separator, path = (
            part.strip() for part in extra_handler.partition("=")
        )
        # Reject malformed values up front rather than importing an empty path
        # or registering a handler under an empty queue name.
        if not (separator and queue and path):
            raise click.BadParameter(
                f"expected QUEUE=PATH.TO.HANDLER, got {extra_handler!r}",
                param_hint="--extra",
            )
        handlers[queue] = import_by_path(path)

    run_daemon(
        config,
        replace,
        host,
        port,
        handlers,
        progress_set,
        failure_set,
        success_set,
    )
@cli.command(help="Run a single, one-off registration")
@click.argument("item", type=str)
@click.option("--config-file", type=click.File("r"))
@click.option("--is-path", is_flag=True)
@click.option("--validate", is_flag=True)
@click.option("--replace", is_flag=True)
@click.option("--debug", is_flag=True)
def register(
    item, is_path=False, config_file=None, validate=False, replace=False, debug=False
):
    """Register a single item, or a path when ``--is-path`` is given.

    Logging is set up first, then the configuration is loaded (and validated
    against the schema when ``--validate`` is set) before ``item`` is passed to
    ``register_path`` or ``register_item`` along with the ``replace`` flag.
    """
    setup_logging(debug)

    config = load_config(config_file)
    if validate:
        validate_config(config)

    # Both registrar entry points take the same (config, item, replace) arguments.
    perform_registration = register_path if is_path else register_item
    perform_registration(config, item, replace)
@cli.command(help="Run a single, one-off de-registration")
@click.option("--path", "file_path", type=str)
@click.option("--identifier", type=str)
@click.option("--config-file", type=click.File("r"))
@click.option("--validate", is_flag=True)
@click.option("--debug", is_flag=True)
def deregister(
    file_path=None, identifier=None, config_file=None, validate=False, debug=False
):
    """De-register a single item, selected by ``--path`` or ``--identifier``.

    ``--path`` and ``--identifier`` are declared as options (they were
    previously declared with ``click.argument``, which produced parameter names
    that did not match this signature). ``--path`` is bound explicitly to the
    ``file_path`` parameter. When both are given, ``--path`` takes precedence.

    Raises:
        click.UsageError: if neither ``--path`` nor ``--identifier`` is given.
    """
    # Fail fast instead of loading the configuration and then doing nothing.
    if not file_path and not identifier:
        raise click.UsageError("one of --path or --identifier is required")

    setup_logging(debug)
    config = load_config(config_file)
    if validate:
        validate_config(config)

    if file_path:
        deregister_path(config, file_path)
    else:
        deregister_identifier(config, identifier)
# Invoke the command group only when this module is executed as a script.
if __name__ == "__main__":
    cli()