EOX GitLab Instance

Skip to content
Snippets Groups Projects
Commit 3ef51ec5 authored by Fabian Schindler's avatar Fabian Schindler
Browse files

Fixing interfaces for register/deregister to also pass sources config

parent 6c3c3d3d
No related branches found
No related tags found
1 merge request!15Registration routes
Pipeline #25972 failed
...@@ -25,8 +25,8 @@ def run_daemon(config: RegistrarConfig): ...@@ -25,8 +25,8 @@ def run_daemon(config: RegistrarConfig):
config (RegistarConfig): the root configuration. config (RegistarConfig): the root configuration.
""" """
client = redis.Redis( client = redis.Redis(
host=config.redis_host, host=config.redis_host or "redis",
port=config.redis_port, port=config.redis_port or 6379,
charset="utf-8", charset="utf-8",
decode_responses=True decode_responses=True
) )
...@@ -50,10 +50,14 @@ def run_daemon(config: RegistrarConfig): ...@@ -50,10 +50,14 @@ def run_daemon(config: RegistrarConfig):
try: try:
route_cfg = queue_to_route[queue] route_cfg = queue_to_route[queue]
if route_cfg.default_mode == RouteMode.REGISTER: if route_cfg.default_mode == RouteMode.REGISTER:
register(route_cfg, value) register(route_cfg, config.sources, value)
elif route_cfg.default_mode == RouteMode.DEREGISTER: elif route_cfg.default_mode == RouteMode.DEREGISTER:
deregister(route_cfg, value) deregister(route_cfg, config.sources, value)
if route_cfg.default_mode == RouteMode.DEREGISTER_IDENTIFIER: elif route_cfg.default_mode == RouteMode.DEREGISTER_IDENTIFIER:
deregister(route_cfg, value, use_id=True) deregister(route_cfg, config.sources, value, use_id=True)
if route_cfg.output_queue is not None:
client.lpush(route_cfg.output_queue, value)
except Exception as exc: except Exception as exc:
logger.exception(exc) logger.exception(exc)
...@@ -6,7 +6,7 @@ import structlog ...@@ -6,7 +6,7 @@ import structlog
import structlog.contextvars import structlog.contextvars
from .abc import Route from .abc import Route
from .config import HandlerConfig, HandlersConfig, RouteConfig from .config import HandlerConfig, HandlersConfig, RouteConfig, SourceConfig
from .backend import get_backends from .backend import get_backends
from .utils import import_by_path from .utils import import_by_path
from .exceptions import RegistrationError from .exceptions import RegistrationError
...@@ -14,20 +14,23 @@ from .exceptions import RegistrationError ...@@ -14,20 +14,23 @@ from .exceptions import RegistrationError
logger = structlog.getLogger(__name__) logger = structlog.getLogger(__name__)
def register(route_cfg: RouteConfig, value: str): def register(
route_cfg: RouteConfig, source_cfgs: List[SourceConfig], value: str
):
"""Handles the registration of a specific item """Handles the registration of a specific item
Arguments: Arguments:
route_cfg (RouteConfig): the used route configuration route_cfg (RouteConfig): the used route configuration
source_cfgs (List[SourceConfig]): the source configs
value (str): the raw value to be parsed value (str): the raw value to be parsed
""" """
route_cls = import_by_path(route_cfg.path) route_cls = import_by_path(route_cfg.path)
route: Route = route_cls(*route_cfg.args, **route_cfg.kwargs) route: Route = route_cls(route_cfg, *route_cfg.args, **route_cfg.kwargs)
# parse the actual item to be processed # parse the actual item to be processed
item = route.parse(value) item = route.parse(value)
# determine the source to be used with that item # determine the source to be used with that item
source = route.get_source(item) source = route.get_source(source_cfgs, item)
replaced = False replaced = False
with structlog.contextvars.bound_contextvars(item=item): with structlog.contextvars.bound_contextvars(item=item):
...@@ -61,11 +64,17 @@ def register(route_cfg: RouteConfig, value: str): ...@@ -61,11 +64,17 @@ def register(route_cfg: RouteConfig, value: str):
) )
def deregister(route_cfg: RouteConfig, value: str, use_id: bool = False): def deregister(
route_cfg: RouteConfig,
source_cfgs: List[SourceConfig],
value: str,
use_id: bool = False,
):
"""Handles the deregistration of a specific item. """Handles the deregistration of a specific item.
Arguments: Arguments:
route_cfg (RouteConfig): the used route configuration route_cfg (RouteConfig): the used route configuration
source_cfgs (List[SourceConfig]): the source configs
value (str): the raw value to be parsed or the identifier if ``use_id`` value (str): the raw value to be parsed or the identifier if ``use_id``
is used. is used.
use_id (bool): to deregister using the identifier, or a parsed item. use_id (bool): to deregister using the identifier, or a parsed item.
...@@ -82,7 +91,7 @@ def deregister(route_cfg: RouteConfig, value: str, use_id: bool = False): ...@@ -82,7 +91,7 @@ def deregister(route_cfg: RouteConfig, value: str, use_id: bool = False):
logger.info(f"Handling deregistration for '{item!r}'.") logger.info(f"Handling deregistration for '{item!r}'.")
# determine the source to be used with that item # determine the source to be used with that item
source = route.determine_source(item) source = route.get_source(source_cfgs, item)
with structlog.contextvars.bound_contextvars(item=item): with structlog.contextvars.bound_contextvars(item=item):
for pre_handler in get_pre_handlers(route_cfg.handlers): for pre_handler in get_pre_handlers(route_cfg.handlers):
...@@ -106,8 +115,7 @@ def deregister(route_cfg: RouteConfig, value: str, use_id: bool = False): ...@@ -106,8 +115,7 @@ def deregister(route_cfg: RouteConfig, value: str, use_id: bool = False):
def _instantiate_handlers(handler_configs: List[HandlerConfig]): def _instantiate_handlers(handler_configs: List[HandlerConfig]):
"""Helper to get an arbitrary handler """Helper to get an arbitrary handler"""
"""
return [ return [
import_by_path(handler_config.path)( import_by_path(handler_config.path)(
*handler_config.args, *handler_config.args,
...@@ -118,18 +126,15 @@ def _instantiate_handlers(handler_configs: List[HandlerConfig]): ...@@ -118,18 +126,15 @@ def _instantiate_handlers(handler_configs: List[HandlerConfig]):
def get_pre_handlers(config: HandlersConfig) -> List[Callable]: def get_pre_handlers(config: HandlersConfig) -> List[Callable]:
"""Instantiates pre error handlers. """Instantiates pre error handlers."""
"""
return _instantiate_handlers(config.pre) return _instantiate_handlers(config.pre)
def get_post_handlers(config: HandlersConfig) -> List[Callable]: def get_post_handlers(config: HandlersConfig) -> List[Callable]:
"""Instantiates post error handlers. """Instantiates post error handlers."""
"""
return _instantiate_handlers(config.post) return _instantiate_handlers(config.post)
def get_error_handlers(config: HandlersConfig) -> List[Callable]: def get_error_handlers(config: HandlersConfig) -> List[Callable]:
"""Instantiates error error handlers. """Instantiates error error handlers."""
"""
return _instantiate_handlers(config.error) return _instantiate_handlers(config.error)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment