EOX GitLab Instance

Commit eb5bbafa authored by Nikola Jankovic

various: cicd/code/files

added flake8 and mypy to test stage
added license
added tagging information to readme
added dev requirements
split endpoints into resources
added new ABC
improved logic and style
refactored code
added first version of tests
parent 2af63104
Pipeline #18719 failed in 28 seconds
......@@ -13,6 +13,8 @@ test:
    - python3 setup.py install
    - pip3 install -r requirements.txt
    - pip3 install -r requirements-test.txt
+   - flake8
+   - mypy .
    - pytest --cov harvester --cov-report term-missing
publish_latest:
......
MIT License
Copyright (C) 2021 EOX IT Services GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# harvester
Service used to harvest data from various endpoints
## Tagging
**Before tagging remember to increment the chart version (`.chart/Chart.yaml`) manually!**
This repository uses `bump2version` for managing tags. To bump a version, use:
```
bump2version --new-version <new_version> # or bump2version <major|minor|patch>
git push && git push --tags
```
......@@ -48,18 +48,19 @@ def cli():
@click.option("--validate/--no-validate", default=False)
@click.option("--debug/--no-debug", default=False)
def daemon(
-    host,
-    port,
-    listen_queue,
-    config_file=None,
-    validate=False,
-    debug=False,
+    host: str,
+    port: int,
+    listen_queue: str,
+    config_file: str,
+    validate: bool = False,
+    debug: bool = False,
):
    setup_logging(debug)
    config = load_config(config_file)
    if validate:
        validate_config(config)
-    run_daemon(config, host, port, listen_queue)
+    client = init_client(host, port)
+    run_daemon(config, client, listen_queue)
@cli.command(help="Run a single, one-off harvest")
......@@ -73,7 +74,7 @@ def harvest(
    harvester_name: str,
    host: str,
    port: int,
-    config_file: str = None,
+    config_file: str,
    validate: bool = False,
    debug: bool = False,
):
......
......@@ -62,3 +62,5 @@ properties:
required:
  - queue
  - name
+  - endpoint
+  - mode
import os
import re
import datetime
-from typing import TextIO
import jsonschema
import yaml
......@@ -47,7 +46,7 @@ LOADER.add_constructor("!env", constructor_env_variables)
LOADER.add_constructor("!now", constructor_now)
-def load_config(input_file: TextIO) -> dict:
+def load_config(input_file: str) -> dict:
    return yaml.load(input_file, Loader=LOADER)
......@@ -55,6 +54,6 @@ def validate_config(config):
    with open(
        os.path.join(os.path.dirname(__file__), "config-schema.yaml"), encoding="utf-8"
    ) as file:
-        schema = yaml.load(file)
+        schema = yaml.load(file, LOADER)
    jsonschema.validate(config, schema)
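A minimal sketch of how the two helpers fit together after this change; the module path `harvester.config` and the file name are assumptions for illustration:

```
# Hypothetical usage sketch; load_config now takes the YAML text itself
# (a str), and validate_config checks it against config-schema.yaml.
from harvester.config import load_config, validate_config

with open("config.yaml", encoding="utf-8") as f:
    config = load_config(f.read())

validate_config(config)  # raises jsonschema.ValidationError on a bad config
```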
......@@ -19,12 +19,11 @@ def init_client(host: str, port: int) -> Redis:
    return redis
-def run_daemon(config: dict, host: str, port: str, listen_queue: str):
+def run_daemon(config: dict, client: Redis, listen_queue: str):
    """Run the harvester daemon, listening on a redis queue
    for harvest jobs.
    """
-    # initialize the queue client
-    client = init_client(host, port)
    logger.debug("waiting for redis queue '%s'", listen_queue)
    while True:
......
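A hedged sketch of the new wiring, based only on the signatures shown here; host, port, and queue name are illustrative:

```
# The daemon command now builds the client itself and hands it over.
client = init_client("localhost", 6379)
run_daemon(config, client, listen_queue="harvest_jobs")

# A producer elsewhere can enqueue a job with plain redis-py, e.g.:
# client.lpush("harvest_jobs", "daily-s2")  # payload shape is an assumption
```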
import logging
from abc import ABC, abstractmethod
from typing import List
from pygeofilter.parsers.cql_json import parse as json_parse
from pygeofilter.backends.native.evaluate import NativeEvaluator
from ..query import Query
logger = logging.getLogger(__name__)
class Endpoint(ABC):
    type = None

    def __init__(self, url: str, query: dict, filter: dict, *args, **kwargs):
        self.url = url
        self.query = Query(**query)
        self.filter = json_parse(filter)

    @abstractmethod
    def harvest(self) -> list:
        # All endpoints should extend this method to do the following:
        # 1. prepare query
        # 2. query endpoint
        # 3. convert to stac items
        # 4. filter data if necessary
        # 5. return list of stac items
        pass

    @classmethod
    def from_config(cls, endpoint_config: dict) -> "Endpoint":
        subclass_map = {subclass.type: subclass for subclass in cls.__subclasses__()}
        endpoint_type = endpoint_config.pop("type", None)
        subclass = subclass_map[endpoint_type]
        return subclass(**endpoint_config)

    def filter_data(self, data: List[dict]) -> List[dict]:
        attr_map = {"point_attr": "geometry", "*": "properties.*"}
        nat_eval = NativeEvaluator(attribute_map=attr_map, use_getattr=False)
        evaluator = nat_eval.evaluate(self.filter)
        result = list(filter(evaluator, data))
        return result
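To illustrate the contract the five numbered steps describe, a hypothetical subclass; the type name, API shape, and response handling are assumptions, not part of this commit:

```
import requests


class ExampleJSONEndpoint(Endpoint):
    type = "ExampleJSON"  # hypothetical; found via cls.__subclasses__()

    def harvest(self) -> list:
        # 1. prepare query / 2. query endpoint
        response = requests.get(self.url, params={"bbox": self.query.bbox})
        # 3. convert to stac items (assume the API already returns features)
        items = response.json()["features"]
        # 4. filter data if necessary
        if self.filter:
            items = self.filter_data(items)
        # 5. return list of stac items
        return items
```

`Endpoint.from_config` would then dispatch a config with `"type": "ExampleJSON"` to this class through its `subclass_map`.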
from ..query import Query
from .ftp import FTPEndpoint
from typing import Optional
from .oads import OADSEndpoint
from .ogcapi import OGCAPIEndpoint
from .opensearch import OpenSearchEndpoint
from .s3 import S3Endpoint
from .stacapi import STACAPIEndpoint
from .stac_catalog import STACCatalogEndpoint
from .swift import SwiftEndpoint
-from .Endpoint import Endpoint
+from ._endpoint import Endpoint
ENDPOINT_MAP = {
"OADS": OADSEndpoint,
"OGCAPI": OGCAPIEndpoint,
"OpenSearch": OpenSearchEndpoint,
"STACAPI": STACAPIEndpoint,
}
def get_endpoint(endpoint_cfg: dict) -> Optional[Endpoint]:
    cls = ENDPOINT_MAP.get(endpoint_cfg["type"])
    if not cls:
        return None

    url = endpoint_cfg.pop("url")
    query = endpoint_cfg.pop("query")
    endpoint = cls(url=url, query=query, **endpoint_cfg)
    return endpoint


__all__ = [
    "FTPEndpoint",
    "OADSEndpoint",
    "OGCAPIEndpoint",
    "OpenSearchEndpoint",
    "S3Endpoint",
    "STACAPIEndpoint",
    "STACCatalogEndpoint",
    "SwiftEndpoint",
    "Endpoint",
    "get_endpoint",
]
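A usage sketch for the registry-based factory; the config values are illustrative assumptions:

```
# get_endpoint returns None for types missing from ENDPOINT_MAP,
# otherwise it forwards the remaining keys to the subclass constructor.
cfg = {
    "type": "STACAPI",
    "url": "https://example.com/stac",  # hypothetical service
    "query": {"time": {}, "bbox": None, "collection": None},
    "filter": {},
}
endpoint = get_endpoint(cfg)
```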
from ..resource import Resource
class Endpoint(Resource):
    type: str = "Endpoint"

    def __init__(self, url: str, *args, **kwargs):
        self.url = url
from .Endpoint import Endpoint
class FTPEndpoint(Endpoint):
    type = "FTP"
-from .Endpoint import Endpoint
+from ._endpoint import Endpoint


class OADSEndpoint(Endpoint):
    type = "OADS"

    def harvest(self) -> list:
        raise NotImplementedError()
-from .Endpoint import Endpoint
+from ._endpoint import Endpoint


class OGCAPIEndpoint(Endpoint):
    type = "OGCAPI"

    def harvest(self) -> list:
        raise NotImplementedError()
from abc import abstractmethod, ABC
import logging
-from typing import List
+from typing import Any, Dict, List
from dataclasses import dataclass, field

import requests
import lxml.etree as ET
import pystac

-from .Endpoint import Endpoint
+from ._endpoint import Endpoint
from .query import Query
from ..exceptions import QueryError

logger = logging.getLogger(__name__)
-@dataclass
+@dataclass(frozen=True)
class SearchPage:
    index: int
    total: int
......@@ -21,16 +22,24 @@ class SearchPage:
class OpenSearchFormat(ABC):
-    mimetype = None
+    mimetype: str

+    @abstractmethod
+    def __init__(self, *args, **kwargs):
+        """Construct the format from its mimetype-specific configuration."""

    @classmethod
    def from_config(cls, config: dict) -> "OpenSearchFormat":
        subclass_map = {
            subclass.mimetype: subclass for subclass in cls.__subclasses__()
        }
-        type_ = config.pop("type", None)
-        subclass = subclass_map[type_]
+        _type = config.pop("type")
+        subclass = subclass_map[_type]
        return subclass(**config)

    @abstractmethod
......@@ -82,6 +91,19 @@ class AtomFormat(OpenSearchFormat):
    # TODO: maybe add mapping from XML -> properties in output STAC
    pass

    def parse(self):
        raise NotImplementedError()
FORMAT_MAP = {
    "application/json": GeoJSONFormat,
    "application/atom+xml": AtomFormat,
}


def get_format(format_config: dict) -> OpenSearchFormat:
    mimetype = format_config.pop("type")
    cls = FORMAT_MAP[mimetype]
    return cls(**format_config)
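The explicit `FORMAT_MAP` replaces the `__subclasses__()` lookup in `OpenSearchFormat.from_config`; a small hedged usage sketch (whether `GeoJSONFormat` takes further config keys is not shown in this diff):

```
fmt = get_format({"type": "application/json"})  # -> GeoJSONFormat instance
```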
class OpenSearchEndpoint(Endpoint):
    type = "OpenSearch"
......@@ -100,20 +122,20 @@ class OpenSearchEndpoint(Endpoint):
"dc": "http://purl.org/dc/elements/1.1/",
}
-    def __init__(self, format_config, *args, **kwargs):
+    def __init__(self, format_config, query, *args, **kwargs):
        super().__init__(*args, **kwargs)
-        self.format = OpenSearchFormat.from_config(format_config)
+        self.format = get_format(format_config)
+        self.query = OpenSearchQuery(**query)
    def harvest(self) -> list:
        logger.info("Starting OpenSearch harvesting")

        # prepare query
        parser = ET.XMLParser(recover=True)
        data = ET.fromstring(requests.get(self.url).content, parser)
        urls = self._get_url_param_mapping(data)
        result = []

-        params = self.query.opensearch_params(urls, self.format.mimetype)
+        params = self.query.prepare_params(urls=urls, mimetype=self.format.mimetype)
        url = params["url"]
        search_params = params["search_params"]
......@@ -133,7 +155,7 @@ class OpenSearchEndpoint(Endpoint):
return result
    def _get_url_param_mapping(self, data: ET._Element) -> dict:
-        urls = {}
+        urls: Dict[str, Any] = {}
        for url in data.findall("opensearch:Url", self.NS):
            _type = url.get("type")
            urls[_type] = {}
......@@ -151,3 +173,67 @@ class OpenSearchEndpoint(Endpoint):
                }
            )
        return urls
class OpenSearchQuery(Query):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def prepare_params(self, *args, **kwargs):
        # OpenSearch parameters are a bit more nuanced to produce because of
        # the lack of standardization in some fields. Each parameter has a
        # name and a value, and some have allowed values. The name is what
        # should be used as a key when querying, and it differs from
        # implementation to implementation. The value contains the
        # "standardized" interface by which we need to get the query
        # parameters from the queryables. Phew!
        urls = kwargs.pop("urls")
        mimetype = kwargs.pop("mimetype", "application/atom+xml")
        url = urls[mimetype]["url"]
        queryables = urls[mimetype]["params"]

        mapping = {
            "{geo:box}": self.bbox,
            "{time:start}": self.time_begin.isoformat(),
            "{time:end}": self.time_end.isoformat(),
            "{count}": int(
                next(
                    param["maxInclusive"]
                    for param in queryables
                    if param["value"] == "{count}"
                )
            ),
            "{startIndex}": int(
                next(
                    param["minInclusive"]
                    for param in queryables
                    if param["value"] == "{startIndex}"
                )
            ),
            "{geo:geometry}": None,
            "{geo:uid}": None,
        }
        mapping = {k: v for k, v in mapping.items() if v is not None}

        search_params = {}
        index_keyword = None
        for param_name, value in mapping.items():
            try:
                _p = next(param for param in queryables if param_name == param["value"])
                search_params[_p["name"]] = value
                if _p["value"] == "{startIndex}":
                    index_keyword = _p["name"]
            except StopIteration:
                continue

        if not index_keyword:
            raise QueryError("Index keyword not found")

        return {
            "url": url,
            "search_params": search_params,
            "index_keyword": index_keyword,
            "index_value": mapping["{count}"],
        }
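To make the comment above concrete, a sketch of the queryables structure `prepare_params` consumes; the parameter names (`maximumRecords`, `startRecord`, ...) are invented, which is exactly the implementation-specific naming the comment warns about:

```
urls = {
    "application/json": {
        "url": "https://example.com/search",
        "params": [
            {"name": "bbox", "value": "{geo:box}"},
            {"name": "start", "value": "{time:start}"},
            {"name": "end", "value": "{time:end}"},
            {"name": "maximumRecords", "value": "{count}", "maxInclusive": "50"},
            {"name": "startRecord", "value": "{startIndex}", "minInclusive": "1"},
        ],
    }
}
query = OpenSearchQuery(time={}, bbox="-180,-90,180,90")
params = query.prepare_params(urls=urls, mimetype="application/json")
# params["search_params"] -> {"bbox": "-180,-90,180,90", "start": <now>,
#     "end": <now>, "maximumRecords": 50, "startRecord": 1}
# params["index_keyword"] -> "startRecord"; params["index_value"] -> 50
```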
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Tuple, Dict


class Query(ABC):
    def __init__(
        self,
        time: Dict[str, Any] = {},
        bbox: str = None,
        collection: str = None,
    ):
        self.time_begin, self.time_end, self.time_property = self._time(time)
        self.bbox = bbox
        self.collection = collection

    @staticmethod
    def _time(time: Dict[str, Any]) -> Tuple[datetime, datetime, str]:
        time_begin = time.pop("begin", datetime.now(timezone.utc))
        time_end = time.pop("end", datetime.now(timezone.utc))
        time_property = time.pop("property", None)
        return time_begin, time_end, time_property

    @abstractmethod
    def prepare_params(self, *args, **kwargs):
        pass
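A small sketch of how the `time` block behaves; the config keys match `_time` above and the datetimes are illustrative:

```
from datetime import datetime, timezone

begin, end, prop = Query._time({
    "begin": datetime(2021, 1, 1, tzinfo=timezone.utc),
    "end": datetime(2021, 2, 1, tzinfo=timezone.utc),
    "property": "sensed",  # temporal property to query against
})

# With an empty dict, both bounds default to datetime.now(timezone.utc),
# i.e. a zero-length window ending "now".
begin, end, prop = Query._time({})
```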
from .Endpoint import Endpoint
class STACCatalogEndpoint(Endpoint):
    type = "STACCatalog"
......@@ -2,7 +2,8 @@ import logging
import requests
-from .Endpoint import Endpoint
+from ._endpoint import Endpoint
+from .query import Query
logger = logging.getLogger(__name__)
......@@ -10,19 +11,19 @@ logger = logging.getLogger(__name__)
class STACAPIEndpoint(Endpoint):
    type = "STACAPI"

-    def __init__(self, *args, **kwargs):
-        super(STACAPIEndpoint, self).__init__(*args, **kwargs)
+    def __init__(self, query, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.query = StacAPIQuery(**query)
    def harvest(self) -> list:
        # prepare query
        logger.info("Starting STACAPI harvesting")
        main = requests.get(self.url).json()
        search_url = next(
            link["href"] for link in main["links"] if link["rel"] == "search"
        )
-        search_params = self.query.stacapi_params(main)
+        search_params = self.query.prepare_params(root_response=main)
        result = []

        # query
        logger.debug("querying %s with %s", search_url, search_params)
        query_data = requests.get(search_url, params=search_params).json()
        context = query_data["context"]
......@@ -34,8 +35,32 @@ class STACAPIEndpoint(Endpoint):
        query_data = requests.get(search_url, params=search_params).json()
        context = query_data["context"]

        # filter data
        if self.filter:
            result = self.filter_data(result)

        return result
class StacAPIQuery(Query):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def prepare_params(self, *args, **kwargs) -> dict:
        root_response: dict = kwargs.pop("root_response")
        queryables = False
        # TODO: fetch stac queryables
        # see if queryables match configured
        for link in root_response.get("links", []):
            if link["rel"] == "queryables":
                queryables = link["href"]

        if not queryables:
            params = {
                "datetime": f"{self.time_begin.isoformat()}/{self.time_end.isoformat()}",
                "limit": 100,
                "page": 1,
                "bbox": f"[{self.bbox}]",
                "collections": f'["{self.collection}"]',
            }
        else:
            # TODO
            params = {}
        return params
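For a catalog that advertises no queryables link, the fallback branch yields GET parameters like these (values illustrative; note that `bbox` and `collections` are serialized as JSON-style strings):

```
query = StacAPIQuery(time={}, bbox="-10,40,10,60", collection="sentinel-2")
params = query.prepare_params(root_response={"links": []})
# params -> {
#     "datetime": "<now>/<now>",
#     "limit": 100,
#     "page": 1,
#     "bbox": "[-10,40,10,60]",
#     "collections": '["sentinel-2"]',
# }
```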
from .Endpoint import Endpoint
class SwiftEndpoint(Endpoint):
    type = "Swift"
import json
import logging
from typing import List, Optional
from redis import Redis
-from .endpoint import Endpoint
+from harvester.resource import Resource
+from .endpoint import get_endpoint
from .source import get_source
from .exceptions import HarvestError
from .utils import cql_filter
logger = logging.getLogger(__name__)
class Harvester:
    def __init__(
        self,
        name: str,
        endpoint: Endpoint,
        queue: str,
        mode: str = "item",
        extract_property: str = None,
    ):
        self.name = name
        self.endpoint = endpoint
        self.queue = queue
        self.mode = mode
        self.extract_property = extract_property
    @classmethod
    def from_config(cls, config: dict, value: str) -> "Harvester":
        logger.debug("Initializing Harvester from config with %s and %s", config, value)
        try:
            harvester_cfg = next(
                item for item in config["harvesters"] if item["name"] == value
            )
        except StopIteration as ex:
            raise HarvestError(f"Name {value} not found in config") from ex
        return cls.from_dict(harvester_cfg)
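A sketch of selecting a harvester by name from a loaded config; the names and endpoint block are illustrative:

```
config = {
    "harvesters": [
        {
            "name": "daily-s2",  # hypothetical harvester name
            "queue": "register",
            "mode": "item",
            "endpoint": {"type": "STACAPI", "url": "https://example.com/stac",
                         "query": {}, "filter": {}},
        }
    ]
}
harvester = Harvester.from_config(config, "daily-s2")
# an unknown name raises HarvestError("Name ... not found in config")
```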
def stringify(
    result: List[dict], mode: str = "item", extract_property: Optional[str] = None
):
    encoded: List[str] = []
    if mode == "item":
        encoded.extend(json.dumps(item) for item in result)
    elif mode == "property":
        encoded.extend(item["properties"][extract_property] for item in result)
    return encoded
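The two modes in a sketch: `item` serializes whole STAC items, `property` extracts one property value per item (the property name is illustrative):

```
items = [{"properties": {"s3_path": "s3://bucket/item-1"}}]

stringify(items, mode="item")
# -> ['{"properties": {"s3_path": "s3://bucket/item-1"}}']

stringify(items, mode="property", extract_property="s3_path")
# -> ["s3://bucket/item-1"]
```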
    @classmethod
    def from_dict(cls, value: dict) -> "Harvester":
        endpoint_config = value.pop("endpoint")
        _endpoint = Endpoint.from_config(endpoint_config)
        return cls(endpoint=_endpoint, **value)
def init_resource(harvest_config: dict) -> Resource:
    config: dict = harvest_config.pop("resource")
    @classmethod
    def from_json(cls, value: str) -> "Harvester":
        logger.debug("Initializing Harvester from json with %s", value)