diff --git a/config-sample.yaml b/config-sample.yaml
index b3261ab44c655c7df2fdef7558e90d8d83f0ebfb..b54ef38ab936ff0882cd94b4c80f640e1cc8f1a0 100644
--- a/config-sample.yaml
+++ b/config-sample.yaml
@@ -17,3 +17,17 @@ harvesters:
         - - P5D
           - !now
     queue: register # register, ingest, delete, update, preprocess
+
+  - name: MyS3STACCatalogHarvester
+    type: STACCatalog
+    source:
+      type: S3
+      bucket: mybucket
+      secret_access_key: xxx
+      access_key_id: xxx
+      endpoint_url: myendpoint.storage.com
+      validate_bucket_name: False
+      region_name: RegionA
+      public: False
+    # root_path is not specified here; it must be passed in as an argument
+    # root_path:
diff --git a/harvester/abc.py b/harvester/abc.py
new file mode 100644
index 0000000000000000000000000000000000000000..b5612f5659d65d9f4858681e26bdbd5d2ec678bb
--- /dev/null
+++ b/harvester/abc.py
@@ -0,0 +1,41 @@
+from abc import abstractmethod, ABC
+from dataclasses import dataclass
+from datetime import datetime
+from typing import IO, AnyStr, Iterator, List
+
+
+class Resource(ABC):
+    """
+    Represents an online resource such as an endpoint (API...) or data
+    source (S3/swift...) that provides data or metadata.
+    """
+
+    @abstractmethod
+    def harvest(self) -> Iterator[dict]:
+        ...
+
+
+@dataclass
+class Stat:
+    mtime: datetime
+    size: int
+
+
+class Source(ABC):
+    @abstractmethod
+    def open(self, path: str) -> IO[AnyStr]:
+        ...
+
+    @abstractmethod
+    def listdir(self, path: str) -> List[str]:
+        ...
+
+
+class Endpoint(Resource):
+    def __init__(self, url: str):
+        self.url = url
+
+
+class FileScheme(Resource):
+    def __init__(self, source: Source):
+        self.source = source
diff --git a/harvester/endpoint/__init__.py b/harvester/endpoint/__init__.py
index 142536d9e98ba052f20dfac65bd2f28417327950..3b9a147e0592db459b42a000daba64f015c03bbe 100644
--- a/harvester/endpoint/__init__.py
+++ b/harvester/endpoint/__init__.py
@@ -1,10 +1,10 @@
 from typing import Optional
 
+from ..abc import Endpoint
 from .oads import OADSEndpoint
 from .ogcapi import OGCAPIEndpoint
 from .opensearch import OpenSearchEndpoint
 from .stacapi import STACAPIEndpoint
-from ._endpoint import Endpoint
 
 ENDPOINT_MAP = {
     "OADS": OADSEndpoint,
@@ -15,16 +15,11 @@
 
 
 def get_endpoint(endpoint_cfg: dict) -> Optional[Endpoint]:
-
-    cls = ENDPOINT_MAP.get(endpoint_cfg["type"])
+    cls = ENDPOINT_MAP.get(endpoint_cfg.pop("type"))
     if not cls:
         return None
 
-    url = endpoint_cfg.pop("url")
-    query = endpoint_cfg.pop("query")
-
-    endpoint = cls(url=url, query=query, **endpoint_cfg)
-    return endpoint
+    return cls(**endpoint_cfg)
 
 
 __all__ = [
diff --git a/harvester/endpoint/_endpoint.py b/harvester/endpoint/_endpoint.py
deleted file mode 100644
index 51c1a76e8c65d971ec47aba945ad1562097f6c11..0000000000000000000000000000000000000000
--- a/harvester/endpoint/_endpoint.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from ..resource import Resource
-
-
-class Endpoint(Resource):
-    type: str = "Endpoint"
-
-    def __init__(self, url: str, *args, **kwargs):
-        self.url = url
diff --git a/harvester/endpoint/oads.py b/harvester/endpoint/oads.py
index f48633c3072bbb16b335e7591d3c3b218a85acc5..18005a199c61e4631581e4dc433ce7de51450ecd 100644
--- a/harvester/endpoint/oads.py
+++ b/harvester/endpoint/oads.py
@@ -1,5 +1,5 @@
 from typing import Iterator
-from ._endpoint import Endpoint
+from ..abc import Endpoint
 
 
 class OADSEndpoint(Endpoint):
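[Reviewer note — harvester/abc.py] The new `harvester.abc` module above defines the contracts the rest of this diff builds on. Below is a minimal sketch of a custom `Source` against that interface; `LocalSource` is hypothetical and not part of this change. One observation: `stat()` is not declared abstract on `Source`, yet `FileMatcherScheme` (later in this diff) relies on it via duck typing, so implementations should provide it anyway.

```python
# Hypothetical example (not part of this diff): a minimal Source
# implementation against the new harvester.abc interface.
import os
from datetime import datetime, timezone
from typing import IO, AnyStr, List

from harvester.abc import Source, Stat


class LocalSource(Source):
    """Illustrative only: serves files from a local directory."""

    def __init__(self, root: str = "."):
        self.root = root

    def open(self, path: str) -> IO[AnyStr]:
        return open(os.path.join(self.root, path), "rb")

    def listdir(self, path: str) -> List[str]:
        base = os.path.join(self.root, path)
        return [os.path.join(path, name) for name in os.listdir(base)]

    def stat(self, path: str) -> Stat:
        # Not abstract on Source, but required by FileMatcherScheme.
        st = os.stat(os.path.join(self.root, path))
        return Stat(
            mtime=datetime.fromtimestamp(st.st_mtime, tz=timezone.utc),
            size=st.st_size,
        )
```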
diff --git a/harvester/endpoint/ogcapi.py b/harvester/endpoint/ogcapi.py
index 7030d02f0a9e5abba29eeabba130f2bd54fd6872..f3ac8acbd7297523441cf85b51b20e17e1585d5c 100644
--- a/harvester/endpoint/ogcapi.py
+++ b/harvester/endpoint/ogcapi.py
@@ -1,5 +1,5 @@
 from typing import Iterator
-from ._endpoint import Endpoint
+from ..abc import Endpoint
 
 
 class OGCAPIEndpoint(Endpoint):
diff --git a/harvester/endpoint/opensearch.py b/harvester/endpoint/opensearch.py
index 48744a545df0bbb524b92c9a543c0af4dbda289d..bc0f7581a24328ef27f96da3105ddfe80e34f855 100644
--- a/harvester/endpoint/opensearch.py
+++ b/harvester/endpoint/opensearch.py
@@ -7,7 +7,7 @@ import requests
 import lxml.etree as ET
 import pystac
 
-from ._endpoint import Endpoint
+from ..abc import Endpoint
 from .query import Query
 from ..exceptions import QueryError
 
diff --git a/harvester/endpoint/stacapi.py b/harvester/endpoint/stacapi.py
index 8032b59c72f87df47de3330c86f0da1a2b1fbed1..c0af168009ab4e13257003f3ec74ce91a4f96e2e 100644
--- a/harvester/endpoint/stacapi.py
+++ b/harvester/endpoint/stacapi.py
@@ -3,7 +3,7 @@ from typing import Iterator
 
 import requests
 
-from ._endpoint import Endpoint
+from ..abc import Endpoint
 from .query import Query
 
 logger = logging.getLogger(__name__)
diff --git a/harvester/filescheme/__init__.py b/harvester/filescheme/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..5fd513b9240ade499d7c69df3fe620380479bd4e
--- /dev/null
+++ b/harvester/filescheme/__init__.py
@@ -0,0 +1,20 @@
+from typing import Optional
+
+from ..abc import FileScheme
+from ..source import get_source
+from .filematcher import FileMatcherScheme
+from .stac_catalog import STACCatalogScheme
+
+
+SCHEME_MAP = {
+    "FileMatcher": FileMatcherScheme,
+    "STACCatalog": STACCatalogScheme
+}
+
+
+def get_filescheme(filescheme_cfg: dict) -> Optional[FileScheme]:
+    cls = SCHEME_MAP.get(filescheme_cfg.pop("type"))
+    if not cls:
+        return None
+
+    return cls(get_source(filescheme_cfg.pop("source")), **filescheme_cfg)
diff --git a/harvester/filescheme/filematcher.py b/harvester/filescheme/filematcher.py
new file mode 100644
index 0000000000000000000000000000000000000000..450ea27708814b8b93383ca060c49fcc7d91589c
--- /dev/null
+++ b/harvester/filescheme/filematcher.py
@@ -0,0 +1,39 @@
+import logging
+import re
+from typing import Iterator
+
+from dateutil.parser import isoparse
+import pystac
+
+from ..abc import FileScheme, Source
+
+
+logger = logging.getLogger(__name__)
+
+
+class FileMatcherScheme(FileScheme):
+    def __init__(self, source: Source, path: str, time_regex: str, url: str):
+        super().__init__(source)
+        self.path = path
+        self.time_regex = re.compile(time_regex)
+        self.url = url
+
+    def harvest(self) -> Iterator[dict]:
+        logger.info("Starting S3 harvesting")
+        for filename in self.source.listdir(self.path):
+            if match := re.search(self.time_regex, filename):
+                dt = isoparse(match[0])
+                item = self._create_item(filename, dt)
+                yield item.to_dict()
+
+    def _create_item(self, path, dt):
+        identifier = dt.strftime("%Y%m%d_%H%M%S")
+        properties = {
+            "datetime": dt,
+            "updated": self.source.stat(path).mtime,
+        }
+        item = pystac.Item(
+            id=identifier, geometry=None, bbox=None, datetime=dt, properties=properties
+        )
+        item.add_asset(identifier, pystac.Asset(f"{self.url}{path}"))
+        return item
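[Reviewer note — harvester/filescheme] A sketch of how the new `get_filescheme` factory wires a `FileMatcherScheme` to an `S3Source`; all values are placeholders, loosely mirroring tests/data/config.yml:

```python
from harvester.filescheme import get_filescheme

scheme = get_filescheme({
    "type": "FileMatcher",                # popped to select FileMatcherScheme
    "path": "vrt/SR/",
    "time_regex": r"\d{4}-\d{2}-\d{2}",
    "url": "https://example-bucket.s3.example.com/",  # prefix for asset hrefs
    "source": {                           # popped and handed to get_source
        "type": "S3",
        "bucket": "example-bucket",
        "endpoint_url": "https://s3.example.com",
        "access_key_id": "...",
        "secret_access_key": "...",
        "region_name": "eu-central-1",
    },
})

for item in scheme.harvest():             # yields STAC item dicts
    print(item["id"])
```

This split cleanly separates *where* files live (`Source`) from *how* they are interpreted (`FileScheme`), which is what lets the old `S3Source`/`S3CatalogSource` pair collapse into one S3 access class below.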
diff --git a/harvester/filescheme/stac_catalog.py b/harvester/filescheme/stac_catalog.py
new file mode 100644
index 0000000000000000000000000000000000000000..ac8f8d73ff87f370cbc973e02abf01d5353380f0
--- /dev/null
+++ b/harvester/filescheme/stac_catalog.py
@@ -0,0 +1,37 @@
+import logging
+import json
+from typing import Iterator
+from urllib.parse import urljoin
+
+from ..abc import FileScheme, Source
+
+
+logger = logging.getLogger(__name__)
+
+
+class STACCatalogScheme(FileScheme):
+    def __init__(self, source: Source, root_path: str):
+        super().__init__(source)
+        self.root_path = root_path
+
+    def _read_json(self, path):
+        return json.load(self.source.open(path))
+
+    def harvest(self) -> Iterator[dict]:
+        yield from self.harvest_catalog(self.root_path)
+
+    def harvest_catalog(self, path: str) -> Iterator[dict]:
+        """
+        Harvests a specified STAC catalog. Will recurse into child catalogs
+        and yield all included STAC items.
+        """
+        logger.info(f"Harvesting from catalog {path}")
+        catalog = self._read_json(path)
+        for link in catalog["links"]:
+            if link["rel"] == "item":
+                item_href = urljoin(path, link["href"])
+                logger.info(f"Harvested item {item_href}")
+                yield self._read_json(item_href)
+            elif link["rel"] == "child":
+                catalog_href = urljoin(path, link["href"])
+                yield from self.harvest_catalog(catalog_href)
diff --git a/harvester/harvester.py b/harvester/harvester.py
index 40f35c9102bcac583bb820d9ada0a31aa98b925b..f055ddaa915588f40ec7cb92f9b2d212a6f26c93 100644
--- a/harvester/harvester.py
+++ b/harvester/harvester.py
@@ -5,9 +5,9 @@ from typing import Union
 from redis import Redis
 from mergedeep import merge
 
-from .resource import Resource
+from .abc import Resource
 from .endpoint import get_endpoint
-from .source import get_source
+from .filescheme import get_filescheme
 from .exceptions import HarvestError
 from .utils import cql_filter
 from .postprocess import get_postprocessor
@@ -17,10 +17,10 @@ logger = logging.getLogger(__name__)
 
 def init_resource(harvest_config: dict) -> Resource:
     config: dict = harvest_config.pop("resource")
 
-    if endpoint := get_endpoint(config):
+    if endpoint := get_endpoint(dict(config)):
         return endpoint
 
-    if source := get_source(config):
+    if source := get_filescheme(dict(config)):
         return source
 
     raise HarvestError(f"Resource type {config['type']} not found")
diff --git a/harvester/postprocess.py b/harvester/postprocess.py
index b8590b483d5b41dcc290e0ab241d293d0df14566..4eae1a5e735d74f60d52a23180d837ca44c43587 100644
--- a/harvester/postprocess.py
+++ b/harvester/postprocess.py
@@ -1,6 +1,8 @@
 from abc import ABC, abstractmethod
 from typing import Dict, Type
 
+from .utils import import_by_path
+
 
 class Postprocessor(ABC):
     def __init__(self, **kwargs):
@@ -17,5 +19,9 @@ POSTPROCESSORS: Dict[str, Type[Postprocessor]] = {
 
 
 def get_postprocessor(config: dict) -> Postprocessor:
-    cls = POSTPROCESSORS[config.pop("type")]
+    type_ = config.pop("type")
+    try:
+        cls = POSTPROCESSORS[type_]
+    except KeyError:
+        cls = import_by_path(type_)
     return cls(**config)
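[Reviewer note — harvester/filescheme/stac_catalog.py] A usage sketch for the new `STACCatalogScheme` (bucket name and paths are invented). One caveat worth flagging: `urllib.parse.urljoin` does not resolve relative references against unknown schemes such as `s3://` (the scheme is not in `urllib.parse.uses_relative`, so the relative href is returned unchanged), which makes plain object keys the safer form for `root_path`:

```python
# Hypothetical wiring, assuming a public bucket laid out as a static STAC tree
# (catalog.json -> child catalogs -> items).
from harvester.filescheme import get_filescheme

scheme = get_filescheme({
    "type": "STACCatalog",
    # A plain key relative to the configured bucket. An "s3://..." root would
    # silently break relative-href resolution; see the urljoin note above.
    "root_path": "catalogs/catalog.json",
    "source": {"type": "S3", "bucket": "example-bucket", "public": True},
})

for item in scheme.harvest():
    print(item["id"])
```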
diff --git a/harvester/resource.py b/harvester/resource.py
deleted file mode 100644
index f0223030b41050cc34fcf8238bfd95077f683940..0000000000000000000000000000000000000000
--- a/harvester/resource.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from abc import abstractmethod, ABC
-from typing import Iterator
-
-
-class Resource(ABC):
-    """
-    Represents online resource such as an endpoint (API...) or data source (S3/swift...)
-    that provides data or metadata.
-    """
-
-    # All resources should implement the interface to do following
-    # 1. prepare harvest
-    # 2. harvest resource
-    # 3. convert to stac items
-    # 4. return list of stac items as dictionaries
-
-    @abstractmethod
-    def harvest(self) -> Iterator[dict]:
-        pass
diff --git a/harvester/source/__init__.py b/harvester/source/__init__.py
index 3791e23af0f3ace12119c824181d08448ad512f4..e7a60d931f4ae885ce07ce17291e27ec7b6717fc 100644
--- a/harvester/source/__init__.py
+++ b/harvester/source/__init__.py
@@ -1,38 +1,25 @@
 from typing import Optional
 
-from .stac_catalog import STACCatalogSource
+from ..abc import Source
 from .swift import SwiftSource
-from .s3 import S3Source, S3CatalogSource
+from .s3 import S3Source
 from .ftp import FTPSource
-from ._source import Source
 
 SOURCE_MAP = {
     "FTP": FTPSource,
     "S3": S3Source,
-    "S3Catalog": S3CatalogSource,
-    "STACCatalog": STACCatalogSource,
     "Swift": SwiftSource,
 }
 
 
 def get_source(source_cfg: dict) -> Optional[Source]:
-
-    cls = SOURCE_MAP.get(source_cfg["type"])
-    if not cls:
-        return None
-    parameters = source_cfg["parameters"]
-
-    source = cls(parameters)
-
-    return source
+    cls = SOURCE_MAP[source_cfg.pop("type")]
+    return cls(**source_cfg)
 
 
 __all__ = [
     "FTPSource",
     "S3Source",
-    "S3CatalogSource",
-    "STACCatalogSource",
     "SwiftSource",
-    "Source",
     "get_source",
 ]
diff --git a/harvester/source/_source.py b/harvester/source/_source.py
deleted file mode 100644
index 6eb19538640a0e1abffa4b2f9f386d67087b10d2..0000000000000000000000000000000000000000
--- a/harvester/source/_source.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from typing import Dict
-from ..resource import Resource
-
-
-class Source(Resource):
-    type: str = "Source"
-
-    def __init__(self, parameters: Dict[str, str], *args, **kwargs):
-        self.parameters = parameters
diff --git a/harvester/source/ftp.py b/harvester/source/ftp.py
index fafed61066c36a4f2e47900e8ab4ced7d3ea4bdc..c6749a5be9e6a8712c252d8da2d10602c81c10e4 100644
--- a/harvester/source/ftp.py
+++ b/harvester/source/ftp.py
@@ -1,5 +1,5 @@
 from typing import Iterator
-from ._source import Source
+from ..abc import Source
 
 
 class FTPSource(Source):
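[Reviewer note — harvester/source/__init__.py] `get_source` now pops `type` and forwards the remaining config keys as constructor kwargs, dropping the old `parameters` dict indirection. A sketch with placeholder values; note that, unlike `get_endpoint` and `get_filescheme`, an unknown type now raises `KeyError` instead of returning `None`, despite the `Optional[Source]` return hint:

```python
from harvester.source import get_source

source = get_source({
    "type": "S3",                     # popped; remaining keys become kwargs
    "bucket": "example-bucket",
    "endpoint_url": "https://s3.example.com",
    "public": True,
})
keys = source.listdir("some/prefix/")  # object keys under the prefix
```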
diff --git a/harvester/source/s3.py b/harvester/source/s3.py
index 19cdc36bf7a377fe411158e9769478154d822555..bb3ecef53447e5f8eab6e89d722e65a1c66244ad 100644
--- a/harvester/source/s3.py
+++ b/harvester/source/s3.py
@@ -1,11 +1,8 @@
-import re
 import logging
-from dateutil.parser import isoparse
-from typing import TYPE_CHECKING, Iterator, Tuple
+
+from typing import TYPE_CHECKING, IO, AnyStr, List, Tuple
 from functools import cached_property
 from urllib.parse import urlparse
-import json
-from os.path import dirname, join
 
 if TYPE_CHECKING:
     from mypy_boto3_s3.client import S3Client
@@ -16,28 +13,27 @@ import botocore.session
 import botocore.handlers
 from botocore import UNSIGNED
 from botocore.config import Config
-import pystac
 
-from ._source import Source
+from ..abc import Source, Stat
 
 logger = logging.getLogger(__name__)
 
 
-class S3Base:
+class S3Source(Source):
     def __init__(
         self,
+        bucket: str,
         secret_access_key: str = None,
        access_key_id: str = None,
         endpoint_url: str = "",
-        strip_bucket: bool = True,
         validate_bucket_name: bool = True,
         region_name: str = None,
         public: bool = False,
     ):
+        self.bucket = bucket
         self.secret_access_key = secret_access_key
         self.access_key_id = access_key_id
         self.endpoint_url = endpoint_url
-        self.strip_bucket = strip_bucket
         self.region_name = region_name
         self.public = public
         self.validate_bucket_name = validate_bucket_name
@@ -62,116 +58,33 @@
         )
         return client
 
+    def get_bucket_and_key(self, path: str) -> Tuple[str, str]:
+        parsed = urlparse(path)
+        if parsed.scheme and parsed.scheme.lower() != "s3":
+            raise ValueError(f"Invalid S3 URL {path}")
 
-class S3Source(Source):
-    type = "S3"
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-    @property
-    def client(self) -> "S3Client":
-        botocore_session = botocore.session.Session()
-        session = boto3.session.Session(botocore_session=botocore_session)
-
-        client = session.client(
-            "s3",
-            aws_access_key_id=self.parameters["access_key_id"],
-            aws_secret_access_key=self.parameters["secret_access_key"],
-            region_name=self.parameters["region"],
-        )
-        return client
-
-    @property
-    def bucket(self) -> str:
-        bucket = self.parameters["url"].strip("https://").split(".")[0]
-        return bucket
+        if parsed.netloc:
+            path = parsed.path
+            if path.startswith("/"):
+                path = path[1:]
+            return (parsed.netloc, path)
 
-    def harvest(self) -> Iterator[dict]:
-        logger.info("Starting S3 harvesting")
+        if path.startswith("/"):
+            path = path[1:]
+        return (self.bucket, path)
 
+    def listdir(self, path: str) -> List[str]:
         paginator = self.client.get_paginator("list_objects_v2")
-        pages = paginator.paginate(Bucket=self.bucket, Prefix=self.parameters["prefix"])
-
-        time_regex: str = self.parameters["time_regex"]
-        for page in pages:
-            for file in page["Contents"]:
-                if match := re.search(time_regex, file["Key"]):
-                    dt = isoparse(match[0])
-                    item = self._create_item(file, dt, self.parameters["url"])
-                    yield item.to_dict()
-
-    def _create_item(self, data, dt, url):
-        identifier = dt.strftime("%Y%m%d_%H%M%S")
-        properties = {
-            "datetime": dt,
-            "updated": data["LastModified"],
-        }
-        item = pystac.Item(
-            id=identifier, geometry=None, bbox=None, datetime=dt, properties=properties
-        )
-        item.add_asset(identifier, pystac.Asset(f"{url}{data['Key']}"))
-        return item
+        bucket, prefix = self.get_bucket_and_key(path)
+        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
+        return [file["Key"] for page in pages for file in page["Contents"]]
 
-
-class S3CatalogSource(S3Base):
-    type = "S3Catalog"
-
-    def __init__(self, parameters: dict, **kwargs):
-        self.root_href = parameters.pop("root_href")
-        self.default_catalog_name = parameters.pop("default_catalog_name", None)
-        super().__init__(**parameters)
-
-    def harvest(self) -> Iterator[dict]:
-        logger.info("Starting S3 Catalog harvesting")
-        parsed = urlparse(self.root_href)
-        path = parsed.path
-        if path.startswith("/"):
-            path = parsed.path[1:]
-        if path.endswith("/") and self.default_catalog_name:
-            path = join(path, self.default_catalog_name)
-        yield from self.harvest_catalog(parsed.netloc, path)
-
-    def fetch_json(self, bucket: str, key: str) -> dict:
-        """
-        Loads the given object identifier by bucket and key and loads it as
-        JSON.
-        """
-        if key.startswith("/"):
-            key = key[1:]
-        response = self.client.get_object(Bucket=bucket, Key=key)
-        return json.load(response["Body"])
-
-    def join_href(self, bucket: str, key: str, href: str) -> Tuple[str, str]:
-        """
-        Joins the given href with a previous bucket/key. When we have a fully
-        qualified S3 URL, the included bucket/key pair is returned.
-        If href is a relative path, it is joined with the previous key.
-        """
-        parsed = urlparse(href)
-        if parsed.netloc:
-            if parsed.scheme.lower() != "s3":
-                # TODO: what if HTTP hrefs?
-                raise ValueError("Can only join S3 URLs")
-            return (parsed.netloc, parsed.path)
-        else:
-            return (
-                bucket,
-                join(dirname(key), parsed.path),
-            )
+    def open(self, path: str) -> IO[AnyStr]:
+        bucket, key = self.get_bucket_and_key(path)
+        return self.client.get_object(Bucket=bucket, Key=key)["Body"]
 
-    def harvest_catalog(self, bucket: str, key: str) -> Iterator[dict]:
-        """
-        Harvests a specified STAC catalog. Will recurse into child catalogs
-        and yield all included STAC items.
-        """
-        logger.info(f"Harvesting from catalog {bucket}/{key}")
-        catalog = self.fetch_json(bucket, key)
-        for link in catalog["links"]:
-            if link["rel"] == "item":
-                item_bucket, item_key = self.join_href(bucket, key, link["href"])
-                logger.info(f"Harvested item {item_bucket}/{item_key}")
-                yield self.fetch_json(item_bucket, item_key)
-            elif link["rel"] == "child":
-                cat_bucket, cat_key = self.join_href(bucket, key, link["href"])
-                yield from self.harvest_catalog(cat_bucket, cat_key)
+    def stat(self, path: str) -> Stat:
+        bucket, key = self.get_bucket_and_key(path)
+        response = self.client.head_object(Bucket=bucket, Key=key)
+        return Stat(mtime=response["LastModified"], size=response["ContentLength"])
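[Reviewer note — harvester/source/s3.py] The path-resolution rules of the rewritten `S3Source`, read directly from `get_bucket_and_key` (values invented): fully qualified `s3://` URLs override the configured bucket, anything else resolves against it, and non-S3 schemes are rejected.

```python
from harvester.source import S3Source

# The boto3 client is a lazy cached_property, so this needs no credentials.
src = S3Source(bucket="default-bucket", public=True)

src.get_bucket_and_key("s3://other-bucket/catalog.json")
# -> ("other-bucket", "catalog.json")

src.get_bucket_and_key("/prefix/item.json")
# -> ("default-bucket", "prefix/item.json")

src.get_bucket_and_key("prefix/item.json")
# -> ("default-bucket", "prefix/item.json")

# Any non-s3 scheme is rejected:
# src.get_bucket_and_key("https://example.com/x.json")  -> ValueError
```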
diff --git a/harvester/source/stac_catalog.py b/harvester/source/stac_catalog.py
deleted file mode 100644
index 8701f0edf3007a0f74158b80d5946e39c3533e86..0000000000000000000000000000000000000000
--- a/harvester/source/stac_catalog.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from typing import Iterator
-from ._source import Source
-
-
-class STACCatalogSource(Source):
-    type = "STACCatalog"
-
-    def harvest(self) -> Iterator[dict]:
-        raise NotImplementedError()
diff --git a/harvester/source/swift.py b/harvester/source/swift.py
index 4261d205815d7fe4ae48dbec2e30288c7ccbbef9..8461c52403fd24254f8c33c9e89579b1b9318de5 100644
--- a/harvester/source/swift.py
+++ b/harvester/source/swift.py
@@ -1,5 +1,5 @@
 from typing import Iterator
-from ._source import Source
+from ..abc import Source
 
 
 class SwiftSource(Source):
diff --git a/harvester/utils.py b/harvester/utils.py
index 194364583fbff373953b3ebbf0965c7fac43c4c6..75bab66f4aa11ca37da053cf7e06e0459484e8f1 100644
--- a/harvester/utils.py
+++ b/harvester/utils.py
@@ -1,4 +1,5 @@
-from typing import Iterator
+import importlib
+from typing import Any, Iterator
 
 from pygeofilter.backends.native.evaluate import NativeEvaluator
 from pygeofilter.parsers.cql_json import parse as json_parse
@@ -13,3 +14,14 @@ def cql_filter(_filter: dict, data: Iterator[dict]) -> Iterator[dict]:
     nat_eval = NativeEvaluator(attribute_map=attr_map, use_getattr=False)
     evaluator = nat_eval.evaluate(_filter)
     yield from filter(evaluator, data)
+
+
+def import_by_path(path: str) -> Any:
+    """Imports the object from the referenced module.
+
+    Args:
+        path (str): the dotted Python path, where the last element is the
+            object in the referenced module.
+    """
+    module_path, _, object_name = path.rpartition(".")
+    return getattr(importlib.import_module(module_path), object_name)
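[Reviewer note — harvester/utils.py] `import_by_path` resolution in a nutshell; it works for any importable dotted path, e.g. from the standard library:

```python
from harvester.utils import import_by_path

dumps = import_by_path("json.dumps")   # imports "json", grabs "dumps"
assert dumps({"a": 1}) == '{"a": 1}'

# Edge case worth a guard: a bare name like "dumps" leaves module_path
# empty, and importlib.import_module("") raises ValueError rather than
# a clean, descriptive error.
```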
+ """ + module_path, _, object_name = path.rpartition(".") + return getattr(importlib.import_module(module_path), object_name) diff --git a/tests/data/config.yml b/tests/data/config.yml index 5c33bdc8d25b32130d0945553109d9836e8b1d0f..22405d0d9a90c67df72fffa3761bff2b60bf50df 100644 --- a/tests/data/config.yml +++ b/tests/data/config.yml @@ -15,16 +15,19 @@ harvesters: extract_property: null - name: Fusion-data resource: - type: S3 - parameters: - url: https://eox-fusion.s3.amazonaws.com/ + type: FileMatcher + path: vrt/SR/ + # file_regex: \d{4}-\d{2}-\d{2}.vrt + time_regex: \d{4}-\d{2}-\d{2} + # time_format: "%Y-%m-%d" + url: https://eox-fusion.s3.amazonaws.com/ + source: + type: S3 + bucket: eox-fusion + endpoint_url: https://eox-fusion.s3.amazonaws.com/ access_key_id: AKIAXVU5ZHGDAZ3XXOM2 secret_access_key: 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 - region: eu-central-1 - prefix: vrt/SR/ - file_regex: \d{4}-\d{2}-\d{2}.vrt - time_regex: \d{4}-\d{2}-\d{2} - time_format: "%Y-%m-%d" + region_name: eu-central-1 filter: null mode: item queue: register @@ -54,7 +57,6 @@ harvesters: property_mapping: start_datetime: startDate end_datetime: completionDate - provider: testmodule.provider.SampleProvider - filter: {} + postprocessor: testmodule.provider.SampleProvider queue: register mode: item