EOX GitLab Instance

Commit 761b40a2 authored by Fabian Schindler
Browse files

Merge branch 'main' into restructure-initialization

parents fdac5c73 6ea2d918
Pipeline #19521 passed with stage in 43 seconds
...@@ -10,8 +10,7 @@ from .endpoint import get_endpoint ...@@ -10,8 +10,7 @@ from .endpoint import get_endpoint
from .source import get_source from .source import get_source
from .exceptions import HarvestError from .exceptions import HarvestError
from .utils import cql_filter from .utils import cql_filter
from .postprocess import get_postprocessor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -63,6 +62,13 @@ def main(config: dict, item: Union[dict, str], client: Redis): ...@@ -63,6 +62,13 @@ def main(config: dict, item: Union[dict, str], client: Redis):
# Perform harvest # Perform harvest
result = resource.harvest() result = resource.harvest()
if "postprocess" in harvest_config:
postprocessor = get_postprocessor(harvest_config["postprocess"])
result = (
postprocessor.postprocess(item)
for item in result
)
# Filter data # Filter data
if "filter" in harvest_config: if "filter" in harvest_config:
result = cql_filter(harvest_config["filter"], result) result = cql_filter(harvest_config["filter"], result)
......
from abc import ABC, abstractmethod
from typing import Dict, Type
class Postprocessor(ABC):
    """Base class for postprocessors that transform a single item.

    Concrete subclasses are looked up by name in ``POSTPROCESSORS`` and
    instantiated by ``get_postprocessor`` with the remaining configuration
    entries passed as keyword arguments.
    """

    def __init__(self, **kwargs):
        """Accept arbitrary keyword configuration.

        The base class ignores every keyword argument; subclasses override
        this to pick up the options they need.
        """

    @abstractmethod
    def postprocess(self, item: dict) -> dict:
        """Return the postprocessed form of ``item``.

        Must be implemented by every concrete subclass.
        """


# Registry mapping the configured "type" name to its implementing class.
# It is empty here; entries have to be added for a type to be resolvable.
POSTPROCESSORS: Dict[str, Type[Postprocessor]] = {}


def get_postprocessor(config: dict) -> Postprocessor:
    """Instantiate the postprocessor described by ``config``.

    Args:
        config: Mapping with a ``"type"`` key naming an entry of
            ``POSTPROCESSORS``. All other entries are forwarded to the
            class constructor as keyword arguments. The mapping itself is
            left unmodified.

    Returns:
        A new instance of the registered postprocessor class.

    Raises:
        KeyError: If ``"type"`` is missing from ``config`` or does not name
            a registered postprocessor.
    """
    # Work on a shallow copy: popping "type" from the caller's dict would
    # make a second call with the same configuration fail.
    params = dict(config)

    try:
        type_name = params.pop("type")
    except KeyError:
        raise KeyError('postprocessor config is missing the "type" key') from None

    try:
        cls = POSTPROCESSORS[type_name]
    except KeyError:
        known = ", ".join(sorted(POSTPROCESSORS)) or "<none registered>"
        raise KeyError(
            f"unknown postprocessor type {type_name!r} (known: {known})"
        ) from None

    return cls(**params)
...@@ -114,23 +114,31 @@ class S3Source(Source): ...@@ -114,23 +114,31 @@ class S3Source(Source):
return item return item
class S3CatalogSource(S3Base, Source): class S3CatalogSource(S3Base):
type = "S3Catalog" type = "S3Catalog"
def __init__(self, root_href: str, **kwargs): def __init__(self, parameters: dict, **kwargs):
super().__init__(**kwargs) self.root_href = parameters.pop("root_href")
self.root_href = root_href self.default_catalog_name = parameters.pop("default_catalog_name", None)
super().__init__(**parameters)
def harvest(self) -> Iterator[dict]: def harvest(self) -> Iterator[dict]:
logger.info("Starting S3 Catalog harvesting") logger.info("Starting S3 Catalog harvesting")
parsed = urlparse(self.root_href) parsed = urlparse(self.root_href)
yield from self.harvest_catalog(parsed.netloc, parsed.path) path = parsed.path
if path.startswith("/"):
path = parsed.path[1:]
if path.endswith("/") and self.default_catalog_name:
path = join(path, self.default_catalog_name)
yield from self.harvest_catalog(parsed.netloc, path)
def fetch_json(self, bucket: str, key: str) -> dict: def fetch_json(self, bucket: str, key: str) -> dict:
""" """
Loads the given object identifier by bucket and key and loads it as Loads the given object identifier by bucket and key and loads it as
JSON. JSON.
""" """
if key.startswith("/"):
key = key[1:]
response = self.client.get_object(Bucket=bucket, Key=key) response = self.client.get_object(Bucket=bucket, Key=key)
return json.load(response["Body"]) return json.load(response["Body"])
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment