Commit 94301f9a authored by Fabian Schindler

Merge branch 'restructure-resources' into 'main'

Reorganizing Resources

See merge request !4
parents 96b78763 d4f7030e
Pipeline #19538 passed with stages in 1 minute and 59 seconds
@@ -17,3 +17,17 @@ harvesters:
- - P5D
- !now
queue: register # register, ingest, delete, update, preprocess
- name: MyS3STACCatalogHarvester
type: STACCatalog
source:
type: S3
bucket: mybucket
secret_access_key: xxx
access_key_id: xxx
endpoint_url: myendpoint.storage.com
validate_bucket_name: False
region_name: RegionA
public: False
# path is not specified explicitly here, but must be passed as an argument
# path:
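For context, the resource block above maps onto the new factory functions introduced in this MR. A minimal sketch of the equivalent Python structure that get_filescheme() receives (the values simply mirror the YAML and are illustrative):

```python
# Sketch only: "type" selects the scheme class, "source" is resolved separately
# through get_source(), and the remaining keys become constructor kwargs.
resource_cfg = {
    "type": "STACCatalog",
    "source": {
        "type": "S3",
        "bucket": "mybucket",
        "secret_access_key": "xxx",
        "access_key_id": "xxx",
        "endpoint_url": "myendpoint.storage.com",
        "validate_bucket_name": False,
        "region_name": "RegionA",
        "public": False,
    },
    # the catalog path is not part of the static config; it is supplied at
    # runtime, matching the "path is not specified explicitly" comment above
}
```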
from abc import abstractmethod, ABC
from typing import Iterator
from dataclasses import dataclass
from datetime import datetime
from typing import IO, AnyStr, Iterator, List
class Resource(ABC):
@@ -8,12 +10,32 @@ class Resource(ABC):
that provides data or metadata.
"""
# All resources should implement this interface to do the following:
# 1. prepare the harvest
# 2. harvest the resource
# 3. convert the results to STAC items
# 4. return the STAC items as dictionaries
@abstractmethod
def harvest(self) -> Iterator[dict]:
pass
...
@dataclass
class Stat:
mtime: datetime
size: int
class Source(ABC):
@abstractmethod
def open(self, path: str) -> IO[AnyStr]:
...
@abstractmethod
def listdir(self, path: str) -> List[str]:
...
class Endpoint(Resource):
def __init__(self, url: str):
self.url = url
class FileScheme(Resource):
def __init__(self, source: Source):
self.source = source
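To make the new abstractions concrete, here is a minimal sketch of a toy Source plus a FileScheme built on it. InMemorySource and ListingScheme are hypothetical and only illustrate the contract; concrete sources such as S3Source below additionally provide a stat() method returning the Stat dataclass.

```python
from io import BytesIO
from typing import IO, AnyStr, Iterator, List

class InMemorySource(Source):
    """Hypothetical source backed by a dict of path -> bytes."""

    def __init__(self, files: dict):
        self.files = files

    def open(self, path: str) -> IO[AnyStr]:
        return BytesIO(self.files[path])

    def listdir(self, path: str) -> List[str]:
        return [name for name in self.files if name.startswith(path)]


class ListingScheme(FileScheme):
    """Hypothetical scheme: yields one bare STAC-item-like dict per listed file."""

    def __init__(self, source: Source, path: str):
        super().__init__(source)
        self.path = path

    def harvest(self) -> Iterator[dict]:
        for name in self.source.listdir(self.path):
            yield {"type": "Feature", "id": name, "properties": {}}
```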
from typing import Optional
from ..abc import Endpoint
from .oads import OADSEndpoint
from .ogcapi import OGCAPIEndpoint
from .opensearch import OpenSearchEndpoint
from .stacapi import STACAPIEndpoint
from ._endpoint import Endpoint
ENDPOINT_MAP = {
"OADS": OADSEndpoint,
@@ -15,16 +15,11 @@ ENDPOINT_MAP = {
def get_endpoint(endpoint_cfg: dict) -> Optional[Endpoint]:
cls = ENDPOINT_MAP.get(endpoint_cfg["type"])
cls = ENDPOINT_MAP.get(endpoint_cfg.pop("type"))
if not cls:
return None
url = endpoint_cfg.pop("url")
query = endpoint_cfg.pop("query")
endpoint = cls(url=url, query=query, **endpoint_cfg)
return endpoint
return cls(**endpoint_cfg)
__all__ = [
......
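A hedged usage sketch of the refactored factory: after this change everything except "type" is forwarded to the endpoint constructor, so the remaining config keys must match that constructor's signature. "OADS" is the one map key visible in this hunk; the URL is illustrative, and the concrete endpoint classes may require further keyword arguments such as query.

```python
endpoint = get_endpoint({
    "type": "OADS",
    "url": "https://example.com/oads",  # consumed by the Endpoint base class
})
if endpoint is None:
    # unknown "type" values still yield None instead of raising
    pass
```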
from ..resource import Resource
class Endpoint(Resource):
type: str = "Endpoint"
def __init__(self, url: str, *args, **kwargs):
self.url = url
from typing import Iterator
from ._endpoint import Endpoint
from ..abc import Endpoint
class OADSEndpoint(Endpoint):
......
from typing import Iterator
from ._endpoint import Endpoint
from ..abc import Endpoint
class OGCAPIEndpoint(Endpoint):
......
@@ -7,7 +7,7 @@ import requests
import lxml.etree as ET
import pystac
from ._endpoint import Endpoint
from ..abc import Endpoint
from .query import Query
from ..exceptions import QueryError
......
@@ -3,7 +7,7 @@ from typing import Iterator
import requests
from ._endpoint import Endpoint
from ..abc import Endpoint
from .query import Query
logger = logging.getLogger(__name__)
......
from typing import Optional
from ..abc import FileScheme
from ..source import get_source
from .filematcher import FileMatcherScheme
from .stac_catalog import STACCatalogScheme
SCHEME_MAP = {
"FileMatcher": FileMatcherScheme,
"STACCatalog": STACCatalogScheme
}
def get_filescheme(filescheme_cfg: dict) -> Optional[FileScheme]:
cls = SCHEME_MAP.get(filescheme_cfg.pop("type"))
if not cls:
return None
return cls(get_source(filescheme_cfg.pop("source")), **filescheme_cfg)
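A sketch of how get_filescheme() composes a Source with a scheme, assuming a FileMatcher configuration. The values are illustrative; the keyword arguments mirror FileMatcherScheme.__init__ shown below.

```python
scheme = get_filescheme({
    "type": "FileMatcher",
    "source": {"type": "S3", "bucket": "mybucket", "public": True},
    "path": "products/",
    "time_regex": r"\d{8}T\d{6}",
    "url": "https://mybucket.myendpoint.storage.com/",
})
for item in scheme.harvest():  # yields STAC items as dictionaries
    print(item["id"])
```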
import logging
import re
from typing import Iterator
from dateutil.parser import isoparse
import pystac
from ..abc import FileScheme, Source
logger = logging.getLogger(__name__)
class FileMatcherScheme(FileScheme):
def __init__(self, source: Source, path: str, time_regex: str, url: str):
super().__init__(source)
self.path = path
self.time_regex = re.compile(time_regex)
self.url = url
def harvest(self) -> Iterator[dict]:
logger.info("Starting S3 harvesting")
for filename in self.source.listdir(self.path):
if match := re.search(self.time_regex, filename):
dt = isoparse(match[0])
item = self._create_item(filename, dt)
yield item.to_dict()
def _create_item(self, path, dt):
identifier = dt.strftime("%Y%m%d_%H%M%S")
properties = {
"datetime": dt,
"updated": self.source.stat(path).mtime,
}
item = pystac.Item(
id=identifier, geometry=None, bbox=None, datetime=dt, properties=properties
)
item.add_asset(identifier, pystac.Asset(f"{self.url}{path}"))
return item
import logging
import json
from typing import Iterator
from urllib.parse import urljoin
from ..abc import FileScheme, Source
logger = logging.getLogger(__name__)
class STACCatalogScheme(FileScheme):
def __init__(self, source: Source, root_path: str):
super().__init__(source)
self.root_path = root_path
def _read_json(self, path):
return json.load(self.source.open(path))
def harvest(self) -> Iterator[dict]:
yield from self.harvest_catalog(self.root_path)
def harvest_catalog(self, path: str) -> Iterator[dict]:
"""
Harvests a specified STAC catalog. Will recurse into child catalogs
and yield all included STAC items.
"""
logger.info(f"Harvesting from catalog {path}")
catalog = self._read_json(path)
for link in catalog["links"]:
if link["rel"] == "item":
item_href = urljoin(path, link["href"])
logger.info(f"Harvested item {item_href}")
yield self._read_json(item_href)
elif link["rel"] == "child":
catalog_href = urljoin(path, link["href"])
yield from self.harvest_catalog(catalog_href)
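For reference, a minimal sketch (as Python dicts) of the catalog shape that harvest_catalog() walks: only links with rel "item" or "child" are followed, and their hrefs are resolved against the catalog path with urljoin.

```python
# Illustrative only: the shape of a catalog document as read by _read_json().
catalog = {
    "type": "Catalog",
    "id": "root",
    "links": [
        {"rel": "child", "href": "sub/catalog.json"},    # recursed into
        {"rel": "item", "href": "items/item_001.json"},  # read and yielded as a dict
        {"rel": "self", "href": "catalog.json"},         # other rels are ignored
    ],
}
```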
@@ -5,9 +5,9 @@ from typing import Union
from redis import Redis
from mergedeep import merge
from .resource import Resource
from .abc import Resource
from .endpoint import get_endpoint
from .source import get_source
from .filescheme import get_filescheme
from .exceptions import HarvestError
from .utils import cql_filter
from .postprocess import get_postprocessor
@@ -17,10 +17,10 @@ logger = logging.getLogger(__name__)
def init_resource(harvest_config: dict) -> Resource:
config: dict = harvest_config.pop("resource")
if endpoint := get_endpoint(config):
if endpoint := get_endpoint(dict(config)):
return endpoint
if source := get_source(config):
if source := get_filescheme(dict(config)):
return source
raise HarvestError(f"Resource type {config['type']} not found")
......
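A hedged sketch of the new dispatch in init_resource(): the nested "resource" dict is first offered to get_endpoint() and, failing that, to get_filescheme(). The keys mirror the YAML at the top of this diff; root_path matches STACCatalogScheme.__init__ above.

```python
harvest_config = {
    "name": "MyS3STACCatalogHarvester",
    "resource": {
        "type": "STACCatalog",                           # not an endpoint type,
        "source": {"type": "S3", "bucket": "mybucket"},  # so get_filescheme() handles it
        "root_path": "catalog.json",
    },
}
resource = init_resource(harvest_config)  # -> STACCatalogScheme instance
```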
from abc import ABC, abstractmethod
from typing import Dict, Type
from .utils import import_by_path
class Postprocessor(ABC):
def __init__(self, **kwargs):
@@ -17,5 +19,9 @@ POSTPROCESSORS: Dict[str, Type[Postprocessor]] = {
def get_postprocessor(config: dict) -> Postprocessor:
cls = POSTPROCESSORS[config.pop("type")]
type_ = config.pop("type")
try:
cls = POSTPROCESSORS[type_]
except KeyError:
cls = import_by_path(type_)
return cls(**config)
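With the import_by_path() fallback (added in utils.py further down in this diff), a postprocessor can now also be referenced by a dotted Python path. A hedged sketch with a hypothetical module and class name:

```python
postprocessor = get_postprocessor({
    # not a key of POSTPROCESSORS, so it is imported via import_by_path()
    "type": "my_project.postprocess.FixFootprint",
    # remaining keys are passed to the postprocessor's constructor
    "some_option": True,
})
```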
from typing import Optional
from .stac_catalog import STACCatalogSource
from ..abc import Source
from .swift import SwiftSource
from .s3 import S3Source, S3CatalogSource
from .s3 import S3Source
from .ftp import FTPSource
from ._source import Source
SOURCE_MAP = {
"FTP": FTPSource,
"S3": S3Source,
"S3Catalog": S3CatalogSource,
"STACCatalog": STACCatalogSource,
"Swift": SwiftSource,
}
def get_source(source_cfg: dict) -> Optional[Source]:
cls = SOURCE_MAP.get(source_cfg["type"])
if not cls:
return None
parameters = source_cfg["parameters"]
source = cls(parameters)
return source
cls = SOURCE_MAP[source_cfg.pop("type")]
return cls(**source_cfg)
__all__ = [
"FTPSource",
"S3Source",
"S3CatalogSource",
"STACCatalogSource",
"SwiftSource",
"Source",
"get_source",
]
from typing import Dict
from ..resource import Resource
class Source(Resource):
type: str = "Source"
def __init__(self, parameters: Dict[str, str], *args, **kwargs):
self.parameters = parameters
from typing import Iterator
from ._source import Source
from ..abc import Source
class FTPSource(Source):
......
import re
import logging
from dateutil.parser import isoparse
from typing import TYPE_CHECKING, Iterator, Tuple
from typing import TYPE_CHECKING, IO, AnyStr, List, Tuple
from functools import cached_property
from urllib.parse import urlparse
import json
from os.path import dirname, join
if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
@@ -16,28 +13,27 @@ import botocore.session
import botocore.handlers
from botocore import UNSIGNED
from botocore.config import Config
import pystac
from ._source import Source
from ..abc import Source, Stat
logger = logging.getLogger(__name__)
class S3Base:
class S3Source(Source):
def __init__(
self,
bucket: str,
secret_access_key: str = None,
access_key_id: str = None,
endpoint_url: str = "",
strip_bucket: bool = True,
validate_bucket_name: bool = True,
region_name: str = None,
public: bool = False,
):
self.bucket = bucket
self.secret_access_key = secret_access_key
self.access_key_id = access_key_id
self.endpoint_url = endpoint_url
self.strip_bucket = strip_bucket
self.region_name = region_name
self.public = public
self.validate_bucket_name = validate_bucket_name
@@ -62,116 +58,33 @@ class S3Base:
)
return client
def get_bucket_and_key(self, path: str) -> Tuple[str, str]:
parsed = urlparse(path)
if parsed.scheme and parsed.scheme.lower() != "s3":
raise ValueError(f"Invalid S3 URL {path}")
class S3Source(Source):
type = "S3"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@property
def client(self) -> "S3Client":
botocore_session = botocore.session.Session()
session = boto3.session.Session(botocore_session=botocore_session)
client = session.client(
"s3",
aws_access_key_id=self.parameters["access_key_id"],
aws_secret_access_key=self.parameters["secret_access_key"],
region_name=self.parameters["region"],
)
return client
@property
def bucket(self) -> str:
bucket = self.parameters["url"].strip("https://").split(".")[0]
return bucket
if parsed.netloc:
path = parsed.path
if path.startswith("/"):
path = path[1:]
return (parsed.netloc, path)
def harvest(self) -> Iterator[dict]:
logger.info("Starting S3 harvesting")
if path.startswith("/"):
path = path[1:]
return (self.bucket, path)
def listdir(self, path) -> List[str]:
paginator = self.client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=self.bucket, Prefix=self.parameters["prefix"])
time_regex: str = self.parameters["time_regex"]
for page in pages:
for file in page["Contents"]:
if match := re.search(time_regex, file["Key"]):
dt = isoparse(match[0])
item = self._create_item(file, dt, self.parameters["url"])
yield item.to_dict()
def _create_item(self, data, dt, url):
identifier = dt.strftime("%Y%m%d_%H%M%S")
properties = {
"datetime": dt,
"updated": data["LastModified"],
}
item = pystac.Item(
id=identifier, geometry=None, bbox=None, datetime=dt, properties=properties
)
item.add_asset(identifier, pystac.Asset(f"{url}{data['Key']}"))
return item
bucket, prefix = self.get_bucket_and_key(path)
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
return [file["Key"] for page in pages for file in page["Contents"]]
class S3CatalogSource(S3Base):
type = "S3Catalog"
def __init__(self, parameters: dict, **kwargs):
self.root_href = parameters.pop("root_href")
self.default_catalog_name = parameters.pop("default_catalog_name", None)
super().__init__(**parameters)
def harvest(self) -> Iterator[dict]:
logger.info("Starting S3 Catalog harvesting")
parsed = urlparse(self.root_href)
path = parsed.path
if path.startswith("/"):
path = parsed.path[1:]
if path.endswith("/") and self.default_catalog_name:
path = join(path, self.default_catalog_name)
yield from self.harvest_catalog(parsed.netloc, path)
def fetch_json(self, bucket: str, key: str) -> dict:
"""
Loads the object identified by bucket and key and parses it as JSON.
"""
if key.startswith("/"):
key = key[1:]
response = self.client.get_object(Bucket=bucket, Key=key)
return json.load(response["Body"])
def join_href(self, bucket: str, key: str, href: str) -> Tuple[str, str]:
"""
Joins the given href with the previous bucket/key. If the href is a fully
qualified S3 URL, its bucket/key pair is returned; if it is a relative path,
it is joined with the previous key.
"""
parsed = urlparse(href)
if parsed.netloc:
if parsed.scheme.lower() != "s3":
# TODO: what if HTTP hrefs?
raise ValueError("Can only join S3 URLs")
return (parsed.netloc, parsed.path)
else:
return (
bucket,
join(dirname(key), parsed.path),
)
def open(self, path: str) -> IO[AnyStr]:
bucket, key = self.get_bucket_and_key(path)
return self.client.get_object(Bucket=bucket, Key=key)["Body"]
def harvest_catalog(self, bucket: str, key: str) -> Iterator[dict]:
"""
Harvests a specified STAC catalog. Will recurse into child catalogs
and yield all included STAC items.
"""
logger.info(f"Harvesting from catalog {bucket}/{key}")
catalog = self.fetch_json(bucket, key)
for link in catalog["links"]:
if link["rel"] == "item":
item_bucket, item_key = self.join_href(bucket, key, link["href"])
logger.info(f"Harvested item {item_bucket}/{item_key}")
yield self.fetch_json(item_bucket, item_key)
elif link["rel"] == "child":
cat_bucket, cat_key = self.join_href(bucket, key, link["href"])
yield from self.harvest_catalog(cat_bucket, cat_key)
def stat(self, path: str) -> Stat:
bucket, key = self.get_bucket_and_key(path)
response = self.client.head_object(Bucket=bucket, Key=key)
return Stat(mtime=response["LastModified"], size=response["ContentLength"])
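A short usage sketch of the reworked S3Source, which now exposes the generic listdir()/open()/stat() interface instead of harvesting by itself. Paths can be plain keys in the configured bucket or fully qualified s3:// URLs, per get_bucket_and_key(); all values are illustrative.

```python
source = S3Source(
    bucket="mybucket",
    access_key_id="xxx",
    secret_access_key="xxx",
    endpoint_url="myendpoint.storage.com",
)
for key in source.listdir("products/"):  # object keys under a prefix
    info = source.stat(key)              # Stat(mtime=..., size=...)
    body = source.open(key)              # streaming object body
    first_bytes = body.read(16)
```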
from typing import Iterator
from ._source import Source
class STACCatalogSource(Source):
type = "STACCatalog"
def harvest(self) -> Iterator[dict]:
raise NotImplementedError()
from typing import Iterator
from ._source import Source
from ..abc import Source
class SwiftSource(Source):
......
from typing import Iterator
import importlib
from typing import Any, Iterator
from pygeofilter.backends.native.evaluate import NativeEvaluator
from pygeofilter.parsers.cql_json import parse as json_parse
@@ -13,3 +14,14 @@ def cql_filter(_filter: dict, data: Iterator[dict]) -> Iterator[dict]:
nat_eval = NativeEvaluator(attribute_map=attr_map, use_getattr=False)
evaluator = nat_eval.evaluate(_filter)
yield from filter(evaluator, data)
def import_by_path(path: str) -> Any:
"""Imports the object from the referenced module.
Args:
path (str): the dotted Python path, where the last element is the
object in the referenced module.
"""
module_path, _, object_name = path.rpartition(".")
return getattr(importlib.import_module(module_path), object_name)
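A quick usage example of the helper, using a standard-library path so it is easy to verify:

```python
loads = import_by_path("json.loads")  # imports "json", returns its "loads" attribute
assert loads('{"a": 1}') == {"a": 1}
```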