EOX GitLab Instance

Commit 5fa116fb authored by Nikola Jankovic's avatar Nikola Jankovic 💻
Browse files

added configuration for updates

parent ec9aff9a
Pipeline #22896 failed with stage
in 46 seconds
- name: S2L2A_Element84
url: https://earth-search.aws.element84.com/v0/
type: STACAPI # STACAPI, STACCatalog, OpenSearch, FTP, S3, Swift, OGCAPI, OADS
begin: 2021-08-01
end: 2021-08-31
property: datetime # datetime, updated, created ###if supported by stacapi###
collection: sentinel-s2-l2a-cogs
bbox: 14.9,47.7,16.4,48.7
- during:
- property: updated
- - P5D
- !now
queue: register # register, ingest, delete, update, preprocess
- name: MyS3STACCatalogHarvester
type: STACCatalog
type: S3
bucket: mybucket
secret_access_key: xxx
access_key_id: xxx
endpoint_url: myendpoint.storage.com
validate_bucket_name: False
region_name: RegionA
public: False
# path is not explicitly specified, but must be passed as argument
# path:
......@@ -12,6 +12,7 @@ from typing import Optional
from structlog import get_logger
from redis import Redis
from .config import HarvesterConfig
from .harvester import main
LOGGER = get_logger(__name__)
......@@ -29,8 +30,8 @@ TIMEOUT = 5
class App:
def __init__(
client: Optional[Redis],
config: dict,
client: Redis,
config: HarvesterConfig,
listen_queue: str,
) -> None:
self.client = client
......@@ -84,22 +84,23 @@ def daemon(
config_file: str,
validate: bool = False,
config = load_config(config_file)
if validate:
config = load_config(config_file)
client = init_client(host, port)
app = App(client, config, listen_queue)
if client:
app = App(client, config, listen_queue)
while not app.shutdown:
while not app.shutdown:
@cli.command(help="Run a single, one-off harvest")
@click.argument("harvester_name", type=str)
@click.option("--config-file", type=click.File("r"), required=True)
@click.option("--config-file", type=str, required=True)
@click.option("--values", type=str, default=None)
@click.option("--host", type=str, default=None)
@click.option("--port", type=str, default=None)
......@@ -112,9 +113,9 @@ def harvest(
port: int,
validate: bool = False,
config = load_config(config_file)
if validate:
config = load_config(config_file)
item = {"name": harvester_name}
if values:
import os
import re
import datetime
from dataclasses import dataclass, field
from os.path import join, dirname
from typing import Dict, List, Optional
from datetime import datetime
from omegaconf import OmegaConf
import jsonschema
import yaml
# Matches every ${VAR} reference inside a scalar so the !env constructor can
# substitute environment variables; group(1) is the variable name.
ENV_PATTERN = re.compile(r".*?\${(\w+)}.*?")
# Shared SafeLoader that the custom tags below are registered on.
LOADER = yaml.SafeLoader
class DictSeriazable:
    """Mixin that serializes a structured-config instance to plain builtins.

    Round-trips ``self`` through OmegaConf so nested config objects become
    dicts/lists and enum members are rendered as strings.
    """

    def to_dict(self):
        # Build a structured config from the instance, then flatten it to
        # native Python containers.
        structured = OmegaConf.structured(self)
        return OmegaConf.to_container(structured, enum_to_str=True)
def constructor_env_variables(loader, node):
    """Extract the environment variable references from the node's value.

    :param yaml.Loader loader: the yaml loader
    :param node: the current node in the yaml
    :return: the string with every ``${VAR}`` replaced by the value of the
        environment variable ``VAR``; ``None`` when any referenced variable
        is unset; the raw scalar when it contains no references
    """
    value = loader.construct_scalar(node)
    match = ENV_PATTERN.findall(value)  # to find all env variables in line
    if match:
        full_value = value
        for g in match:
            env_variable = os.environ.get(g)
            if env_variable is None:
                # One unset variable invalidates the whole value
                # (reconstructed from the diff's added `return None`).
                return None
            full_value = full_value.replace(f"${{{g}}}", env_variable)
        return full_value
    return value
class S3SourceConfig(DictSeriazable):
    """Connection settings for an S3 (or S3-compatible) object store source.

    NOTE(review): presumably decorated with ``@dataclass`` in the original
    file — decorator lines appear to be missing from this view; confirm.
    """

    bucket: str
    access_key_id: str
    secret_access_key: str
    validate_bucket_name: bool
    region_name: str
    public: bool
    # Optional custom endpoint for S3-compatible providers.
    endpoint_url: Optional[str] = None
def constructor_now(loader, node):
    """YAML constructor for the ``!now`` tag.

    Returns the current UTC time as an ISO-8601 string. *loader* and *node*
    are accepted to satisfy the constructor signature but are not used.
    """
    current = datetime.datetime.now(tz=datetime.timezone.utc)
    return current.isoformat()
class HTTPSourceConfig(DictSeriazable):
    """Settings for an HTTP(S) source; only the base URL is configured."""

    root_url: str
# Register the custom tags on the shared SafeLoader.
# The !env tag marks where to start searching for the ${VAR} pattern,
# e.g. somekey: !env somestring${MYENVVAR}blah blah blah
LOADER.add_implicit_resolver("!env", ENV_PATTERN, None)
LOADER.add_constructor("!env", constructor_env_variables)
# this tag resolves !now to the current UTC time (see constructor_now)
LOADER.add_constructor("!now", constructor_now)
class SourceConfig:
    """Union-style container: exactly one source kind is expected to be set.

    NOTE(review): presumably a ``@dataclass`` in the original — confirm.
    """

    s3: Optional[S3SourceConfig] = None
    http: Optional[HTTPSourceConfig] = None
    # swift/ftp are not yet modeled as structured configs.
    swift: Optional[str] = None
    ftp: Optional[str] = None
def load_config(input_file: str) -> dict:
    """Parse YAML from *input_file* with the custom LOADER (resolves
    ``!env``/``!now`` tags).

    NOTE(review): shadowed by the OmegaConf-based ``load_config`` defined
    later in this view — the two appear to be the old/new sides of a diff.
    """
    return yaml.load(input_file, Loader=LOADER)
class FileMatcherConfig(DictSeriazable):
    """Configuration for harvesting by matching files on a source."""

    source: SourceConfig
    path: str
    # presumably regexes extracting timestamp / identifier from matched
    # paths — confirm against the scheme implementation
    time_regex: str
    id_regex: str
    asset_id: str
def validate_config(config):
    """Validate *config* against the bundled JSON schema.

    NOTE(review): the body appears truncated in this view — the schema is
    loaded but no validation call is visible; a later ``validate_config``
    in this view performs ``jsonschema.validate``. Confirm against the
    full file.
    """
    with open(
        os.path.join(os.path.dirname(__file__), "config-schema.yaml"), encoding="utf-8"
    ) as file:
        schema = yaml.load(file, LOADER)
class OADSConfig(DictSeriazable):
    """Configuration for an OADS endpoint."""

    scan_href: str
    use_oads_ext: bool
class TimeConfig:
    """Temporal extent of a query.

    NOTE(review): presumably a ``@dataclass`` in the original — confirm.
    """

    begin: str
    # which time property the interval applies to; the sample config above
    # lists datetime / updated / created
    property: str
    # open-ended interval when omitted
    end: Optional[str] = None
class QueryConfig:
    """Search parameters for a STAC API query."""

    time: TimeConfig
    collection: str
    # comma-separated coordinate string, e.g. "14.9,47.7,16.4,48.7"
    bbox: str
class STACAPIConfig(DictSeriazable):
    """Configuration for harvesting from a STAC API endpoint."""

    url: str
    query: QueryConfig
class ResourceConfig:
    """Union-style container: exactly one resource kind is expected to be
    set per harvester."""

    filematcher: Optional[FileMatcherConfig] = None
    oads: Optional[OADSConfig] = None
    stacapi: Optional[STACAPIConfig] = None
    # not yet modeled as structured configs
    staccatalog: Optional[str] = None
    ogcapi: Optional[str] = None
    opensearch: Optional[str] = None
class PostprocessConfig(DictSeriazable):
    """Configuration of a single postprocessing step."""

    # registered postprocessor name, or a dotted import path
    # (see get_postprocessor)
    type: str
    values: Dict[str, str]
class HarvestConfig:
    """One harvester definition.

    NOTE(review): uses ``dataclasses.field``, so this must be a
    ``@dataclass`` in the original file — the decorator line is missing
    from this view.
    """

    name: str
    resource: ResourceConfig
    # queue the harvested items are pushed to; the sample config above lists
    # register / ingest / delete / update / preprocess
    queue: str
    postprocess: List[PostprocessConfig] = field(default_factory=list)
    filter: Optional[dict] = field(default_factory=dict)
class HarvesterConfig:
    """Root configuration object: the list of configured harvesters."""

    harvesters: List[HarvestConfig]
def load_config(cfg_path: str) -> HarvesterConfig:
    """Load the YAML at *cfg_path* merged into the structured
    HarvesterConfig schema and return it as a HarvesterConfig instance.

    NOTE(review): the "now" resolver is bound to ``datetime.now().isoformat``
    — a method of the datetime captured when this function runs — so every
    ``${now:}`` interpolation resolves to the load time, not the
    interpolation time; confirm this snapshot behavior is intended.
    NOTE(review): calling this twice re-registers the resolver, which some
    OmegaConf versions reject unless ``replace=True`` is passed — verify.
    """
    OmegaConf.register_new_resolver("now", datetime.now().isoformat)
    return OmegaConf.to_object(
        OmegaConf.merge(OmegaConf.structured(HarvesterConfig), OmegaConf.load(cfg_path))
    )  # type: ignore
def validate_config(config_file: str):
    """Validate the YAML file at *config_file* against the bundled schema.

    :param config_file: path to the harvester configuration file
    :raises jsonschema.ValidationError: when the config does not conform

    Fixes relative to the diffed original: the second ``open(`` argument was
    truncated in this view, and the two streams were loaded into the wrong
    variables (f1 is the schema file but was read into ``config``).
    """
    schema_path = join(dirname(__file__), "config-schema.yaml")
    with open(schema_path, encoding="utf-8") as schema_stream, open(
        config_file, encoding="utf-8"
    ) as config_stream:
        schema = yaml.load(schema_stream, Loader=yaml.SafeLoader)
        config = yaml.load(config_stream, Loader=yaml.SafeLoader)
    jsonschema.validate(config, schema)
from typing import Optional
from ..abc import Endpoint
from .oads import OADSEndpoint
from .ogcapi import OGCAPIEndpoint
from .opensearch import OpenSearchEndpoint
from .stacapi import STACAPIEndpoint
from ..config import ResourceConfig
from ..abc import Endpoint
# Maps ResourceConfig attribute names to their endpoint implementations.
# (Consolidated from the interleaved old/new diff lines: the old dict used
# capitalized type names and the old get_endpoint took a plain dict.)
ENDPOINT_MAP = {
    "oads": OADSEndpoint,
    "ogcapi": OGCAPIEndpoint,
    "opensearch": OpenSearchEndpoint,
    "stacapi": STACAPIEndpoint,
}


def get_endpoint(resource_config: ResourceConfig) -> Optional[Endpoint]:
    """Instantiate the endpoint configured on *resource_config*.

    Checks each known endpoint attribute in turn and constructs the first
    one that is set, passing its serialized fields as keyword arguments.

    :param resource_config: resource section of a harvester config
    :return: the endpoint instance, or None when no endpoint is configured
    """
    for endpoint, cls in ENDPOINT_MAP.items():
        cfg = getattr(resource_config, endpoint)
        if cfg:
            return cls(**cfg.to_dict())
    return None
__all__ = [
......@@ -51,7 +51,7 @@ class StacAPIQuery(Query):
if not queryables:
params = {
"datetime": f"{self.time_begin.isoformat()}/{self.time_end.isoformat()}",
"datetime": f"{self.time_begin}/{self.time_end}",
"limit": 100,
"page": 1,
"bbox": f"[{self.bbox}]",
from typing import Optional
from ..abc import FileScheme
from ..source import get_source
from .filematcher import FileMatcherScheme
from .stac_catalog import STACCatalogScheme
from ..abc import FileScheme
from ..source import get_source
from ..config import ResourceConfig
# Maps ResourceConfig attribute names to their file-scheme implementations.
SCHEME_MAP = {"filematcher": FileMatcherScheme, "staccatalog": STACCatalogScheme}


def get_filescheme(resource_config: ResourceConfig) -> Optional[FileScheme]:
    """Instantiate the file scheme configured on *resource_config*.

    The first configured scheme attribute wins; its source is resolved via
    get_source and passed positionally, and the remaining config fields are
    forwarded as keyword arguments.

    :param resource_config: resource section of a harvester config
    :return: the file scheme instance, or None when none is configured
    """
    for scheme, cls in SCHEME_MAP.items():
        if cfg := getattr(resource_config, scheme):
            source = get_source(cfg.source)
            cfg = cfg.to_dict()
            # "source" is already passed positionally above; remove it so it
            # is not also forwarded as a keyword argument (the diffed code
            # forwarded the full dict, which would raise a duplicate-argument
            # TypeError for schemes with a `source` parameter).
            cfg.pop("source", None)
            return cls(source, **cfg)
    return None
......@@ -2,7 +2,10 @@ import json
from typing import Tuple, Union, Generator
from structlog import get_logger
from mergedeep import merge
# from mergedeep import merge
from .config import HarvesterConfig, ResourceConfig
from .abc import Resource
from .endpoint import get_endpoint
......@@ -15,68 +18,43 @@ from .postprocess import get_postprocessor
LOGGER = get_logger(__name__)
def init_resource(resource_config: ResourceConfig) -> Resource:
    """Build the Resource (endpoint or file scheme) for *resource_config*.

    Tries the configured API endpoint first, then the configured file
    scheme. (Consolidated to the new diff side; the deleted
    ``get_harvest_config`` helper relied on the now commented-out mergedeep
    import and is dropped.)

    :param resource_config: resource section of a harvester config
    :raises HarvestError: when neither an endpoint nor a file scheme is set
    """
    if endpoint := get_endpoint(resource_config):
        return endpoint
    if source := get_filescheme(resource_config):
        return source
    raise HarvestError(f"Resource type {resource_config} not found")
def main(
    config: HarvesterConfig, item: Union[dict, str]
) -> Generator[Tuple[str, str], None, None]:
    """Run the named harvest and yield ``(queue, serialized_item)`` pairs.

    :param config: full harvester configuration
    :param item: a harvester name, or a dict with "name" and optional
        "values" keys
    :raises KeyError: when no harvester with the given name is configured
    """
    if isinstance(item, str):
        name = item
        values = {}
    else:
        name = item["name"]
        # NOTE(review): `values` is collected but no longer used since the
        # mergedeep-based config merge was removed in this diff — confirm
        # whether per-run value overrides are still meant to be supported.
        values = item.get("values", {})

    LOGGER.info("Running harvesting...", name=name)

    # get the harvest config. Use a default + explicit raise: a bare next()
    # raising StopIteration inside a generator becomes RuntimeError (PEP 479);
    # the pre-diff code raised KeyError(name), which we preserve.
    harvest_config = next((h for h in config.harvesters if h.name == name), None)
    if harvest_config is None:
        raise KeyError(name)

    # Initialize resource
    resource = init_resource(harvest_config.resource)

    # Perform harvest (lazily)
    result = resource.harvest()

    # Chain postprocessors. map() binds each postprocessor's method eagerly,
    # so stacking several steps does not suffer from the late-binding-closure
    # bug the diffed generator expression had (all stages would have used the
    # last postprocessor once consumed).
    for postprocess_config in harvest_config.postprocess:
        postprocessor = get_postprocessor(postprocess_config)
        result = map(postprocessor.postprocess, result)

    # Filter data (renamed local so the builtin `filter` is not shadowed)
    if filter_ := harvest_config.filter:
        result = cql_filter(filter_, result)

    for harvested in result:
        yield harvest_config.queue, json.dumps(harvested, default=str)
from typing import Dict, Type
from ..utils import import_by_path
from .abc import Postprocessor
from .static import StaticValuesPostprocessor
from ..config import PostprocessConfig
from ..utils import import_by_path
POSTPROCESSORS: Dict[str, Type[Postprocessor]] = {"static": StaticValuesPostprocessor}
def get_postprocessor(config: PostprocessConfig) -> Postprocessor:
    """Instantiate the postprocessor described by *config*.

    ``config.type`` is first looked up among the built-in POSTPROCESSORS;
    otherwise it is treated as a dotted import path. (The ``try:`` line was
    lost in this diff view — reconstructed around the stray
    ``except KeyError:``.)

    :param config: postprocess section of a harvester config
    :return: the constructed postprocessor
    """
    type_ = config.type
    try:
        cls = POSTPROCESSORS[type_]
    except KeyError:
        # not a registered name -> treat it as an import path
        cls = import_by_path(type_)
    cfg = config.to_dict()
    # The pre-diff code popped "type" before forwarding; the new diff side
    # forwarded the full dict, which would pass an unexpected `type` kwarg
    # to the postprocessor constructor.
    cfg.pop("type", None)
    return cls(**cfg)
......@@ -5,18 +5,22 @@ from .swift import SwiftSource
from .s3 import S3Source
from .ftp import FTPSource
from .http import HTTPSource
from ..config import SourceConfig
# Maps SourceConfig attribute names to their source implementations.
# (Consolidated from the interleaved old/new diff lines: the old dict used
# capitalized type names and the old get_source took a plain dict.)
SOURCE_MAP = {
    "ftp": FTPSource,
    "s3": S3Source,
    "swift": SwiftSource,
    "http": HTTPSource,
}


def get_source(source_cfg: SourceConfig) -> Optional[Source]:
    """Instantiate the source configured on *source_cfg*.

    The first set attribute wins; its serialized fields are passed as
    keyword arguments to the matching source class.

    :param source_cfg: source section of a file-scheme config
    :return: the source instance, or None when no source is configured
    """
    for source, cls in SOURCE_MAP.items():
        if cfg := getattr(source_cfg, source):
            return cls(**cfg.to_dict())
    return None
__all__ = [
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment