"""Command line interface of the registrar.

Provides the ``daemon``, ``register`` and ``deregister`` sub-commands of the
``cli`` click group.
"""

from os.path import join, dirname
import logging.config

import click
import structlog
import structlog.contextvars
import yaml
import jsonschema

from .registrar import (
    register_item,
    register_path,
    deregister_item,
    deregister_identifier,
    deregister_path,
)
from .daemon import run_daemon
from .config import load_config
from .utils import import_by_path


def setup_logging(debug=False):
    """Configure stdlib logging and structlog for the command line tools.

    Args:
        debug: when true, log at ``DEBUG`` level and render events with the
            structlog console renderer; otherwise log at ``INFO`` level and
            render events as JSON.
    """
    level = "DEBUG" if debug else "INFO"
    renderer = (
        structlog.dev.ConsoleRenderer()
        if debug
        else structlog.processors.JSONRenderer()
    )

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "json": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": renderer,
                },
            },
            "handlers": {
                "console": {
                    "class": "logging.StreamHandler",
                    "level": level,
                    "formatter": "json",
                },
            },
            "root": {
                "handlers": ["console"],
                "level": level,
            },
        }
    )

    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.filter_by_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            # Must stay last: hands the event dict over to the
            # ProcessorFormatter configured in the logging dict above.
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        # NOTE(review): structlog.threadlocal is deprecated in recent
        # structlog releases and contextvars are already merged above;
        # consider dropping this once the minimum structlog version allows.
        context_class=structlog.threadlocal.wrap_dict(dict),
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )


def validate_config(config):
    """Validate ``config`` against the bundled ``config-schema.yaml``.

    Args:
        config: the loaded configuration object to check.

    Raises:
        jsonschema.ValidationError: if ``config`` does not match the schema.
    """
    schema_path = join(dirname(__file__), "config-schema.yaml")
    with open(schema_path, encoding="utf-8") as f:
        # safe_load: yaml.load() without an explicit Loader is rejected by
        # PyYAML >= 6 and is unsafe on older versions.
        schema = yaml.safe_load(f)

    jsonschema.validate(config, schema)


@click.group()
def cli():
    """Root command group; the sub-commands are attached below."""


@cli.command(help="Run the registrar daemon, attaching to a Redis queue")
@click.option("--config-file", type=click.File("r"))
@click.option("--validate", is_flag=True)
@click.option("--replace", is_flag=True)
@click.option("--host", type=str)
@click.option("--port", type=int)
@click.option("--register-queue", "--listen-queue", type=str)
@click.option("--register-path-queue", "--listen-path-queue", type=str)
@click.option("--deregister-queue", type=str)
@click.option("--deregister-path-queue", type=str)
@click.option("--progress-set", type=str)
@click.option("--failure-set", type=str)
@click.option("--success-set", type=str)
@click.option("-e", "--extra", type=str, multiple=True, default=[])
@click.option("--debug", is_flag=True)
def daemon(
    config_file=None,
    validate=False,
    replace=False,
    host=None,
    port=None,
    register_queue=None,
    register_path_queue=None,
    progress_set=None,
    failure_set=None,
    success_set=None,
    deregister_queue=None,
    deregister_path_queue=None,
    extra=None,
    debug=False,
):
    """Run the registrar daemon to listen on the given queues and execute
    the (de-)registrations commands.

    Each queue option that is given maps that queue name to the matching
    registrar handler; every ``--extra QUEUE=PATH`` value maps ``QUEUE`` to
    the handler imported from the dotted ``PATH``.

    Raises:
        click.BadParameter: if an ``--extra`` value is not ``QUEUE=PATH``.

    Examples:

        \b
        registrar daemon --config-file config.yaml \\
            --validate --replace \\
            --host redis --port 6379 \\
            --listen-queue register \\
            --deregister-queue deregister \\
            --progress-set register_progress \\
            --failure-set register_failure \\
            --success-set register_success \\
            --extra some_queue=path.to.handler \\
            --extra some_other_queue=path.to.another.handler \\
            --debug
    """
    setup_logging(debug)
    config = load_config(config_file)
    if validate:
        validate_config(config)

    # Map queue name -> handler; queues that were not configured are skipped.
    handlers = {}
    if register_queue:
        handlers[register_queue] = register_item
    if register_path_queue:
        handlers[register_path_queue] = register_path
    if deregister_queue:
        handlers[deregister_queue] = deregister_item
    if deregister_path_queue:
        handlers[deregister_path_queue] = deregister_path

    for extra_handler in extra or ():
        queue, separator, path = extra_handler.partition("=")
        queue, path = queue.strip(), path.strip()
        if not separator or not queue or not path:
            # Fail with a clear CLI error instead of an obscure import error
            # for an empty handler path.
            raise click.BadParameter(
                f"expected QUEUE=PATH, got {extra_handler!r}",
                param_hint="--extra",
            )
        handlers[queue] = import_by_path(path)

    run_daemon(
        config,
        replace,
        host,
        port,
        handlers,
        progress_set,
        failure_set,
        success_set,
    )


@cli.command(help="Run a single, one-off registration")
@click.argument("item", type=str)
@click.option("--config-file", type=click.File("r"))
@click.option("--is-path", is_flag=True)
@click.option("--validate", is_flag=True)
@click.option("--replace", is_flag=True)
@click.option("--debug", is_flag=True)
def register(
    item, is_path=False, config_file=None, validate=False, replace=False, debug=False
):
    """Register a single item.

    ``item`` is handed to ``register_path`` when ``--is-path`` is set and to
    ``register_item`` otherwise, together with the loaded configuration and
    the ``--replace`` flag.
    """
    setup_logging(debug)
    config = load_config(config_file)
    if validate:
        validate_config(config)

    if is_path:
        register_path(config, item, replace)
    else:
        register_item(config, item, replace)


@cli.command(help="Run a single, one-off de-registration")
# These must be options, not arguments: as arguments click would expose them
# as required positionals named ``__path`` / ``__identifier`` which this
# callback does not accept. ``--path`` is explicitly mapped to ``file_path``.
@click.option("--path", "file_path", type=str)
@click.option("--identifier", type=str)
@click.option("--config-file", type=click.File("r"))
@click.option("--validate", is_flag=True)
@click.option("--debug", is_flag=True)
def deregister(
    file_path=None, identifier=None, config_file=None, validate=False, debug=False
):
    """De-register a single item, selected by ``--path`` or ``--identifier``.

    When both are given, ``--path`` takes precedence.

    Raises:
        click.UsageError: if neither ``--path`` nor ``--identifier`` is given.
    """
    setup_logging(debug)
    if not file_path and not identifier:
        # Previously this was a silent no-op that exited successfully.
        raise click.UsageError("one of --path or --identifier is required")

    config = load_config(config_file)
    if validate:
        validate_config(config)

    if file_path:
        deregister_path(config, file_path)
    else:
        deregister_identifier(config, identifier)


if __name__ == "__main__":
    cli()