From 6dcd00328252028dd879dea2eddb40d15cda388b Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 7 Dec 2021 10:59:36 +0100 Subject: [PATCH 1/9] Reorganizing resources. Sources are now only FS abstractions New FileSchemes operate on Sources --- harvester/abc.py | 41 +++++++++ harvester/endpoint/_endpoint.py | 8 -- harvester/endpoint/oads.py | 2 +- harvester/endpoint/ogcapi.py | 2 +- harvester/endpoint/opensearch.py | 2 +- harvester/endpoint/stacapi.py | 2 +- harvester/filescheme/__init__.py | 0 harvester/filescheme/filematcher.py | 39 ++++++++ harvester/filescheme/stac_catalog.py | 47 ++++++++++ harvester/harvester.py | 3 +- harvester/resource.py | 19 ---- harvester/source/__init__.py | 7 +- harvester/source/_source.py | 9 -- harvester/source/s3.py | 132 +++++---------------------- harvester/source/stac_catalog.py | 9 -- 15 files changed, 159 insertions(+), 163 deletions(-) create mode 100644 harvester/abc.py delete mode 100644 harvester/endpoint/_endpoint.py create mode 100644 harvester/filescheme/__init__.py create mode 100644 harvester/filescheme/filematcher.py create mode 100644 harvester/filescheme/stac_catalog.py delete mode 100644 harvester/resource.py delete mode 100644 harvester/source/_source.py delete mode 100644 harvester/source/stac_catalog.py diff --git a/harvester/abc.py b/harvester/abc.py new file mode 100644 index 0000000..b5612f5 --- /dev/null +++ b/harvester/abc.py @@ -0,0 +1,41 @@ +from abc import abstractmethod, ABC +from dataclasses import dataclass +from datetime import datetime +from typing import IO, AnyStr, Iterator, List + + +class Resource(ABC): + """ + Represents online resource such as an endpoint (API...) or data source (S3/swift...) + that provides data or metadata. + """ + + @abstractmethod + def harvest(self) -> Iterator[dict]: + ... + + +@dataclass +class Stat: + mtime: datetime + size: int + + +class Source(ABC): + @abstractmethod + def open(self, path: str) -> IO[AnyStr]: + ... + + @abstractmethod + def listdir(self, path) -> List[str]: + ... 
+ + +class Endpoint(Resource): + def __init__(self, url: str): + self.url = url + + +class FileScheme(Resource): + def __init__(self, source: Source): + self.source = source diff --git a/harvester/endpoint/_endpoint.py b/harvester/endpoint/_endpoint.py deleted file mode 100644 index 51c1a76..0000000 --- a/harvester/endpoint/_endpoint.py +++ /dev/null @@ -1,8 +0,0 @@ -from ..resource import Resource - - -class Endpoint(Resource): - type: str = "Endpoint" - - def __init__(self, url: str, *args, **kwargs): - self.url = url diff --git a/harvester/endpoint/oads.py b/harvester/endpoint/oads.py index f48633c..18005a1 100644 --- a/harvester/endpoint/oads.py +++ b/harvester/endpoint/oads.py @@ -1,5 +1,5 @@ from typing import Iterator -from ._endpoint import Endpoint +from ..abc import Endpoint class OADSEndpoint(Endpoint): diff --git a/harvester/endpoint/ogcapi.py b/harvester/endpoint/ogcapi.py index 7030d02..f3ac8ac 100644 --- a/harvester/endpoint/ogcapi.py +++ b/harvester/endpoint/ogcapi.py @@ -1,5 +1,5 @@ from typing import Iterator -from ._endpoint import Endpoint +from ..abc import Endpoint class OGCAPIEndpoint(Endpoint): diff --git a/harvester/endpoint/opensearch.py b/harvester/endpoint/opensearch.py index 48744a5..bc0f758 100644 --- a/harvester/endpoint/opensearch.py +++ b/harvester/endpoint/opensearch.py @@ -7,7 +7,7 @@ import requests import lxml.etree as ET import pystac -from ._endpoint import Endpoint +from ..abc import Endpoint from .query import Query from ..exceptions import QueryError diff --git a/harvester/endpoint/stacapi.py b/harvester/endpoint/stacapi.py index 8032b59..c0af168 100644 --- a/harvester/endpoint/stacapi.py +++ b/harvester/endpoint/stacapi.py @@ -3,7 +3,7 @@ from typing import Iterator import requests -from ._endpoint import Endpoint +from ..abc import Endpoint from .query import Query logger = logging.getLogger(__name__) diff --git a/harvester/filescheme/__init__.py b/harvester/filescheme/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/harvester/filescheme/filematcher.py b/harvester/filescheme/filematcher.py new file mode 100644 index 0000000..450ea27 --- /dev/null +++ b/harvester/filescheme/filematcher.py @@ -0,0 +1,39 @@ +import logging +import re +from typing import Iterator + +from dateutil.parser import isoparse +import pystac + +from ..abc import FileScheme, Source + + +logger = logging.getLogger(__name__) + + +class FileMatcherScheme(FileScheme): + def __init__(self, source: Source, path: str, time_regex: str, url: str): + super().__init__(source) + self.path = path + self.time_regex = re.compile(time_regex) + self.url = url + + def harvest(self) -> Iterator[dict]: + logger.info("Starting S3 harvesting") + for filename in self.source.listdir(self.path): + if match := re.search(self.time_regex, filename): + dt = isoparse(match[0]) + item = self._create_item(filename, dt) + yield item.to_dict() + + def _create_item(self, path, dt): + identifier = dt.strftime("%Y%m%d_%H%M%S") + properties = { + "datetime": dt, + "updated": self.source.stat(path).mtime, + } + item = pystac.Item( + id=identifier, geometry=None, bbox=None, datetime=dt, properties=properties + ) + item.add_asset(identifier, pystac.Asset(f"{self.url}{path}")) + return item diff --git a/harvester/filescheme/stac_catalog.py b/harvester/filescheme/stac_catalog.py new file mode 100644 index 0000000..ac2498b --- /dev/null +++ b/harvester/filescheme/stac_catalog.py @@ -0,0 +1,47 @@ +import logging +import json +from os.path import join +from typing import Iterator +from urllib.parse 
import urlparse, urljoin + +from ..abc import FileScheme, Source + + +logger = logging.getLogger(__name__) + + +class STACCatalogScheme(FileScheme): + def __init__(self, source: Source, root_path: str): + super().__init__(source) + self.root_path = root_path + + def _read_json(self, path): + return json.load(self.source.open(path)) + + def _join_href(self, path: str, href: str) -> str: + """ + Joins the given href with a previous bucket/key. When we have a fully + qualified S3 URL, the included bucket/key pair is returned. + If href is a relative path, it is joined with the previous key. + """ + parsed = urlparse(href) + return urljoin(parsed._replace(path=join(path, parsed.path))) + + def harvest(self) -> Iterator[dict]: + yield from self.harvest_catalog(self.root_path) + + def harvest_catalog(self, path: str) -> Iterator[dict]: + """ + Harvests a specified STAC catalog. Will recurse into child catalogs + and yield all included STAC items. + """ + logger.info(f"Harvesting from catalog {path}") + catalog = self._read_json(path) + for link in catalog["links"]: + if link["rel"] == "item": + item_href = self._join_href(path, link["href"]) + logger.info(f"Harvested item {item_href}") + yield self._read_json(item_href) + elif link["rel"] == "child": + catalog_href = self._join_href(path, link["href"]) + yield from self.harvest_catalog(catalog_href) diff --git a/harvester/harvester.py b/harvester/harvester.py index 83ce9c1..e2d1da5 100644 --- a/harvester/harvester.py +++ b/harvester/harvester.py @@ -4,8 +4,7 @@ from typing import Iterator, Optional from redis import Redis -from harvester.resource import Resource - +from .abc import Resource from .endpoint import get_endpoint from .source import get_source from .exceptions import HarvestError diff --git a/harvester/resource.py b/harvester/resource.py deleted file mode 100644 index f022303..0000000 --- a/harvester/resource.py +++ /dev/null @@ -1,19 +0,0 @@ -from abc import abstractmethod, ABC -from typing import Iterator - - -class Resource(ABC): - """ - Represents online resource such as an endpoint (API...) or data source (S3/swift...) - that provides data or metadata. - """ - - # All resources should implement the interface to do following - # 1. prepare harvest - # 2. harvest resource - # 3. convert to stac items - # 4. 
return list of stac items as dictionaries - - @abstractmethod - def harvest(self) -> Iterator[dict]: - pass diff --git a/harvester/source/__init__.py b/harvester/source/__init__.py index 3791e23..0767a0d 100644 --- a/harvester/source/__init__.py +++ b/harvester/source/__init__.py @@ -1,16 +1,13 @@ from typing import Optional -from .stac_catalog import STACCatalogSource +from ..abc import Source from .swift import SwiftSource -from .s3 import S3Source, S3CatalogSource +from .s3 import S3Source from .ftp import FTPSource -from ._source import Source SOURCE_MAP = { "FTP": FTPSource, "S3": S3Source, - "S3Catalog": S3CatalogSource, - "STACCatalog": STACCatalogSource, "Swift": SwiftSource, } diff --git a/harvester/source/_source.py b/harvester/source/_source.py deleted file mode 100644 index 6eb1953..0000000 --- a/harvester/source/_source.py +++ /dev/null @@ -1,9 +0,0 @@ -from typing import Dict -from ..resource import Resource - - -class Source(Resource): - type: str = "Source" - - def __init__(self, parameters: Dict[str, str], *args, **kwargs): - self.parameters = parameters diff --git a/harvester/source/s3.py b/harvester/source/s3.py index 943ddfd..b636151 100644 --- a/harvester/source/s3.py +++ b/harvester/source/s3.py @@ -1,11 +1,8 @@ -import re import logging -from dateutil.parser import isoparse -from typing import TYPE_CHECKING, Iterator, Tuple + +from typing import TYPE_CHECKING, IO, AnyStr, List from functools import cached_property from urllib.parse import urlparse -import json -from os.path import dirname, join if TYPE_CHECKING: from mypy_boto3_s3.client import S3Client @@ -16,16 +13,16 @@ import botocore.session import botocore.handlers from botocore import UNSIGNED from botocore.config import Config -import pystac -from ._source import Source +from ..abc import Source, Stat logger = logging.getLogger(__name__) -class S3Base: +class S3Source(Source): def __init__( self, + bucket: str, secret_access_key: str = None, access_key_id: str = None, endpoint_url: str = "", @@ -34,6 +31,7 @@ class S3Base: region_name: str = None, public: bool = False, ): + self.bucket = bucket self.secret_access_key = secret_access_key self.access_key_id = access_key_id self.endpoint_url = endpoint_url @@ -62,108 +60,28 @@ class S3Base: ) return client + def get_bucket_and_key(self, path): + parsed = urlparse(path) + if parsed.scheme and parsed.scheme.lower() != "s3": + raise ValueError(f"Invalid S3 URL {path}") -class S3Source(Source): - type = "S3" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - @property - def client(self) -> "S3Client": - botocore_session = botocore.session.Session() - session = boto3.session.Session(botocore_session=botocore_session) + if parsed.netloc: + return (parsed.netloc, parsed.path) - client = session.client( - "s3", - aws_access_key_id=self.parameters["access_key_id"], - aws_secret_access_key=self.parameters["secret_access_key"], - region_name=self.parameters["region"], - ) - return client + return (self.bucket, path) - @property - def bucket(self) -> str: - bucket = self.parameters["url"].strip("https://").split(".")[0] - return bucket + def listdir(self, path) -> List[str]: + paginator = self.client.get_paginator("list_objects_v2") + bucket, prefix = self.get_bucket_and_key(path) + pages = paginator.paginate(Bucket=bucket, Prefix=prefix) - def harvest(self) -> Iterator[dict]: - logger.info("Starting S3 harvesting") + return [file["Key"] for page in pages for file in page["Contents"]] - paginator = self.client.get_paginator("list_objects_v2") - 
pages = paginator.paginate(Bucket=self.bucket, Prefix=self.parameters["prefix"]) - - time_regex: str = self.parameters["time_regex"] - for page in pages: - for file in page["Contents"]: - if match := re.search(time_regex, file["Key"]): - dt = isoparse(match[0]) - item = self._create_item(file, dt, self.parameters["url"]) - yield item.to_dict() - - def _create_item(self, data, dt, url): - identifier = dt.strftime("%Y%m%d_%H%M%S") - properties = { - "datetime": dt, - "updated": data["LastModified"], - } - item = pystac.Item( - id=identifier, geometry=None, bbox=None, datetime=dt, properties=properties - ) - item.add_asset(identifier, pystac.Asset(f"{url}{data['Key']}")) - return item - - -class S3CatalogSource(S3Base, Source): - type = "S3Catalog" - - def __init__(self, root_href: str, **kwargs): - super().__init__(**kwargs) - self.root_href = root_href - - def harvest(self) -> Iterator[dict]: - logger.info("Starting S3 Catalog harvesting") - parsed = urlparse(self.root_href) - yield from self.harvest_catalog(parsed.netloc, parsed.path) - - def fetch_json(self, bucket: str, key: str) -> dict: - """ - Loads the given object identifier by bucket and key and loads it as - JSON. - """ - response = self.client.get_object(Bucket=bucket, Key=key) - return json.load(response["Body"]) - - def join_href(self, bucket: str, key: str, href: str) -> Tuple[str, str]: - """ - Joins the given href with a previous bucket/key. When we have a fully - qualified S3 URL, the included bucket/key pair is returned. - If href is a relative path, it is joined with the previous key. - """ - parsed = urlparse(href) - if parsed.netloc: - if parsed.scheme.lower() != "s3": - # TODO: what if HTTP hrefs? - raise ValueError("Can only join S3 URLs") - return (parsed.netloc, parsed.path) - else: - return ( - bucket, - join(dirname(key), parsed.path), - ) + def open(self, path: str) -> IO[AnyStr]: + bucket, key = self.get_bucket_and_key(path) + return self.client.get_object(Bucket=bucket, Key=key)["Body"] - def harvest_catalog(self, bucket: str, key: str) -> Iterator[dict]: - """ - Harvests a specified STAC catalog. Will recurse into child catalogs - and yield all included STAC items. 
- """ - logger.info(f"Harvesting from catalog {bucket}/{key}") - catalog = self.fetch_json(bucket, key) - for link in catalog["links"]: - if link["rel"] == "item": - item_bucket, item_key = self.join_href(bucket, key, link["href"]) - logger.info(f"Harvested item {item_bucket}/{item_key}") - yield self.fetch_json(item_bucket, item_key) - elif link["rel"] == "child": - cat_bucket, cat_key = self.join_href(bucket, key, link["href"]) - yield from self.harvest_catalog(cat_bucket, cat_key) + def stat(self, path: str) -> Stat: + bucket, key = self.get_bucket_and_key(path) + response = self.client.head_object(Bucket=bucket, Key=key) + return Stat(mtime=response["LastModified"], size=response["ContentLength"]) diff --git a/harvester/source/stac_catalog.py b/harvester/source/stac_catalog.py deleted file mode 100644 index 8701f0e..0000000 --- a/harvester/source/stac_catalog.py +++ /dev/null @@ -1,9 +0,0 @@ -from typing import Iterator -from ._source import Source - - -class STACCatalogSource(Source): - type = "STACCatalog" - - def harvest(self) -> Iterator[dict]: - raise NotImplementedError() -- GitLab From 103012c9424aa150209e2a40181dc160201e1260 Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 7 Dec 2021 12:06:57 +0100 Subject: [PATCH 2/9] Fixing ABCs for sources Adding filescheme setup Adding config example --- config-sample.yaml | 14 ++++++++++++++ harvester/filescheme/__init__.py | 23 +++++++++++++++++++++++ harvester/harvester.py | 4 ++-- harvester/source/__init__.py | 11 ++--------- harvester/source/ftp.py | 2 +- harvester/source/s3.py | 2 -- harvester/source/swift.py | 2 +- 7 files changed, 43 insertions(+), 15 deletions(-) diff --git a/config-sample.yaml b/config-sample.yaml index b3261ab..b54ef38 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -17,3 +17,17 @@ harvesters: - - P5D - !now queue: register # register, ingest, delete, update, preprocess + + - name: MyS3STACCatalogHarvester + type: STACCatalog + source: + type: S3 + bucket: mybucket + secret_access_key: xxx + access_key_id: xxx + endpoint_url: myendpoint.storage.com + validate_bucket_name: False + region_name: RegionA + public: False + # path is not explicitly specified, but must be passed as argument + # path: diff --git a/harvester/filescheme/__init__.py b/harvester/filescheme/__init__.py index e69de29..183f440 100644 --- a/harvester/filescheme/__init__.py +++ b/harvester/filescheme/__init__.py @@ -0,0 +1,23 @@ +from typing import Optional + +from ..abc import FileScheme +from ..source import get_source +from .filematcher import FileMatcherScheme +from .stac_catalog import STACCatalogScheme + + +SCHEME_MAP = { + "FileMatcher": FileMatcherScheme, + "STACCatalog": STACCatalogScheme +} + + +def get_filescheme(filescheme_cfg: dict) -> Optional[FileScheme]: + cls = SCHEME_MAP.get(filescheme_cfg["type"]) + if not cls: + return None + + parameters = filescheme_cfg["parameters"] + filescheme = cls(get_source(filescheme_cfg["source"]), **parameters) + + return filescheme diff --git a/harvester/harvester.py b/harvester/harvester.py index e2d1da5..6ab1247 100644 --- a/harvester/harvester.py +++ b/harvester/harvester.py @@ -6,7 +6,7 @@ from redis import Redis from .abc import Resource from .endpoint import get_endpoint -from .source import get_source +from .filescheme import get_filescheme from .exceptions import HarvestError from .utils import cql_filter @@ -30,7 +30,7 @@ def init_resource(harvest_config: dict) -> Resource: if endpoint := get_endpoint(config): return endpoint - if source := get_source(config): + if 
source := get_filescheme(config): return source raise HarvestError(f"Resource type {config['type']} not found") diff --git a/harvester/source/__init__.py b/harvester/source/__init__.py index 0767a0d..f6a0373 100644 --- a/harvester/source/__init__.py +++ b/harvester/source/__init__.py @@ -13,23 +13,16 @@ SOURCE_MAP = { def get_source(source_cfg: dict) -> Optional[Source]: - - cls = SOURCE_MAP.get(source_cfg["type"]) + cls = SOURCE_MAP.get(source_cfg.pop("type")) if not cls: return None - parameters = source_cfg["parameters"] - - source = cls(parameters) - + source = cls(**source_cfg) return source __all__ = [ "FTPSource", "S3Source", - "S3CatalogSource", - "STACCatalogSource", "SwiftSource", - "Source", "get_source", ] diff --git a/harvester/source/ftp.py b/harvester/source/ftp.py index fafed61..c6749a5 100644 --- a/harvester/source/ftp.py +++ b/harvester/source/ftp.py @@ -1,5 +1,5 @@ from typing import Iterator -from ._source import Source +from ..abc import Source class FTPSource(Source): diff --git a/harvester/source/s3.py b/harvester/source/s3.py index b636151..e9df763 100644 --- a/harvester/source/s3.py +++ b/harvester/source/s3.py @@ -26,7 +26,6 @@ class S3Source(Source): secret_access_key: str = None, access_key_id: str = None, endpoint_url: str = "", - strip_bucket: bool = True, validate_bucket_name: bool = True, region_name: str = None, public: bool = False, @@ -35,7 +34,6 @@ class S3Source(Source): self.secret_access_key = secret_access_key self.access_key_id = access_key_id self.endpoint_url = endpoint_url - self.strip_bucket = strip_bucket self.region_name = region_name self.public = public self.validate_bucket_name = validate_bucket_name diff --git a/harvester/source/swift.py b/harvester/source/swift.py index 4261d20..8461c52 100644 --- a/harvester/source/swift.py +++ b/harvester/source/swift.py @@ -1,5 +1,5 @@ from typing import Iterator -from ._source import Source +from ..abc import Source class SwiftSource(Source): -- GitLab From 6f2e908d62b36d43dcdd78bd23a8cf2fac211ba3 Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 7 Dec 2021 12:10:41 +0100 Subject: [PATCH 3/9] Using urljoin to simplify --- harvester/filescheme/stac_catalog.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/harvester/filescheme/stac_catalog.py b/harvester/filescheme/stac_catalog.py index ac2498b..ac8f8d7 100644 --- a/harvester/filescheme/stac_catalog.py +++ b/harvester/filescheme/stac_catalog.py @@ -1,8 +1,7 @@ import logging import json -from os.path import join from typing import Iterator -from urllib.parse import urlparse, urljoin +from urllib.parse import urljoin from ..abc import FileScheme, Source @@ -18,15 +17,6 @@ class STACCatalogScheme(FileScheme): def _read_json(self, path): return json.load(self.source.open(path)) - def _join_href(self, path: str, href: str) -> str: - """ - Joins the given href with a previous bucket/key. When we have a fully - qualified S3 URL, the included bucket/key pair is returned. - If href is a relative path, it is joined with the previous key. 
- """ - parsed = urlparse(href) - return urljoin(parsed._replace(path=join(path, parsed.path))) - def harvest(self) -> Iterator[dict]: yield from self.harvest_catalog(self.root_path) @@ -39,9 +29,9 @@ class STACCatalogScheme(FileScheme): catalog = self._read_json(path) for link in catalog["links"]: if link["rel"] == "item": - item_href = self._join_href(path, link["href"]) + item_href = urljoin(path, link["href"]) logger.info(f"Harvested item {item_href}") yield self._read_json(item_href) elif link["rel"] == "child": - catalog_href = self._join_href(path, link["href"]) + catalog_href = urljoin(path, link["href"]) yield from self.harvest_catalog(catalog_href) -- GitLab From 51425b69633de74c34381373c908ac69d43edacc Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 7 Dec 2021 12:13:06 +0100 Subject: [PATCH 4/9] Fixing path of ABC --- harvester/endpoint/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/harvester/endpoint/__init__.py b/harvester/endpoint/__init__.py index 142536d..cdc01ff 100644 --- a/harvester/endpoint/__init__.py +++ b/harvester/endpoint/__init__.py @@ -1,10 +1,10 @@ from typing import Optional +from ..abc import Endpoint from .oads import OADSEndpoint from .ogcapi import OGCAPIEndpoint from .opensearch import OpenSearchEndpoint from .stacapi import STACAPIEndpoint -from ._endpoint import Endpoint ENDPOINT_MAP = { "OADS": OADSEndpoint, -- GitLab From dfa3cdd8d8ab1564ccc74d729fe6b4d502b4230b Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 7 Dec 2021 18:35:29 +0100 Subject: [PATCH 5/9] Fixing handling of absolute paths in S3 source --- harvester/source/s3.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/harvester/source/s3.py b/harvester/source/s3.py index e9df763..bb3ecef 100644 --- a/harvester/source/s3.py +++ b/harvester/source/s3.py @@ -1,6 +1,6 @@ import logging -from typing import TYPE_CHECKING, IO, AnyStr, List +from typing import TYPE_CHECKING, IO, AnyStr, List, Tuple from functools import cached_property from urllib.parse import urlparse @@ -58,14 +58,19 @@ class S3Source(Source): ) return client - def get_bucket_and_key(self, path): + def get_bucket_and_key(self, path: str) -> Tuple[str, str]: parsed = urlparse(path) if parsed.scheme and parsed.scheme.lower() != "s3": raise ValueError(f"Invalid S3 URL {path}") if parsed.netloc: - return (parsed.netloc, parsed.path) + path = parsed.path + if path.startswith("/"): + path = path[1:] + return (parsed.netloc, path) + if path.startswith("/"): + path = path[1:] return (self.bucket, path) def listdir(self, path) -> List[str]: -- GitLab From aa7054d465ef87eace9e60a9019fbd0a399aa2ca Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Thu, 9 Dec 2021 15:43:46 +0100 Subject: [PATCH 6/9] Fixing interfaces --- harvester/endpoint/__init__.py | 9 ++------- harvester/filescheme/__init__.py | 5 ++--- harvester/harvester.py | 4 ++-- harvester/source/__init__.py | 7 ++----- 4 files changed, 8 insertions(+), 17 deletions(-) diff --git a/harvester/endpoint/__init__.py b/harvester/endpoint/__init__.py index cdc01ff..3b9a147 100644 --- a/harvester/endpoint/__init__.py +++ b/harvester/endpoint/__init__.py @@ -15,16 +15,11 @@ ENDPOINT_MAP = { def get_endpoint(endpoint_cfg: dict) -> Optional[Endpoint]: - - cls = ENDPOINT_MAP.get(endpoint_cfg["type"]) + cls = ENDPOINT_MAP.get(endpoint_cfg.pop("type")) if not cls: return None - url = endpoint_cfg.pop("url") - query = endpoint_cfg.pop("query") - - endpoint = cls(url=url, query=query, **endpoint_cfg) - return endpoint 
+ return cls(**endpoint_cfg) __all__ = [ diff --git a/harvester/filescheme/__init__.py b/harvester/filescheme/__init__.py index 183f440..b6b152d 100644 --- a/harvester/filescheme/__init__.py +++ b/harvester/filescheme/__init__.py @@ -13,11 +13,10 @@ SCHEME_MAP = { def get_filescheme(filescheme_cfg: dict) -> Optional[FileScheme]: - cls = SCHEME_MAP.get(filescheme_cfg["type"]) + cls = SCHEME_MAP.get(filescheme_cfg.pop("type")) if not cls: return None - parameters = filescheme_cfg["parameters"] - filescheme = cls(get_source(filescheme_cfg["source"]), **parameters) + filescheme = cls(get_source(filescheme_cfg["source"]), **filescheme_cfg) return filescheme diff --git a/harvester/harvester.py b/harvester/harvester.py index c577b1c..815788d 100644 --- a/harvester/harvester.py +++ b/harvester/harvester.py @@ -27,10 +27,10 @@ def stringify( def init_resource(harvest_config: dict) -> Resource: config: dict = harvest_config.pop("resource") - if endpoint := get_endpoint(config): + if endpoint := get_endpoint(dict(config)): return endpoint - if source := get_filescheme(config): + if source := get_filescheme(dict(config)): return source raise HarvestError(f"Resource type {config['type']} not found") diff --git a/harvester/source/__init__.py b/harvester/source/__init__.py index f6a0373..e7a60d9 100644 --- a/harvester/source/__init__.py +++ b/harvester/source/__init__.py @@ -13,11 +13,8 @@ SOURCE_MAP = { def get_source(source_cfg: dict) -> Optional[Source]: - cls = SOURCE_MAP.get(source_cfg.pop("type")) - if not cls: - return None - source = cls(**source_cfg) - return source + cls = SOURCE_MAP[source_cfg.pop("type")] + return cls(**source_cfg) __all__ = [ -- GitLab From 74f9e83c1e1f9bd51437ff0008d6e29c7a1e6c54 Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Thu, 9 Dec 2021 17:15:24 +0100 Subject: [PATCH 7/9] Merge branch main --- harvester/cli.py | 9 +++++- harvester/daemon.py | 10 +++++-- harvester/harvester.py | 65 +++++++++++++++++++++++------------------- requirements.txt | 1 + tests/test_main.py | 29 ++++++++++++++++++- 5 files changed, 80 insertions(+), 34 deletions(-) diff --git a/harvester/cli.py b/harvester/cli.py index bf89dd6..ede3b0e 100644 --- a/harvester/cli.py +++ b/harvester/cli.py @@ -6,6 +6,7 @@ Contains command line interface """ import logging.config +import json import click @@ -65,6 +66,7 @@ def daemon( @cli.command(help="Run a single, one-off harvest") @click.argument("harvester_name", type=str) +@click.option("--values", type=str, default=None) @click.option("--host", type=str, required=True) @click.option("--port", type=int, required=True) @click.option("--config-file", type=click.File("r"), required=True) @@ -72,6 +74,7 @@ def daemon( @click.option("--debug/--no-debug", default=False) def harvest( harvester_name: str, + values: str, host: str, port: int, config_file: str, @@ -84,7 +87,11 @@ def harvest( validate_config(config) client = init_client(host, port) - main(config, harvester_name, client) + item = {"name": harvester_name} + if values: + item["values"] = json.loads(values) + + main(config, item, client) if __name__ == "__main__": diff --git a/harvester/daemon.py b/harvester/daemon.py index 67bb115..69e2f34 100644 --- a/harvester/daemon.py +++ b/harvester/daemon.py @@ -7,6 +7,8 @@ Contains functionality related to running the daemon """ import logging +import json + from redis import Redis from .harvester import main @@ -28,6 +30,10 @@ def run_daemon(config: dict, client: Redis, listen_queue: str): while True: # fetch an item from the queue to be harvested - _, 
value = client.brpop(listen_queue) + _, raw_item = client.brpop(listen_queue) + + # parse the item + item = json.loads(raw_item) + # start the harvesting - main(config, value, client) + main(config, item, client) diff --git a/harvester/harvester.py b/harvester/harvester.py index 815788d..f055dda 100644 --- a/harvester/harvester.py +++ b/harvester/harvester.py @@ -1,8 +1,9 @@ import json import logging -from typing import Iterator, Optional +from typing import Union from redis import Redis +from mergedeep import merge from .abc import Resource from .endpoint import get_endpoint @@ -13,17 +14,6 @@ from .postprocess import get_postprocessor logger = logging.getLogger(__name__) -def stringify( - result: Iterator[dict], - mode: str = "item", - extract_property: Optional[str] = None, -) -> Iterator[str]: - if mode == "item": - yield from (json.dumps(item, default=str) for item in result) - elif mode == "property": - yield from (item["properties"][extract_property] for item in result) - - def init_resource(harvest_config: dict) -> Resource: config: dict = harvest_config.pop("resource") @@ -36,16 +26,37 @@ def init_resource(harvest_config: dict) -> Resource: raise HarvestError(f"Resource type {config['type']} not found") -def main(config: dict, value: str, client: Redis): - logger.info("running harvesting for %s", value) +def get_harvest_config(config: dict, name: str, values: dict) -> dict: + """ + Selects the harvester config for the given name. + Harvesters configuration can either be a list of dicts (with name property) + or a dict mapping name to harvester. + """ + harvesters = config["harvesters"] + if isinstance(harvesters, dict): + return harvesters[name] + else: + for harvest_config in harvesters: + if harvest_config["name"] == name: + return merge({}, harvest_config, values) + else: + raise KeyError(name) + + +def main(config: dict, item: Union[dict, str], client: Redis): + if isinstance(item, str): + name = item + values = {} + else: + name = item["name"] + values = item.get("values", {}) + + logger.info(f"Running harvesting for {name}") + + # get the harvest config + harvest_config = get_harvest_config(config, name, values) # Initialize resource - try: - harvest_config = next( - item for item in config["harvesters"] if item["name"] == value - ) - except StopIteration: - harvest_config = json.loads(value) resource = init_resource(harvest_config) # Perform harvest @@ -59,15 +70,9 @@ def main(config: dict, value: str, client: Redis): ) # Filter data - result = cql_filter(harvest_config["filter"], result) - - # Stringify items - encoded = stringify( - result, - harvest_config.get("mode", "item"), - harvest_config.get("extract_property"), - ) + if "filter" in harvest_config: + result = cql_filter(harvest_config["filter"], result) # Send to queue - for value in encoded: - client.lpush(harvest_config["queue"], value) + for item in result: + client.lpush(harvest_config["queue"], json.dumps(item, default=str)) diff --git a/requirements.txt b/requirements.txt index f1642bb..c25a3c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ click<9 requests<3 lxml<5 python-dateutil<3 +mergedeep diff --git a/tests/test_main.py b/tests/test_main.py index c55b01c..65dde95 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -3,7 +3,7 @@ import requests_mock import pytest_mock import unittest.mock -from harvester.harvester import main +from harvester.harvester import main, get_harvest_config @pytest.mark.parametrize("value", [("S2L2A_Element84")]) @@ -53,3 +53,30 @@ def test_s3( 
main(config, value, client) client.lpush.assert_called() + + +def test_get_harvester_config(): + # testing list style + test_config = { + "harvesters": [ + { + "name": "myname", + "type": "TestType", + "properties": {} + } + ] + } + config = get_harvest_config(test_config, "myname", {}) + assert config == test_config["harvesters"][0] + + # testing list style + test_config = { + "harvesters": { + "myname": { + "type": "TestType", + "properties": {} + } + } + } + config = get_harvest_config(test_config, "myname", {}) + assert config == test_config["harvesters"]["myname"] -- GitLab From 0e73be9b85402538414cf5a4c470f6d04d02c6af Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Thu, 9 Dec 2021 17:37:15 +0100 Subject: [PATCH 8/9] Fixing test config Fixing filescheme --- harvester/filescheme/__init__.py | 4 +--- tests/data/config.yml | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/harvester/filescheme/__init__.py b/harvester/filescheme/__init__.py index b6b152d..5fd513b 100644 --- a/harvester/filescheme/__init__.py +++ b/harvester/filescheme/__init__.py @@ -17,6 +17,4 @@ def get_filescheme(filescheme_cfg: dict) -> Optional[FileScheme]: if not cls: return None - filescheme = cls(get_source(filescheme_cfg["source"]), **filescheme_cfg) - - return filescheme + return cls(get_source(filescheme_cfg.pop("source")), **filescheme_cfg) diff --git a/tests/data/config.yml b/tests/data/config.yml index 5c33bdc..22405d0 100644 --- a/tests/data/config.yml +++ b/tests/data/config.yml @@ -15,16 +15,19 @@ harvesters: extract_property: null - name: Fusion-data resource: - type: S3 - parameters: - url: https://eox-fusion.s3.amazonaws.com/ + type: FileMatcher + path: vrt/SR/ + # file_regex: \d{4}-\d{2}-\d{2}.vrt + time_regex: \d{4}-\d{2}-\d{2} + # time_format: "%Y-%m-%d" + url: https://eox-fusion.s3.amazonaws.com/ + source: + type: S3 + bucket: eox-fusion + endpoint_url: https://eox-fusion.s3.amazonaws.com/ access_key_id: AKIAXVU5ZHGDAZ3XXOM2 secret_access_key: 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 - region: eu-central-1 - prefix: vrt/SR/ - file_regex: \d{4}-\d{2}-\d{2}.vrt - time_regex: \d{4}-\d{2}-\d{2} - time_format: "%Y-%m-%d" + region_name: eu-central-1 filter: null mode: item queue: register @@ -54,7 +57,6 @@ harvesters: property_mapping: start_datetime: startDate end_datetime: completionDate - provider: testmodule.provider.SampleProvider - filter: {} + postprocessor: testmodule.provider.SampleProvider queue: register mode: item -- GitLab From 1be1be8a4d64e119311759fa6678e05156ad3d1c Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Fri, 10 Dec 2021 09:49:06 +0100 Subject: [PATCH 9/9] Allowing to declare postprocessor by python path --- harvester/postprocess.py | 8 +++++++- harvester/utils.py | 14 +++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/harvester/postprocess.py b/harvester/postprocess.py index b8590b4..4eae1a5 100644 --- a/harvester/postprocess.py +++ b/harvester/postprocess.py @@ -1,6 +1,8 @@ from abc import ABC, abstractmethod from typing import Dict, Type +from .utils import import_by_path + class Postprocessor(ABC): def __init__(self, **kwargs): @@ -17,5 +19,9 @@ POSTPROCESSORS: Dict[str, Type[Postprocessor]] = { def get_postprocessor(config: dict) -> Postprocessor: - cls = POSTPROCESSORS[config.pop("type")] + type_ = config.pop("type") + try: + cls = POSTPROCESSORS[type_] + except KeyError: + cls = import_by_path(type_) return cls(**config) diff --git a/harvester/utils.py b/harvester/utils.py index 1943645..75bab66 
100644 --- a/harvester/utils.py +++ b/harvester/utils.py @@ -1,4 +1,5 @@ -from typing import Iterator +import importlib +from typing import Any, Iterator from pygeofilter.backends.native.evaluate import NativeEvaluator from pygeofilter.parsers.cql_json import parse as json_parse @@ -13,3 +14,14 @@ def cql_filter(_filter: dict, data: Iterator[dict]) -> Iterator[dict]: nat_eval = NativeEvaluator(attribute_map=attr_map, use_getattr=False) evaluator = nat_eval.evaluate(_filter) yield from filter(evaluator, data) + + +def import_by_path(path: str) -> Any: + """Imports the object from the referenced module. + + Args: + path (str): the dotted Python path, where the last element is the + object in the referenced module. + """ + module_path, _, object_name = path.rpartition(".") + return getattr(importlib.import_module(module_path), object_name) -- GitLab
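
Note (illustrative addendum, not part of the patch series): patch 9 allows a postprocessor to be declared by a dotted Python path (see the "postprocessor: testmodule.provider.SampleProvider" entry added to tests/data/config.yml in patch 8) and resolves it with the new import_by_path helper. Below is a minimal, self-contained sketch of that lookup. It mirrors the helper added in harvester/utils.py but resolves a stdlib class so it runs without the harvester package installed; the config key shown in the comment comes from the test config, everything else is a hypothetical example.

    import importlib
    from typing import Any

    def import_by_path(path: str) -> Any:
        # Split "package.module.Object" into the module path and the object name,
        # import the module, and return the named attribute (class or function).
        module_path, _, object_name = path.rpartition(".")
        return getattr(importlib.import_module(module_path), object_name)

    # In a harvester config this would look like:
    #   postprocessor: testmodule.provider.SampleProvider
    # Here we resolve a stdlib class instead so the sketch is runnable as-is.
    cls = import_by_path("json.JSONDecoder")
    print(cls)  # <class 'json.decoder.JSONDecoder'>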