From aae0f020133d9bc3bf7bee2afbdd326aba4d9f21 Mon Sep 17 00:00:00 2001 From: Fabian Schindler <fabian.schindler.strauss@gmail.com> Date: Tue, 26 Jul 2022 15:24:42 +0200 Subject: [PATCH] Implementing registration routes Reworking configuration and backends accordingly --- registrar/abc.py | 57 +++++++ registrar/backend/__init__.py | 54 +------ registrar/backend/abc.py | 41 ----- registrar/backend/eoxserver.py | 8 +- registrar/backend/utils.py | 33 ++++ registrar/cli.py | 196 ++++++++++-------------- registrar/config.py | 206 ++++++++++++++++++++++++- registrar/config.yaml | 40 +++++ registrar/daemon.py | 62 ++++---- registrar/exceptions.py | 3 +- registrar/json.py | 2 +- registrar/registrar.py | 270 +++++++++++---------------------- registrar/route/__init__.py | 0 registrar/route/json.py | 32 ++++ registrar/route/path.py | 24 +++ registrar/route/stac.py | 39 +++++ registrar/source.py | 147 ++++++++---------- registrar/xml.py | 2 +- 18 files changed, 701 insertions(+), 515 deletions(-) create mode 100644 registrar/abc.py delete mode 100644 registrar/backend/abc.py create mode 100644 registrar/backend/utils.py create mode 100644 registrar/config.yaml create mode 100644 registrar/route/__init__.py create mode 100644 registrar/route/json.py create mode 100644 registrar/route/path.py create mode 100644 registrar/route/stac.py diff --git a/registrar/abc.py b/registrar/abc.py new file mode 100644 index 0000000..fa0eb7e --- /dev/null +++ b/registrar/abc.py @@ -0,0 +1,57 @@ +"""Module for registrar routes +""" + +from abc import ABC, abstractmethod +from typing import Generic, List, Optional, TypeVar + +from .config import RouteConfig, SourceConfig +from .source import Source + +T = TypeVar("T") + + +class Backend(ABC, Generic[T]): + """A backend registers the given item""" + + @abstractmethod + def exists(self, source: Optional[Source], item: T) -> bool: + """Checks if an item exists in the backend.""" + + @abstractmethod + def register(self, source: Optional[Source], item: 
T, replace: bool): + """Registers an item on the backend.""" + + @abstractmethod + def deregister(self, source: Optional[Source], item: T): + """Deregisters an item using its specification.""" + + @abstractmethod + def deregister_identifier(self, identifier: str): + """Deregisters an item using its identifier.""" + + +T = TypeVar("T") + + +class Route(ABC, Generic[T]): + """A route is of a specific type and manages the registration from a + specific queue to a list of backends. + """ + + def __init__(self, route_config: RouteConfig) -> None: + self.route_config = route_config + + @property + def replace(self): + """Whether to allow replacement for this route""" + return self.route_config.replace + + @abstractmethod + def parse(self, raw: str) -> T: + """Parses the value from the given raw input.""" + + @abstractmethod + def get_source( + self, source_cfgs: List[SourceConfig], item: T + ) -> Optional[Source]: + """Determines the source of the given parsed item.""" diff --git a/registrar/backend/__init__.py b/registrar/backend/__init__.py index 644b061..42b568f 100644 --- a/registrar/backend/__init__.py +++ b/registrar/backend/__init__.py @@ -1,54 +1,8 @@ -from typing import List +"""Registrar backends. 
+""" -from ..utils import import_by_path -from .abc import ItemBackend, PathBackend +from .utils import get_backends -def get_backends(config: dict) -> List[ItemBackend]: - """Gets the backends for registering into - Args: - config (dict): registrar configuration - - Returns: - List[ItemBackend]: list of backends where registration will be - carried out - """ - backends: List[ItemBackend] = [] - for cfg_backend in config["backends"]: - # construct backend - backend_cls = import_by_path(cfg_backend["path"]) - - backends.append( - backend_cls( - *cfg_backend.get("args", []), - **cfg_backend.get("kwargs", {}), - ) - ) - - return backends - - -def get_path_backends(config: dict) -> List[PathBackend]: - """Gets the backends for registering the path into - - Args: - config (dict): registrar configuration - - Returns: - List[PathBackend]: list of backends where registration will be - carried out - """ - backends = [] - for cfg_backend in config.get("path_backends", []): - # construct backend - backend_cls = import_by_path(cfg_backend["path"]) - - backends.append( - backend_cls( - *cfg_backend.get("args", []), - **cfg_backend.get("kwargs", {}), - ) - ) - - return backends +__all__ = ["get_backends"] diff --git a/registrar/backend/abc.py b/registrar/backend/abc.py deleted file mode 100644 index 490ae17..0000000 --- a/registrar/backend/abc.py +++ /dev/null @@ -1,41 +0,0 @@ -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Optional - -if TYPE_CHECKING: - from pystac import Item - -from ..source import Source - - -class ItemBackend(ABC): - def __repr__(self) -> str: - return f"<{self.__class__.__name__}>" - - @abstractmethod - def item_exists(self, source: Optional[Source], item: "Item") -> bool: - ... - - @abstractmethod - def register_item(self, source: Optional[Source], item: "Item", replace: bool): - ... - - @abstractmethod - def deregister_identifier(self, identifier: str) -> Optional[str]: - ... 
- - -class PathBackend(ABC): - def __repr__(self) -> str: - return f"<{self.__class__.__name__}>" - - @abstractmethod - def path_exists(self, source: Optional[Source], path: str) -> bool: - ... - - @abstractmethod - def register_path(self, source: Optional[Source], path: str, replace: bool): - ... - - @abstractmethod - def deregister_path(self, source: Optional[Source], path: str) -> Optional[str]: - ... diff --git a/registrar/backend/eoxserver.py b/registrar/backend/eoxserver.py index 567d5c0..5af2fd0 100644 --- a/registrar/backend/eoxserver.py +++ b/registrar/backend/eoxserver.py @@ -7,12 +7,12 @@ Contains implementations for different backends where data may be registered """ import os import sys -import structlog from typing import List, TYPE_CHECKING, Optional, TypedDict, cast import json from urllib.parse import urlparse from uuid import uuid4 +import structlog import django from django.db import transaction @@ -23,7 +23,7 @@ if TYPE_CHECKING: from ..exceptions import RegistrationError from ..source import HTTPSource, Source, LocalSource, S3Source, SwiftSource -from .abc import ItemBackend +from ..abc import Backend logger = structlog.getLogger(__name__) @@ -60,7 +60,7 @@ class ItemToProductTypeMapping(TypedDict): collections: List[str] -class EOxServerBackend(ItemBackend): +class EOxServerBackend(Backend["Item"]): """ EOxServer backend allows registration to be performed on a running EOxServer instance @@ -95,7 +95,7 @@ class EOxServerBackend(ItemBackend): def __repr__(self) -> str: return f"<{self.__class__.__name__} instance_name ={self.instance_name}>" - def item_exists(self, source: Optional[Source], item: "Item") -> bool: + def exists(self, source: Optional[Source], item: "Item") -> bool: """Checks whether the item exists in the given source Args: diff --git a/registrar/backend/utils.py b/registrar/backend/utils.py new file mode 100644 index 0000000..8ceebfd --- /dev/null +++ b/registrar/backend/utils.py @@ -0,0 +1,33 @@ +"""Backend handling utilities 
+""" + +from typing import List + +from ..utils import import_by_path +from ..abc import Backend +from ..config import BackendConfig + + +def get_backends(backend_configs: List[BackendConfig]) -> List[Backend]: + """Gets the backends for registering into + + Args: + backend_configs (dict): backend configurations + + Returns: + List[Backend]: list of backends where registration will be + carried out + """ + backends: List[Backend] = [] + for cfg_backend in backend_configs: + # construct backend + backend_cls = import_by_path(cfg_backend.path) + + backends.append( + backend_cls( + *cfg_backend.args, + **cfg_backend.kwargs, + ) + ) + + return backends diff --git a/registrar/cli.py b/registrar/cli.py index 10f1e6b..6b7fb32 100644 --- a/registrar/cli.py +++ b/registrar/cli.py @@ -1,25 +1,19 @@ -from os.path import join, dirname +""" Command line interfaces for the registrar +""" + import logging.config import click import structlog import structlog.contextvars -import yaml -import jsonschema - -from .registrar import ( - register_item, - register_path, - deregister_item, - deregister_identifier, - deregister_path, -) + +from . 
import registrar from .daemon import run_daemon -from .config import load_config -from .utils import import_by_path +from .config import RegistrarConfig def setup_logging(debug=False): + """Sets up the logging configuration""" logging.config.dictConfig( { "version": 1, @@ -66,130 +60,106 @@ def setup_logging(debug=False): ) -def validate_config(config): - with open(join(dirname(__file__), "config-schema.yaml")) as f: - schema = yaml.load(f) - - jsonschema.validate(config, schema) - - @click.group() -def cli(): - pass - - -@cli.command(help="Run the registrar daemon, attaching to a Redis queue") @click.option("--config-file", type=click.File("r")) @click.option("--validate", is_flag=True) -@click.option("--replace", is_flag=True) @click.option("--host", type=str) @click.option("--port", type=int) -@click.option("--register-queue", "--listen-queue", type=str) -@click.option("--register-path-queue", "--listen-path-queue", type=str) -@click.option("--deregister-queue", type=str) -@click.option("--deregister-path-queue", type=str) -@click.option("-e", "--extra", type=str, multiple=True, default=[]) @click.option("--debug", is_flag=True) -def daemon( - config_file=None, - validate=False, - replace=False, - host=None, - port=None, - register_queue=None, - register_path_queue=None, - deregister_queue=None, - deregister_path_queue=None, - extra=None, - debug=False, +@click.pass_context +def cli( + ctx, config_file=None, validate=False, host=None, port=None, debug=False ): + """Entry point for the registar""" + setup_logging(debug) + # ensure that ctx.obj exists and is a dict (in case `cli()` is called + # by means other than the `if` block below) + ctx.ensure_object(dict) + + config = RegistrarConfig.from_file(config_file, validate) + if host: + config.redis_host = host + if port: + config.redis_port = port + + ctx.obj["CONFIG"] = config + + +@cli.command(help="Run the registrar daemon, attaching to a Redis queue") +@click.option("--replace/--no-replace", default=None) 
+@click.pass_context +def daemon(ctx, replace=False): """ Run the registrar daemon to listen on the given queues and execute the (de-)registrations commands. Examples: \b - registrar daemon --config-file config.yaml \ - --validate --replace \ - --host redis --port 6379 \ - --listen-queue register \ - --deregister-queue deregister \ - --extra some_queue=path.to.handler \ - --extra some_other_queue=path.to.another.handler \ - --debug + registrar \ + --config-file config.yaml \ + --validate \ + --host redis \ + --port 6379 \ + --debug \ + daemon --replace """ - setup_logging(debug) - config = load_config(config_file) - if validate: - validate_config(config) - - handlers = {} - if register_queue: - handlers[register_queue] = register_item - - if register_path_queue: - handlers[register_path_queue] = register_path - - if deregister_queue: - handlers[deregister_queue] = deregister_item - - if deregister_path_queue: - handlers[deregister_path_queue] = deregister_path - - for extra_handler in extra: - queue, _, path = extra_handler.partition("=") - handler = import_by_path(path.strip()) - handlers[queue.strip()] = handler - - run_daemon( - config, - replace, - host, - port, - handlers, - ) + + config: RegistrarConfig = ctx["CONFIG"] + + if replace is not None: + for route in config.routes.values(): + route.replace = replace + + run_daemon(config) @cli.command(help="Run a single, one-off registration") +@click.argument("route_name", type=str) @click.argument("item", type=str) -@click.option("--config-file", type=click.File("r")) -@click.option("--is-path", is_flag=True) -@click.option("--validate", is_flag=True) @click.option("--replace", is_flag=True) -@click.option("--debug", is_flag=True) -def register( - item, is_path=False, config_file=None, validate=False, replace=False, debug=False -): - setup_logging(debug) - config = load_config(config_file) - if validate: - validate_config(config) +@click.pass_context +def register(ctx, route_name, item, replace): + """ Registers 
a single item. + Examples: + + \b + registrar \ + --config-file config.yaml \ + --validate \ + --host redis \ + --port 6379 \ + --debug \ + register --replace myroute "{...}" + """ + config: RegistrarConfig = ctx["CONFIG"] + if replace is not None: + for route in config.routes.values(): + route.replace = replace - if is_path: - register_path(config, item, replace) - else: - register_item(config, item, replace) + registrar.register(config.routes[route_name], item) @cli.command(help="Run a single, one-off de-registration") -@click.argument("--path", type=str) -@click.argument("--identifier", type=str) -@click.option("--config-file", type=click.File("r")) -@click.option("--validate", is_flag=True) -@click.option("--debug", is_flag=True) -def deregister( - file_path=None, identifier=None, config_file=None, validate=False, debug=False -): - setup_logging(debug) - config = load_config(config_file) - if validate: - validate_config(config) +@click.argument("route_name", type=str) +@click.argument("item", type=str) +@click.option("--identifier", is_flag=True) +@click.pass_context +def deregister(ctx, route_name, item, identifier): + """ Registers a single item. 
+ Examples: - if file_path: - deregister_path(config, file_path) - elif identifier: - deregister_identifier(config, identifier) + \b + registrar \ + --config-file config.yaml \ + --validate \ + --host redis \ + --port 6379 \ + --debug \ + deregister --identifier myroute "someid" + """ + config: RegistrarConfig = ctx["CONFIG"] + registrar.deregister(config.routes[route_name], item, use_id=identifier) if __name__ == "__main__": - cli() + cli(obj={}) diff --git a/registrar/config.py b/registrar/config.py index 4c01301..c215bef 100644 --- a/registrar/config.py +++ b/registrar/config.py @@ -6,11 +6,15 @@ Contains configuration functions for the registrar """ +from dataclasses import dataclass, field +from enum import Enum import os -from typing import TextIO +from typing import Any, Dict, List, Optional, TextIO import re +from os.path import join, dirname import yaml +import jsonschema ENV_PATTERN = re.compile(r".*?\${(\w+)}.*?") @@ -32,12 +36,12 @@ def constructor_env_variables(loader: yaml.Loader, node): match = ENV_PATTERN.findall(value) # to find all env variables in line if match: full_value = value - for g in match: - env_variable = os.environ.get( - g, - ) + for group in match: + env_variable = os.environ.get(group) if env_variable is not None: - full_value = full_value.replace(f"${{{g}}}", env_variable) # type: ignore # noqa: E501 + full_value = full_value.replace( + f"${{{group}}}", env_variable + ) # type: ignore # noqa: E501 else: return None return full_value @@ -62,3 +66,193 @@ def load_config(input_file: TextIO) -> dict: loader.add_constructor(tag, constructor_env_variables) return yaml.load(input_file, Loader=loader) + + +def validate_config(config): + """Validate against the config-schema.yaml""" + with open(join(dirname(__file__), "config-schema.yaml")) as f: + schema = yaml.load(f) + + jsonschema.validate(config, schema) + + +@dataclass(eq=True) +class BackendConfig: + """Configuration for a specific backend. 
+ + Attributes: + path (str): the dotpath to the implementing class + args (dict): any arguments to be passed to the class when + instantiating it + kwargs (dict): any keyword arguments to be passed to the class when + instantiating it + """ + + path: str + args: List[Any] = field(default_factory=list) + kwargs: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(eq=True) +class HandlerConfig: + """A handler configuration + + Attributes: + path (str): the dotpath of the handler class + args (List[Any]): any handler arguments + kwargs (dict): any handler keyword-arguments + """ + + path: str + args: List[Any] = field(default_factory=list) + kwargs: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(eq=True) +class HandlersConfig: + """Pre-/success-/error-handlers for a given route. + + Attributes: + pre (List[HandlerConfig]): the pre-handlers configuration + post (List[HandlerConfig]): the post-handlers configuration + error (List[HandlerConfig]): the error-handlers configuration + """ + + pre: List[HandlerConfig] = None + post: List[HandlerConfig] = None + error: List[HandlerConfig] = None + + @classmethod + def from_dict(cls, values: dict) -> "HandlersConfig": + """Constructs a handler config from a dict""" + return cls( + pre=[ + HandlerConfig(**handler_values) + for handler_values in values.pop("pre", []) + ], + post=[ + HandlerConfig(**handler_values) + for handler_values in values.pop("post", []) + ], + error=[ + HandlerConfig(**handler_values) + for handler_values in values.pop("error", []) + ], + ) + + +class RouteMode(Enum): + """The route mode.""" + + REGISTER = "REGISTER" + DEREGISTER = "DEREGISTER" + DEREGISTER_IDENTIFIER = "DEREGISTER_IDENTIFIER" + + +@dataclass(eq=True) +class RouteConfig: + """A registration route configuration + + Attributes: + path (str): the dotpath to the implementing class + queue (str): the queue for this route to listen on + output_queue (str, optional): the queue to put successfully registered + items on + 
backends (List[BackendConfig]): all associated backends for that route + handlers (HandlerConfig): + """ + + path: str + queue: str + default_mode: RouteMode = RouteMode.REGISTER + output_queue: Optional[str] = None + replace: bool = False + backends: List[BackendConfig] = field(default_factory=list) + handlers: HandlersConfig = field(default_factory=HandlerConfig) + args: List[Any] = field(default_factory=list) + kwargs: Dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, values: dict) -> "RouteConfig": + """Parses a RouteConfig from a dictionary""" + return cls( + handlers=HandlersConfig.from_dict(values.pop("handlers", {})), + backends=[ + BackendConfig(**backend_cfg) + for backend_cfg in values.pop("backends") + ], + default_mode=RouteMode( + values.pop("default_mode", "REGISTER").upper() + ), + **values, + ) + + +@dataclass(eq=True) +class SourceConfig: + """The configuration of a specific remote source. + + Attributes: + type (str): the type name of the source + name (str): the given name of the source + filter (str, optional): a filter to see if a path is relative to this + source + args (list): additional options for the implementing source class + constructor + kwargs (dict): additional options for the implementing source class + constructor + """ + + type: str + name: str + filter: Optional[str] + args: List[Any] = field(default_factory=list) + kwargs: Dict[str, str] = field(default_factory=dict) + + +@dataclass(eq=True) +class RegistrarConfig: + """The root registration configuration object. 
+ + Attributes: + routes (Dict[str, RouteConfig]): all routes + sources (List[SourceConfig]): the sources + """ + + routes: Dict[str, RouteConfig] + sources: List[SourceConfig] + redis_host: Optional[str] = "redis" + redis_port: Optional[int] = 6379 + + def get_route(self, queue: str) -> RouteConfig: + """Returns the RouteConfig for the given queue name""" + for route in self.routes.values(): + if route.queue == queue: + return route + raise KeyError(queue) + + @classmethod + def from_file( + cls, input_file: TextIO, validate: bool = False + ) -> "RegistrarConfig": + """Parses a RegistrarConfig from a file""" + return cls.from_dict(load_config(input_file), validate) + + @classmethod + def from_dict( + cls, values: dict, validate: bool = False + ) -> "RegistrarConfig": + """Parses a RegistrarConfig from a dictionary""" + if validate: + validate_config(values) + return cls( + { + name: RouteConfig.from_dict(route_cfg) + for name, route_cfg in values.pop("routes").items() + }, + [ + SourceConfig(**source_cfg) + for source_cfg in values.pop("sources", []) + ], + **values, + ) diff --git a/registrar/config.yaml b/registrar/config.yaml new file mode 100644 index 0000000..363ec17 --- /dev/null +++ b/registrar/config.yaml @@ -0,0 +1,40 @@ +routes: + items: + type: STAC-Item + queue: register + replace: true + backends: + - path: registrar.backend.eoxserver.EOxServerItemBackend + kwargs: + + handlers: + pre: + post: + error: + + collections: + type: STAC-Collection + queue: register-collections + backends: + - path: registrar.backend.eoxserver.EOxServerCollectionBackend + kwargs: + + handlers: + pre: + post: + error: + + path: + type: path + queue: register-paths + backends: + - path: some.path.backend + kwargs: + + handlers: + pre: + post: + error: + +sources: +... 
\ No newline at end of file diff --git a/registrar/daemon.py b/registrar/daemon.py index 5db45a0..130344e 100644 --- a/registrar/daemon.py +++ b/registrar/daemon.py @@ -6,44 +6,37 @@ Contains the daemon functions for the registrar """ -import structlog -from typing import Callable, Dict +from typing import Dict +import structlog import redis +from .registrar import register, deregister +from .config import RegistrarConfig, RouteConfig, RouteMode -logger = structlog.getLogger(__name__) - - -# callback declaration: callable with 'config', item and replace -QueueItemHandler = Callable[[dict, str, bool], None] +logger = structlog.getLogger(__name__) -def run_daemon( - config: dict, - replace: bool, - host: str, - port: int, - handlers: Dict[str, QueueItemHandler], -): - """Run the registrar daemon, listening on a redis queue - for items to be (de-)registered. - Args: - config (dict): configuration for the registrar - replace (bool): replace the files - host (str): redis host url - port (int): redis port - handlers (Dict[str, QueueItemCallback]): a queue mapping dictionary, - mapping the queue names to their respective handling callables +def run_daemon(config: RegistrarConfig): + """Run the registration daemon - Raises: - Exception: Raised if path is already registered + Arguments: + config (RegistarConfig): the root configuration. 
""" + client = redis.Redis( + host=config.redis_host, + port=config.redis_port, + charset="utf-8", + decode_responses=True + ) + + queue_to_route: Dict[str, RouteConfig] = { + route_cfg.queue: route_cfg + for route_cfg in config.routes.values() + } - # initialize the queue client - client = redis.Redis(host=host, port=port, charset="utf-8", decode_responses=True) - queue_names = list(handlers.keys()) + queue_names = list(queue_to_route.keys()) logger.debug( f"waiting on items on redis " f"queue{'s' if len(queue_names) > 1 else ''}", @@ -55,7 +48,12 @@ def run_daemon( queue, value = client.brpop(queue_names) try: - handler = handlers[queue] - handler(config, value, replace) - except Exception as e: - logger.exception(e) + route_cfg = queue_to_route[queue] + if route_cfg.default_mode == RouteMode.REGISTER: + register(route_cfg, value) + elif route_cfg.default_mode == RouteMode.DEREGISTER: + deregister(route_cfg, value) + if route_cfg.default_mode == RouteMode.DEREGISTER_IDENTIFIER: + deregister(route_cfg, value, use_id=True) + except Exception as exc: + logger.exception(exc) diff --git a/registrar/exceptions.py b/registrar/exceptions.py index 1753c18..1878f8f 100644 --- a/registrar/exceptions.py +++ b/registrar/exceptions.py @@ -8,4 +8,5 @@ Contains exceptions that may occur during registration class RegistrationError(Exception): - pass + """Registration error class + """ diff --git a/registrar/json.py b/registrar/json.py index 8520b64..0068151 100644 --- a/registrar/json.py +++ b/registrar/json.py @@ -6,8 +6,8 @@ Contains helper functions for reading json using concepts from the registrar """ from os import remove -import json from os.path import join, basename +import json from tempfile import gettempdir from typing import Union, Dict, List diff --git a/registrar/registrar.py b/registrar/registrar.py index 3c94ba8..c9263a7 100644 --- a/registrar/registrar.py +++ b/registrar/registrar.py @@ -1,223 +1,135 @@ +"""Contains all functions relevant for registration """ 
-registrar.py -============= - -Contains all functions relevant for registration - -""" -from typing import Optional -import json +from typing import Callable, List import structlog import structlog.contextvars -from pystac import Item -from .source import get_source, get_source_from_path -from .backend import get_backends, get_path_backends -from .exceptions import RegistrationError +from .abc import Route +from .config import HandlerConfig, HandlersConfig, RouteConfig +from .backend import get_backends from .utils import import_by_path +from .exceptions import RegistrationError logger = structlog.getLogger(__name__) -def register_item(config: dict, value: str, replace: bool = False): - """Handle registration of a stac item - - Args: - config (dict): configuration of the registrar - value (str): encoded STAC item to register - replace (bool, optional): replace the file in the endpoint or not. - Defaults to False. +def register(route_cfg: RouteConfig, value: str): + """Handles the registration of a specific item - Raises: - RegistrationError: Raised if item is already registered and replace is - not turned on + Arguments: + route_cfg (RouteConfig): the used route configuration + value (str): the raw value to be parsed """ - item = Item.from_dict(json.loads(value)) - with structlog.contextvars.bound_contextvars(item=item): - logger.info("Handling item") + route_cls = import_by_path(route_cfg.path) + route: Route = route_cls(*route_cfg.args, **route_cfg.kwargs) - source = get_source(config, item) + # parse the actual item to be processed + item = route.parse(value) + # determine the source to be used with that item + source = route.get_source(item) - for pre_handler in get_pre_handlers(config): - pre_handler(config, item) + replaced = False + with structlog.contextvars.bound_contextvars(item=item): + for pre_handler in get_pre_handlers(route_cfg.handlers): + pre_handler(route_cfg, item) try: - for backend in get_backends(config): - if backend.item_exists(source, 
item): - if replace: + for backend in get_backends(route_cfg.backends): + if backend.exists(source, item): + if route.replace: logger.info("Replacing item") - backend.register_item(source, item, replace=True) + replaced = True + backend.register(source, item, replace=True) else: - raise RegistrationError(f"Object {item!r} is already registered") + raise RegistrationError( + f"Item {item!r} is already registered" + ) else: logger.info("Registering item") - backend.register_item(source, item, replace=False) - except Exception as e: - for error_handler in get_error_handlers(config): - error_handler(config, item, e) + backend.register(source, item, replace=False) + except Exception as exc: + for error_handler in get_error_handlers(route_cfg.handlers): + error_handler(route_cfg, item, exc) raise else: - for post_handler in get_post_handlers(config): - post_handler(config, item) + for post_handler in get_post_handlers(route_cfg.handlers): + post_handler(route_cfg, item) logger.info( - f"Successfully {'replaced' if replace else 'registered'}" + f"Successfully {'replaced' if replaced else 'registered'}" ) -def register_path(config: dict, path: str, replace: bool = False): - """Handle registration of a path - - Args: - config (dict): configuration of the registrar - path (str): path to register - replace (bool, optional): replace the file in the endpoint or not. - Defaults to False. +def deregister(route_cfg: RouteConfig, value: str, use_id: bool = False): + """Handles the deregistration of a specific item. - Raises: - RegistrationError: Raised if file is already registered and replace is - not turned on + Arguments: + route_cfg (RouteConfig): the used route configuration + value (str): the raw value to be parsed or the identifier if ``use_id`` + is used. + use_id (bool): to deregister using the identifier, or a parsed item. 
""" + route_cls = import_by_path(route_cfg.path) + route: Route = route_cls(route_cfg) - source = get_source_from_path(config, path) - logger.info(f"Handling '{path!r}'.") - - for pre_handler in get_path_pre_handlers(config): - pre_handler(config, path) - - try: - for backend in get_path_backends(config): - if backend.path_exists(source, path): - if replace: - logger.info(f"Replacing '{path!r}'.") - backend.register_path(source, path, replace=True) - else: - raise RegistrationError(f"Path {path!r} is already registered") - else: - logger.info(f"Registering '{path!r}'.") - backend.register_path(source, path, replace=False) - except Exception as e: - for error_handler in get_path_error_handlers(config): - error_handler(config, path, e) - raise - else: - for post_handler in get_path_post_handlers(config): - post_handler(config, path) - - logger.info( - f"Successfully {'replaced' if replace else 'registered'} " f"'{path!r}'" - ) - - -def deregister_item(config: dict, value: str, _: bool = False): - item = Item.from_dict(json.loads(value)) - logger.info(f"Handling deregistration for '{item!r}'.") - return _deregister_identifier(config, item.id, item) - - -def deregister_identifier(config, identifier: str, _: bool = False): - logger.info(f"Handling deregistration of identifier '{identifier}'.") - return _deregister_identifier(config, identifier) - - -def _deregister_identifier(config, identifier: str, item: Optional[Item] = None): - - for pre_handler in get_deregister_pre_handlers(config): - pre_handler(config, identifier, item) - - try: - for backend in get_backends(config): - backend.deregister_identifier(identifier) - except Exception as e: - for error_handler in get_deregister_error_handlers(config): - error_handler(config, identifier, item, e) - raise + # parse the actual item to be processed + if use_id: + item = value + logger.info(f"Handling deregistration of identifier '{item}'.") else: - for post_handler in get_deregister_post_handlers(config): - 
post_handler(config, identifier, item) - - logger.info(f"Successfully deregistered '{identifier}'") - + item = route.parse(value) + logger.info(f"Handling deregistration for '{item!r}'.") -def deregister_path(config, path: str, _: bool = False): - source = get_source_from_path(config, path) - logger.info(f"Deregistering '{path!r}'.") + # determine the source to be used with that item + source = route.determine_source(item) - for pre_handler in get_deregister_path_pre_handlers(config): - pre_handler(config, path) + with structlog.contextvars.bound_contextvars(item=item): + for pre_handler in get_pre_handlers(route_cfg.handlers): + pre_handler(route_cfg, item) - try: - for backend in get_path_backends(config): - backend.deregister_path(source, path) - except Exception as e: - for error_handler in get_deregister_path_error_handlers(config): - error_handler(config, path, e) - raise - else: - for post_handler in get_deregister_path_post_handlers(config): - post_handler(config, path) + try: + for backend in get_backends(route_cfg.backends): + if use_id: + backend.deregister_identifier(item) + else: + backend.deregister(source, item) + except Exception as exc: + for error_handler in get_error_handlers(route_cfg.handlers): + error_handler(route_cfg, item, exc) + raise + else: + for post_handler in get_post_handlers(route_cfg.handlers): + post_handler(route_cfg, item) - logger.info(f"Successfully deregistered '{path}'") + logger.info("Successfully deregistered") -def _get_handlers(config, name): - handlers = [] - for handler_def in config.get(name, []): - handler_cls = import_by_path(handler_def["path"]) - handlers.append( - handler_cls( - *handler_def.get("args", []), - **handler_def.get("kwargs", {}), - ) +def _instantiate_handlers(handler_configs: List[HandlerConfig]): + """Helper to get an arbitrary handler + """ + return [ + import_by_path(handler_config.path)( + *handler_config.args, + **handler_config.kwargs, ) - - return handlers - - -def get_pre_handlers(config): - 
return _get_handlers(config, "pre_handlers") - - -def get_post_handlers(config): - return _get_handlers(config, "post_handlers") - - -def get_error_handlers(config): - return _get_handlers(config, "error_handlers") - - -def get_path_pre_handlers(config): - return _get_handlers(config, "path_pre_handlers") + for handler_config in handler_configs + ] -def get_path_post_handlers(config): - return _get_handlers(config, "path_post_handlers") - - -def get_path_error_handlers(config): - return _get_handlers(config, "path_error_handlers") - - -def get_deregister_pre_handlers(config): - return _get_handlers(config, "deregister_pre_handlers") - - -def get_deregister_post_handlers(config): - return _get_handlers(config, "deregister_post_handlers") - - -def get_deregister_error_handlers(config): - return _get_handlers(config, "deregister_error_handlers") - - -def get_deregister_path_pre_handlers(config): - return _get_handlers(config, "deregister_path_pre_handlers") +def get_pre_handlers(config: HandlersConfig) -> List[Callable]: + """Instantiates pre error handlers. + """ + return _instantiate_handlers(config.pre) -def get_deregister_path_post_handlers(config): - return _get_handlers(config, "deregister_path_post_handlers") +def get_post_handlers(config: HandlersConfig) -> List[Callable]: + """Instantiates post error handlers. + """ + return _instantiate_handlers(config.post) -def get_deregister_path_error_handlers(config): - return _get_handlers(config, "deregister_path_error_handlers") +def get_error_handlers(config: HandlersConfig) -> List[Callable]: + """Instantiates error error handlers. 
+ """ + return _instantiate_handlers(config.error) diff --git a/registrar/route/__init__.py b/registrar/route/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/registrar/route/json.py b/registrar/route/json.py new file mode 100644 index 0000000..506d4a9 --- /dev/null +++ b/registrar/route/json.py @@ -0,0 +1,32 @@ +import json +from typing import List, Union + +from pyparsing import Optional + + +from ..abc import Route +from ..config import RouteConfig, SourceConfig +from ..source import Source, get_source + + +JSONType = Union[dict, list] + + +class JSONRoute(Route[JSONType]): + """Route handler that handles raw JSON objects""" + + def __init__(self, config: RouteConfig, href_field: Optional[str] = None): + super().__init__(config) + self.href_field = href_field + + def parse(self, raw: str) -> JSONType: + return json.loads(raw) + + def get_source( + self, source_cfgs: List[SourceConfig], item: JSONType + ) -> Optional[Source]: + if self.href_field: + path = item[self.href_field] + return get_source(source_cfgs, [path]) + else: + return None diff --git a/registrar/route/path.py b/registrar/route/path.py new file mode 100644 index 0000000..77b6e6b --- /dev/null +++ b/registrar/route/path.py @@ -0,0 +1,24 @@ +"""Module holding route classes dealing with POSIX paths +""" + +from typing import List + +from pyparsing import Optional + +from ..abc import Route +from ..config import SourceConfig +from ..source import Source, get_source + + +class PathRoute(Route[str]): + """Route handler that manages the registration of values interpreted as + file paths + """ + + def parse(self, raw: str) -> str: + return raw + + def get_source( + self, source_cfgs: List[SourceConfig], item: str + ) -> Optional[Source]: + return get_source(source_cfgs, [item]) diff --git a/registrar/route/stac.py b/registrar/route/stac.py new file mode 100644 index 0000000..9959fda --- /dev/null +++ b/registrar/route/stac.py @@ -0,0 +1,39 @@ +"""Module holding classes dealing with 
registration routes of STAC objects +""" + +import json +from typing import List, Optional + +import pystac + +from ..abc import Route +from ..config import SourceConfig +from ..source import Source, get_source + + +class ItemRoute(Route[pystac.Item]): + """A route implementation for STAC Items""" + + def parse(self, raw: str) -> pystac.Item: + return pystac.Item.from_dict(json.loads(raw)) + + def get_source( + self, source_cfgs: List[SourceConfig], item: pystac.Item + ) -> Optional[Source]: + return get_source( + source_cfgs, [asset.href for asset in item.assets] + ) + + +class CollectionRoute(Route[pystac.Collection]): + """A route implementation for STAC Collections""" + + def parse(self, raw: str) -> pystac.Collection: + return pystac.Collection.from_dict(json.loads(raw)) + + def get_source( + self, source_cfgs: List[SourceConfig], item: pystac.Collection + ) -> Optional[Source]: + return get_source( + source_cfgs, [asset.href for asset in item.assets] + ) diff --git a/registrar/source.py b/registrar/source.py index e31e9b9..8b990f6 100644 --- a/registrar/source.py +++ b/registrar/source.py @@ -2,8 +2,8 @@ source.py ========== -Contains different file source concepts where the data may be stored such as local storage -, S3, swift +Contains different file source concepts where the data may be stored such as +local storage, S3, swift """ import re @@ -11,12 +11,12 @@ from os.path import normpath, join, isabs import shutil from glob import glob from fnmatch import fnmatch +from typing import List, Optional +from abc import ABC, abstractmethod +from urllib.parse import urljoin, urlparse import requests import structlog -from typing import TYPE_CHECKING, Optional -from abc import ABC, abstractmethod -from urllib.parse import urljoin, urlparse import boto3 import boto3.session @@ -24,21 +24,18 @@ import botocore.session import botocore.handlers from botocore import UNSIGNED from botocore.config import Config - -if TYPE_CHECKING: - from pystac import Item - from 
swiftclient.service import SwiftService - -logger = structlog.getLogger(__name__) +from .config import SourceConfig -class RegistrationError(Exception): - pass +logger = structlog.getLogger(__name__) class Source(ABC): + """Abstract base class for all sources + """ + def __init__(self, name: str = None, endpoint_url: str = ""): self.name = name self.endpoint_url = endpoint_url @@ -48,19 +45,23 @@ class Source(ABC): @abstractmethod def get_container_and_path(self, path: str): - ... + """Split the input path into a container and a path part + """ @abstractmethod - def list_files(self, path: str, glob_pattern: list = None): - ... + def list_files(self, path: str, glob_patterns: list = None): + """Return a list of file references for the given base path and glob + pattern + """ @abstractmethod def get_file(self, path: str, target_path: str): - ... + """Download the given file to the target location""" @abstractmethod def get_vsi_env_and_path(self, path: str): - ... + """Get a VSI conformant path. + See https://gdal.org/user/virtual_file_systems.html""" class SwiftSource(Source): @@ -122,6 +123,8 @@ class SwiftSource(Source): self.container = container def get_service(self): + """Returns the swiftclient.SwiftService for the options. + """ return SwiftService( options={ "os_username": self.username, @@ -192,7 +195,7 @@ class SwiftSource(Source): for result in results: if not result["success"]: - raise Exception("Failed to download %s" % path) + raise Exception(f"Failed to download {path}") def get_vsi_env_and_path(self, path: str): container, path = self.get_container_and_path(path) @@ -217,10 +220,10 @@ class S3Source(Source): secret_access_key (str, optional): secret access key. Defaults to None. access_key_id (str, optional): access key identifier. Defaults to None. endpoint_url (str, optional): endpoint url. Defaults to None. - strip_bucket (bool, optional): whether to strip bucket name when constructing - paths. Defaults to True. 
- validate_bucket_name (bool, optional): whether to validate the name of bucket. - Defaults to True. + strip_bucket (bool, optional): whether to strip bucket name when + constructing paths. Defaults to True. + validate_bucket_name (bool, optional): whether to validate the name of + bucket. Defaults to True. region_name (str, optional): name of aws s3 region. Defaults to None. public (bool, optional): whether the data is public or not. Defaults to False. @@ -241,7 +244,8 @@ class S3Source(Source): ): super().__init__(name, endpoint_url) - # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/ + # see + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/ # session.html#boto3.session.Session.client # for client_kwargs self.bucket_name = bucket_name @@ -254,7 +258,8 @@ class S3Source(Source): botocore_session = botocore.session.Session() if not validate_bucket_name: botocore_session.unregister( - "before-parameter-build.s3", botocore.handlers.validate_bucket_name + "before-parameter-build.s3", + botocore.handlers.validate_bucket_name, ) session = boto3.session.Session(botocore_session=botocore_session) @@ -324,12 +329,16 @@ class S3Source(Source): def get_vsi_env_and_path(self, path: str, streaming: bool = False): bucket, key = self.get_container_and_path(path) # parsed = urlparse(self.endpoint_url) - return { - "AWS_SECRET_ACCESS_KEY": self.secret_access_key, - "AWS_ACCESS_KEY_ID": self.access_key_id, - "AWS_S3_ENDPOINT": self.endpoint_url, - "AWS_NO_SIGN_REQUEST": "YES" if self.public else "NO", - }, f'/{"vsis3" if not streaming else "vsis3_streaming"}/{bucket}/{key}' + protocol = "vsis3" if not streaming else "vsis3_streaming" + return ( + { + "AWS_SECRET_ACCESS_KEY": self.secret_access_key, + "AWS_ACCESS_KEY_ID": self.access_key_id, + "AWS_S3_ENDPOINT": self.endpoint_url, + "AWS_NO_SIGN_REQUEST": "YES" if self.public else "NO", + }, + f'/{protocol}/{bucket}/{key}', + ) class LocalSource(Source): @@ -360,7 +369,7 @@ class 
LocalSource(Source): glob_patterns = [glob_patterns] if glob_patterns is not None: - return glob(join(self._join_path(path), glob_patterns[0])) # TODO + return glob(join(self._join_path(path), glob_patterns[0])) else: return glob(join(self._join_path(path), "*")) @@ -372,10 +381,12 @@ class LocalSource(Source): class HTTPSource(Source): + """Source class for HTTP locations + """ def get_container_and_path(self, path: str): return (self.endpoint_url, path) - def list_files(self, path: str, glob_pattern: list = None): + def list_files(self, path: str, glob_patterns: list = None): raise NotImplementedError() def get_file(self, path: str, target_path: str): @@ -396,68 +407,30 @@ SOURCE_TYPES = { } -def get_source(config: dict, item: "Item") -> Source: - """Retrieves the source using the configuration and stac item assets - - Args: - config (dict): registrar configuration - item (Item): STAC item to ofbe registered - - Raises: - RegistrationError: raised if no suitable source is found - - Returns: - Source: source of the data - """ - cfg_sources = config["sources"] - - assets = item.assets - hrefs = [] - for asset in assets.values(): - hrefs.append(asset.href) - - for cfg_source in cfg_sources: - if cfg_source.get("filter"): - f = cfg_source.get("filter") - if any(filter(lambda item: re.match(f, item), hrefs)): - break - else: - break - else: - # no source found - raise RegistrationError(f"Could not find a suitable source for {item!r}") - - return SOURCE_TYPES[cfg_source["type"]]( - cfg_source["name"], *cfg_source.get("args", []), **cfg_source.get("kwargs", {}) - ) - - -def get_source_from_path(config, path: str) -> Optional[Source]: - """Retrieves the source using the configuration and a path - - Args: - config (dict): registrar configuration - path (str): path to be registered +def get_source( + source_cfgs: List[SourceConfig], hrefs: List[str] +) -> Optional[Source]: + """Retrieves a Source from a given list of SourceConfigs and a list of + hrefs to test against. 
- Raises: - RegistrationError: raised if no suitable source is found + Arguments: + source_cfgs (List[SourceConfig]): the source configs to test + hrefs (List[str]): the hrefs to test the sources against Returns: - Source: source of the data + Source: the source for the first matching configuration, or None """ - cfg_sources = config["sources"] - for cfg_source in cfg_sources: - if cfg_source.get("filter"): - f = cfg_source.get("filter") - if any(filter(lambda item: re.match(f, item), path)): + for source_cfg in source_cfgs: + if source_cfg.filter: + filter_ = source_cfg.filter + if any(filter(lambda item: re.match(filter_, item), hrefs)): break else: break else: - # no source found - raise RegistrationError(f"Could not find a suitable source for {path!r}") + return None - return SOURCE_TYPES[cfg_source["type"]]( - cfg_source["name"], *cfg_source.get("args", []), **cfg_source.get("kwargs", {}) + return SOURCE_TYPES[source_cfg.type]( + source_cfg.name, *source_cfg.args, **source_cfg.kwargs ) diff --git a/registrar/xml.py b/registrar/xml.py index 52351e7..54b9485 100644 --- a/registrar/xml.py +++ b/registrar/xml.py @@ -3,8 +3,8 @@ from os.path import join, basename from tempfile import gettempdir from dataclasses import dataclass, field from typing import Optional, Callable, Any -import structlog +import structlog import lxml.etree from .source import Source -- GitLab