EOX GitLab Instance

Commit e81f9640 authored by Nikola Jankovic's avatar Nikola Jankovic 💻
Browse files

updated stacapi params

moved temporal handling from opensearch to more generic function
added filter function now for pygeofiltering
removed one instance of abc for postprocessor
added back error handling from other branch to match recent updates
parent a4a3116c
Pipeline #22918 passed with stage
in 49 seconds
Contains functionality related to running the daemon
Contains functionality related to running the application waiting on redis messages
......@@ -183,27 +183,17 @@ class OpenSearchQuery(Query):
# the name is what should be used as a key when querying and this differs
# from implementation to implementation
# the value contains the "standardized" interface by which we need to get
# the query parameters from the queryables. Phew!
# the query parameters from the queryables. Phew!
urls = kwargs.pop("urls")
mimetype = kwargs.pop("mimetype", "application/atom+xml")
url = urls[mimetype]["url"]
queryables = urls[mimetype]["params"]
if isinstance(self.time_begin, str):
time_begin = self.time_begin
time_begin = self.time_begin.isoformat()
if isinstance(self.time_end, str):
time_end = self.time_end
time_end = self.time_end.isoformat()
mapping = {
"{geo:box}": self.bbox,
"{time:start}": time_begin,
"{time:end}": time_end,
"{time:start}": self.time_begin,
"{time:end}": self.time_end,
"{count}": int(
......@@ -10,15 +10,26 @@ class Query(ABC):
bbox: str = None,
collection: str = None,
self.time_begin, self.time_end = self._time(time)
self.time_begin, self.time_end = self.prepare_time(time)
self.bbox = bbox
self.collection = collection
def _time(time: Dict[str, Any]) -> Tuple[datetime, datetime]:
def prepare_time(time: Dict[str, Any]) -> Tuple[str, str]:
time_begin = time.pop("begin", datetime.now(timezone.utc))
time_end = time.pop("end", datetime.now(timezone.utc))
if isinstance(time_begin, str):
time_begin = time_begin
time_begin = time_begin.isoformat()
if isinstance(time_end, str):
time_end = time_end
time_end = time_end.isoformat()
return time_begin, time_end
......@@ -51,7 +51,7 @@ class StacAPIQuery(Query):
if not queryables:
params = {
"datetime": f"{self.time_begin.isoformat()}/{self.time_end.isoformat()}",
"datetime": f"{self.time_begin}/{self.time_end}",
"limit": 100,
"page": 1,
"bbox": f"[{self.bbox}]",
import json
from typing import Tuple, Union, Generator
from typing import Tuple, Union, Iterator
from structlog import get_logger
from mergedeep import merge
from .abc import Resource
from .abc import Postprocessor, Resource
from .endpoint import get_endpoint
from .filescheme import get_filescheme
from .exceptions import HarvestError
......@@ -44,9 +44,7 @@ def get_harvest_config(config: dict, name: str, values: dict) -> dict:
raise KeyError(name)
def main(
config: dict, item: Union[dict, str]
) -> Generator[Tuple[str, str], None, None]:
def main(config: dict, item: Union[dict, str]) -> Iterator[Tuple[str, str]]:
if isinstance(item, str):
name = item
values = {}
......@@ -66,13 +64,8 @@ def main(
result = resource.harvest()
if "postprocess" in harvest_config:
postprocess_configs = harvest_config["postprocess"]
if not isinstance(postprocess_configs, list):
postprocess_configs = [postprocess_configs]
for postprocess_config in postprocess_configs:
postprocessor = get_postprocessor(postprocess_config)
result = (postprocessor.postprocess(item) for item in result)
postprocessor = get_postprocessor(harvest_config["postprocess"])
result = apply_postprocessing(result, postprocessor)
# Filter data
if "filter" in harvest_config:
......@@ -80,3 +73,15 @@ def main(
for item in result:
yield harvest_config["queue"], json.dumps(item, default=str)
def apply_postprocessing(
items: Iterator[dict], postprocessor: Postprocessor
) -> Iterator[dict]:
"""Wrapper to correctly handle errors in postprocessing."""
for item in items:
yield postprocessor.postprocess(item)
except Exception as e:
LOGGER.error("Failed to apply postprocessor", item=item)
from typing import Dict, Type
from ..utils import import_by_path
from .abc import Postprocessor
from ..abc import Postprocessor
from .static import StaticValuesPostprocessor
from abc import ABC, abstractmethod
class Postprocessor(ABC):
Postprocessors can alter all harvested items.
def __init__(self, **kwargs):
def postprocess(self, item: dict) -> dict:
from .abc import Postprocessor
from ..abc import Postprocessor
def merge(a: dict, b: dict, path=None):
from datetime import datetime
import importlib
from typing import Any, Iterator
......@@ -5,13 +6,18 @@ from pygeofilter.backends.native.evaluate import NativeEvaluator
from pygeofilter.parsers.cql_json import parse as json_parse
CQL_FUNCTION_MAP = {"now": datetime.now}
CQL_ATTR_MAP = {"point_attr": "geometry", "*": "properties.*"}
def cql_filter(_filter: dict, data: Iterator[dict]) -> Iterator[dict]:
if not _filter:
yield from data
_filter = json_parse(_filter)
attr_map = {"point_attr": "geometry", "*": "properties.*"}
nat_eval = NativeEvaluator(attribute_map=attr_map, use_getattr=False)
nat_eval = NativeEvaluator(
function_map=CQL_FUNCTION_MAP, attribute_map=CQL_ATTR_MAP, use_getattr=False
evaluator = nat_eval.evaluate(_filter)
yield from filter(evaluator, data)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment