
Commit 5c1d26c0 authored by Nikola Jankovic

added tests

refactored functions
added fixtures and skeleton
parent 7c98e7a6
Pipeline #18465 failed in 19 seconds
include ./harvester/config-schema.yaml
"""
cli.py
======

Contains command line interface
"""
import logging.config

import click

from .daemon import run_daemon, init_client
from .config import load_config, validate_config
from .harvester import main


def setup_logging(debug=False):
    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {"brief": {"format": "%(levelname)s %(name)s: %(message)s"}},
            "handlers": {
                "console": {
                    "class": "logging.StreamHandler",
                    "level": "DEBUG" if debug else "INFO",
                    "formatter": "brief",
                }
            },
            "root": {
                "handlers": ["console"],
                "level": "DEBUG" if debug else "INFO",
            },
        }
    )
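
# Usage sketch (illustrative, not part of this file): with debug=True the
# console handler emits DEBUG records from every logger, e.g.
#     setup_logging(debug=True)
#     logging.getLogger(__name__).debug("only visible with --debug")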


@click.group()
def cli():
    pass


@cli.command(help="Run the harvester daemon, attaching to a Redis queue")
@click.option("--host", type=str, required=True)
@click.option("--port", type=int, required=True)
@click.option("--listen-queue", type=str, required=True)
@click.option("--config-file", type=click.File("r"), required=True)
@click.option("--validate/--no-validate", default=False)
@click.option("--debug/--no-debug", default=False)
def daemon(
    host,
    port,
    listen_queue,
    config_file=None,
    validate=False,
    debug=False,
):
    setup_logging(debug)
    config = load_config(config_file)
    if validate:
        validate_config(config)

    run_daemon(config, host, port, listen_queue)


@cli.command(help="Run a single, one-off harvest")
@click.argument("harvester_name", type=str)
@click.option("--host", type=str, required=True)
@click.option("--port", type=int, required=True)
@click.option("--config-file", type=click.File("r"), required=True)
@click.option("--validate/--no-validate", default=False)
@click.option("--debug/--no-debug", default=False)
def harvest(
    harvester_name: str,
    host: str,
    port: int,
    config_file: str = None,
    validate: bool = False,
    debug: bool = False,
):
    setup_logging(debug)
    config = load_config(config_file)
    if validate:
        validate_config(config)

    client = init_client(host, port)
    main(config, harvester_name, client)


if __name__ == "__main__":
    cli()
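
A quick sketch of the reworked interface (the harvester name, host, and config
path below are placeholders), using click's bundled test runner and assuming
the package is importable as "harvester":

# invoke the "harvest" subcommand programmatically
from click.testing import CliRunner
from harvester.cli import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        "harvest", "my-harvester",      # placeholder harvester name
        "--host", "localhost",          # Redis connection is now required up front
        "--port", "6379",
        "--config-file", "config.yml",  # placeholder config path
        "--validate",
    ],
)
print(result.exit_code, result.output)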
$id: https://example.com/address.schema.json
$schema: http://json-schema.org/draft-07/schema#

type: object
properties:
  harvesters:
    description: List of harvesters
    type: array
import os
import re
import datetime
from typing import TextIO

import jsonschema
import yaml

ENV_PATTERN = re.compile(r".*?\${(\w+)}.*?")
LOADER = yaml.SafeLoader


def constructor_env_variables(loader, node):
    # resolve the scalar and collect the ${VAR} names referenced in it
    value = loader.construct_scalar(node)
    match = ENV_PATTERN.findall(value)
    if match:
        full_value = value
        for g in match:
            env_variable = os.environ.get(g)
            if env_variable is not None:
                full_value = full_value.replace(f"${{{g}}}", env_variable)
            else:
                return None
        return full_value
    return value


def constructor_now(loader, node):
    return datetime.datetime.now(tz=datetime.timezone.utc).isoformat()


# the tag will be used to mark where to start searching for the pattern
# e.g. somekey: !env somestring${MYENVVAR}blah blah blah
LOADER.add_implicit_resolver("!env", ENV_PATTERN, None)
LOADER.add_constructor("!env", constructor_env_variables)

# the !now tag resolves to the current UTC time in ISO format
LOADER.add_constructor("!now", constructor_now)


def load_config(input_file: TextIO) -> dict:
    return yaml.load(input_file, Loader=LOADER)


def validate_config(config):
    with open(
        os.path.join(os.path.dirname(__file__), "config-schema.yaml"), encoding="utf-8"
    ) as file:
        schema = yaml.safe_load(file)

    jsonschema.validate(config, schema)
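
A short sketch of the two custom tags in action (assumes this module is
importable; the variable name and the output value are illustrative):

import io
import os

os.environ["MYENVVAR"] = "value"
config = load_config(io.StringIO(
    "url: !env prefix-${MYENVVAR}-suffix\n"  # explicit tag; plain scalars also match
    "created: !now\n"                        # resolves to a UTC ISO timestamp
))
# config == {"url": "prefix-value-suffix", "created": "2022-...T...+00:00"}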
"""
daemon.py
==========
Contains functionality related to running the daemon
"""
import logging
import redis
from redis import Redis
from .harvester import main
logger = logging.getLogger(__name__)
def init_client(host: str, port: int) -> Redis:
redis = Redis(host=host, port=port, charset="utf-8", decode_responses=True)
return redis
def run_daemon(config: dict, host: str, port: str, listen_queue: str):
""" Run the harvester daemon, listening on a redis queue
"""Run the harvester daemon, listening on a redis queue
for harvest jobs.
"""
# initialize the queue client
client = redis.Redis(
host=host, port=port, charset="utf-8", decode_responses=True
)
logger.debug("waiting for redis queue '%s'..." % listen_queue)
client = init_client(host, port)
logger.debug("waiting for redis queue '%s'", listen_queue)
while True:
# fetch an item from the queue to be harvested
_, value = client.brpop(listen_queue) # first param which queue ther result came from
_, value = client.brpop(listen_queue)
# start the harvesting
try:
main(config, value, client)
except Exception as e:
logger.exception(e)
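
Since run_daemon() blocks on brpop, producers feed it from the opposite end of
the list; a minimal sketch (queue and harvester names are placeholders):

# enqueue a harvest job that the daemon's brpop() will pick up (lpush/brpop is FIFO)
from redis import Redis

client = Redis(host="localhost", port=6379, charset="utf-8", decode_responses=True)
client.lpush("harvest-queue", "my-harvester")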
import logging
from abc import ABC, abstractmethod
from typing import List

from pygeofilter.parsers.cql_json import parse as json_parse
from pygeofilter.backends.native.evaluate import NativeEvaluator

from ..query import Query

logger = logging.getLogger(__name__)


class Endpoint(ABC):
    type = None

    def __init__(self, url: str, query: dict, filter: dict, *args, **kwargs):
        self.url = url
        self.query = Query(**query)
        self.filter = json_parse(filter)

    @abstractmethod
    def harvest(self) -> list:
        # All endpoints should extend this function to do the following:
        # 1. prepare query
        # 2. fetch data
        # 3. convert to stac items
        # 4. filter data if necessary
        # 5. return list of stac items
        pass

    @classmethod
    def from_config(cls, endpoint_config: dict) -> "Endpoint":
        subclass_map = {subclass.type: subclass for subclass in cls.__subclasses__()}
        endpoint_type = endpoint_config.pop("type", None)

        subclass = subclass_map[endpoint_type]
        return subclass(**endpoint_config)

    def filter_data(self, data: List[dict]) -> List[dict]:
        attr_map = {"point_attr": "geometry", "*": "properties.*"}
        nat_eval = NativeEvaluator(attribute_map=attr_map, use_getattr=False)
        evaluator = nat_eval.evaluate(self.filter)
        result = list(filter(evaluator, data))
        return result
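
from_config() dispatches on the class attribute "type" gathered from
__subclasses__(), so adding an endpoint is just subclassing; a hedged sketch
(the type name is made up, and the empty query/filter payloads are placeholders
whose real shape depends on Query and the CQL-JSON parser):

# a hypothetical endpoint that Endpoint.from_config() can now instantiate
class DummyEndpoint(Endpoint):
    type = "Dummy"

    def harvest(self) -> list:
        return []  # a real endpoint would fetch, convert, and filter here

endpoint = Endpoint.from_config(
    {"type": "Dummy", "url": "https://example.com", "query": {}, "filter": {}}
)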
from .ftp import FTPEndpoint
from .oads import OADSEndpoint
from .ogcapi import OGCAPIEndpoint
from .opensearch import OpenSearchEndpoint
from .s3 import S3Endpoint
from .stacapi import STACAPIEndpoint
from .stac_catalog import STACCatalogEndpoint
from .swift import SwiftEndpoint
from .Endpoint import Endpoint

__all__ = [
    "FTPEndpoint",
    "OADSEndpoint",
    "OGCAPIEndpoint",
    "OpenSearchEndpoint",
    "S3Endpoint",
    "STACAPIEndpoint",
    "STACCatalogEndpoint",
    "SwiftEndpoint",
    "Endpoint",
]
from .Endpoint import Endpoint


class FTPEndpoint(Endpoint):
    type = "FTP"


from .Endpoint import Endpoint


class OADSEndpoint(Endpoint):
    type = "OADS"


from .Endpoint import Endpoint


class OGCAPIEndpoint(Endpoint):
    type = "OGCAPI"
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List

import requests
import lxml.etree as ET
import pystac

from .Endpoint import Endpoint
from ..stac import STACItemComposer

logger = logging.getLogger(__name__)


@dataclass
class SearchPage:
    index: int
    total: int
    records: List[dict] = field(default_factory=list)


class OpenSearchFormat(ABC):
    mimetype = None

    @classmethod
    def from_config(cls, config: dict) -> "OpenSearchFormat":
        subclass_map = {
            subclass.mimetype: subclass for subclass in cls.__subclasses__()
        }
        type_ = config.pop("type", None)

        subclass = subclass_map[type_]
        return subclass(**config)

    @abstractmethod
    def parse(self, response: requests.Response) -> SearchPage:
        """Parse one page of an OpenSearch response.

        Args:
            response (requests.Response): response fetched from server

        Returns:
            SearchPage: a single search page
        """


class GeoJSONFormat(OpenSearchFormat):
    mimetype = "application/json"

    def __init__(self, property_mapping):
        self.property_mapping = property_mapping

    def parse(self, response: requests.Response) -> SearchPage:
        data = response.json()
        features = [self._parse_feature(feature) for feature in data["features"]]
        return SearchPage(
            index=data["properties"]["startIndex"],
            total=data["properties"]["totalResults"],
            records=features,
        )

    def _parse_feature(self, feature: dict) -> pystac.Item:
        properties = {
            property_name: feature["properties"][property_path]
            for property_name, property_path in self.property_mapping.items()
        }
        return pystac.Item(
            feature["id"],
            geometry=feature["geometry"],
            bbox=None,
            datetime=None,
            properties=properties,
        )


class AtomFormat(OpenSearchFormat):
    mimetype = "application/atom+xml"

    def __init__(self):
        # TODO: maybe add mapping from XML -> properties in output STAC
        ...


class OpenSearchEndpoint(Endpoint):
    type = "OpenSearch"

    NS = {
        "": "http://www.w3.org/2005/Atom",
        "opensearch": "http://a9.com/-/spec/opensearch/1.1/",
        "parameters": "http://a9.com/-/spec/opensearch/extensions/parameters/1.0/",
        "georss": "http://www.georss.org/georss/",
        "media": "http://search.yahoo.com/mrss/",
        "owc": "http://www.opengis.net/owc/1.0/",
        "eo": "http://a9.com/-/opensearch/extensions/eo/1.0/",
        "geo": "http://a9.com/-/opensearch/extensions/geo/1.0/",
        "time": "http://a9.com/-/opensearch/extensions/time/1.0/",
        "cql": "http://a9.com/-/opensearch/extensions/cql/1.0/",
        "dc": "http://purl.org/dc/elements/1.1/",
    }

    def __init__(self, format_config, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.format = OpenSearchFormat.from_config(format_config)
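
    # Illustrative (assumption, not shown in this diff): format_config dispatches
    # on "type", which must equal a subclass mimetype, e.g.
    #     OpenSearchFormat.from_config(
    #         {"type": "application/json", "property_mapping": {"start": "startDate"}}
    #     )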
    def harvest(self) -> list:
        # prepare query
        parser = ET.XMLParser(recover=True)
        data = ET.fromstring(requests.get(self.url).content, parser)
        urls = self._get_url_param_mapping(data)
        result = []

        params = self.query.opensearch_params(urls, self.format.mimetype)
        url = params["url"]
        search_params = params["search_params"]
        index_keyword = params["index_keyword"]
        index_value = params["index_value"]

        while True:
            response = requests.get(url, params=search_params)
            response.raise_for_status()
            search_params[index_keyword] += index_value

            page = self.format.parse(response)
            if page.records:
                result.extend(page.records)
            else:
                break

        return result
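
    # For reference (assumption): Query.opensearch_params() is expected to return
    # a mapping shaped like
    #     {"url": "https://.../search", "search_params": {...},
    #      "index_keyword": "startIndex", "index_value": 10}
    # so the loop above can advance the paging parameter between requests.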
    def _get_url_param_mapping(self, data: ET._Element) -> dict:
        urls = {}
        for url in data.findall("opensearch:Url", self.NS):
            _type = url.get("type")
            urls[_type] = {}
            urls[_type]["url"] = url.get("template").split("?")[0]
            urls[_type]["params"] = []
            for param in url.findall("parameters:Parameter", self.NS):
                urls[_type]["params"].append(
                    {
                        "name": param.get("name"),
                        "value": param.get("value"),
                        "minInclusive": param.get("minInclusive", None),
                        "maxInclusive": param.get("maxInclusive", None),