
Commit c7653408 authored by Nikola Jankovic

Merge branch 'tests_functionality' into 'main'

added tests

See merge request !1
parents 51cdc51d 2db5ae45
Pipeline #18911 passed with stages in 2 minutes and 8 seconds
stages:
- test
- deploy
- publish
- chart
variables:
@@ -11,14 +11,18 @@ test:
stage: test
script:
- python3 setup.py install
- pip3 install -r requirements.txt
- pip3 install -r requirements-test.txt
- pytest
- pip3 install -r requirements-dev.txt
- flake8
- mypy .
- pytest --cov harvester --cov-report term-missing
deploy_latest:
publish_latest:
image: docker:20.10.8
services:
- docker:20.10.8-dind
stage: deploy
stage: publish
script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" "$CI_REGISTRY"
- docker build -t $CI_REGISTRY_IMAGE .
@@ -26,11 +30,11 @@ deploy_latest:
only:
- main
deploy:
publish:
image: docker:20.10.8
services:
- docker:20.10.8-dind
stage: deploy
stage: publish
script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" "$CI_REGISTRY"
- docker build --cache-from $CI_REGISTRY_IMAGE:latest -t $CI_REGISTRY_IMAGE:$CI_COMMIT_TAG .
@@ -47,4 +51,3 @@ helm:
- curl -u $HELM_CHART_REPOSITORY_CREDENTIALS -v -X POST https://charts-public.hub.eox.at/api/charts --data-binary "@${upload_filename}"
only:
- tags
- main
MIT License
Copyright (C) 2021 EOX IT Services GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
include ./harvester/config-schema.yaml
# harvester
Service used to harvest data from various endpoints.
## Tagging
**Before tagging, remember to increment the chart version (`.chart/Chart.yaml`) manually!**
This repository uses `bump2version` to manage tags. To bump a version, use:
```
bump2version --new-version <new_version> # or bump2version <major|minor|patch>
git push && git push --tags
```
from os.path import join, dirname
"""
cli.py
======
Contains command line interface
"""
import logging.config
import click
import yaml
import jsonschema
from .daemon import run_daemon
from .config import load_config
from .daemon import run_daemon, init_client
from .config import load_config, validate_config
from .harvester import main
from .utils import get_params
def setup_logging(debug=False):
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'brief': {
'format': '%(levelname)s %(name)s: %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG' if debug else 'INFO',
'formatter': 'brief',
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG' if debug else 'INFO',
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": False,
"formatters": {"brief": {"format": "%(levelname)s %(name)s: %(message)s"}},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "DEBUG" if debug else "INFO",
"formatter": "brief",
}
},
"root": {
"handlers": ["console"],
"level": "DEBUG" if debug else "INFO",
},
}
})
def validate_config(config):
with open(join(dirname(__file__), 'config-schema.yaml')) as f:
schema = yaml.load(f)
jsonschema.validate(config, schema)
)
@click.group()
@@ -45,36 +40,52 @@ def cli():
pass
@cli.command(help='Run the harvester daemon, attaching to a Redis queue')
@click.option('--config-file', type=click.File('r'))
@click.option('--validate/--no-validate', default=False)
@click.option('--host', type=str)
@click.option('--port', type=int)
@click.option('--listen-queue', type=str)
@click.option('--debug/--no-debug', default=False)
def daemon(config_file=None, validate=False, host=None, port=None, listen_queue=None, debug=False):
@cli.command(help="Run the harvester daemon, attaching to a Redis queue")
@click.option("--host", type=str, required=True)
@click.option("--port", type=int, required=True)
@click.option("--listen-queue", type=str, required=True)
@click.option("--config-file", type=click.File("r"), required=True)
@click.option("--validate/--no-validate", default=False)
@click.option("--debug/--no-debug", default=False)
def daemon(
host: str,
port: int,
listen_queue: str,
config_file: str,
validate: bool = False,
debug: bool = False,
):
setup_logging(debug)
config = load_config(config_file)
if validate:
validate_config(config)
run_daemon(config, host, port, listen_queue)
client = init_client(host, port)
run_daemon(config, client, listen_queue)
@cli.command(help='Run a single, one-off harvest')
@click.argument('harvester_name', type=str)
@click.option('--config-file', type=click.File('r'))
@click.option('--validate/--no-validate', default=False)
@click.option('--debug/--no-debug', default=False)
@click.option('--param', '-p', multiple=True)
def harvest(harvester_name, config_file: str=None, validate: bool=False, debug: bool=False, param: tuple=()):
@cli.command(help="Run a single, one-off harvest")
@click.argument("harvester_name", type=str)
@click.option("--host", type=str, required=True)
@click.option("--port", type=int, required=True)
@click.option("--config-file", type=click.File("r"), required=True)
@click.option("--validate/--no-validate", default=False)
@click.option("--debug/--no-debug", default=False)
def harvest(
harvester_name: str,
host: str,
port: int,
config_file: str,
validate: bool = False,
debug: bool = False,
):
setup_logging(debug)
config = load_config(config_file)
if validate:
validate_config(config)
kwargs = get_params(param)
kwargs['name'] = harvester_name
main(config, harvester_name, **kwargs)
client = init_client(host, port)
main(config, harvester_name, client)
if __name__ == '__main__':
if __name__ == "__main__":
cli()
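Since both commands now take required `--host` and `--port` options, the quickest smoke test of the new interface is click's test runner. A minimal sketch, assuming the package is importable as `harvester.cli`:

```
from click.testing import CliRunner

from harvester.cli import cli  # import path assumed

runner = CliRunner()
# Inspect the generated options without needing a Redis instance:
result = runner.invoke(cli, ["harvest", "--help"])
print(result.output)  # lists --host, --port, --config-file, --validate, ...

# A real run additionally needs a reachable Redis and a config file, e.g.:
# runner.invoke(cli, ["harvest", "my-harvester", "--host", "localhost",
#                     "--port", "6379", "--config-file", "config.yaml"])
```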
@@ -2,16 +2,6 @@ $id: https://example.com/address.schema.json
$schema: http://json-schema.org/draft-07/schema#
type: object
properties:
redis:
description: Redis configuration
type: object
properties:
host:
description: Host address for Redis
type: string
port:
description: Port for Redis
type: integer
harvesters:
description: List of harvesters
type: array
@@ -20,7 +10,7 @@ properties:
type: object
properties:
name:
description: Name of the harvester. Should be unique
type: string
queue:
description: Name of queue to send queried data to
@@ -38,7 +28,7 @@ properties:
type:
description: type of the endpoint
type: string
enum:
- STACAPI
- STACCatalog
- OpenSearch
@@ -50,11 +40,11 @@ properties:
time_property:
description: what time to extract from queried results.
type: string
enum:
- sensed
- updated
- created
- modified
bbox:
description: Bounding box to be queried
type: string
@@ -69,6 +59,8 @@ properties:
required:
- begin
required:
- queue
- name
- endpoint
- mode
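For orientation, a config fragment satisfying this schema might look like the sketch below; the values and the `mode` setting are invented for illustration, not taken from the repository:

```
# Hypothetical config fragment for the schema above (all values invented).
example_config = {
    "harvesters": [
        {
            "name": "my-opensearch-harvester",  # should be unique
            "queue": "register_queue",          # queue queried data is sent to
            "endpoint": {
                "url": "https://example.com/opensearch",
                "type": "OpenSearch",           # one of the enum values above
                "time_property": "sensed",
                "bbox": "-180,-90,180,90",
            },
            "mode": "item",                     # value assumed
        }
    ]
}
# validate_config(example_config) from config.py below would check this
# fragment against config-schema.yaml.
```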
import os
import re
import datetime
from typing import TextIO
import jsonschema
import yaml
ENV_PATTERN = re.compile(r'.*?\${(\w+)}.*?')
ENV_PATTERN = re.compile(r".*?\${(\w+)}.*?")
LOADER = yaml.SafeLoader
def constructor_env_variables(loader, node):
"""
Extracts the environment variable from the node's value
:param yaml.Loader loader: the yaml loader
:param node: the current node in the yaml
:return: the parsed string that contains the value of the environment
variable
"""
value = loader.construct_scalar(node)
match = ENV_PATTERN.findall(value) # to find all env variables in line
if match:
full_value = value
for g in match:
env_variable = os.environ.get(g, )
if env_variable is not None:
full_value = full_value.replace(
f'${{{g}}}', env_variable
)
else:
return None
return full_value
return value
def now(loader, node):
"""
Extracts the environment variable from the node's value
:param yaml.Loader loader: the yaml loader
:param node: the current node in the yaml
:return: the parsed string that contains the value of the environment
variable
"""
value = loader.construct_scalar(node)
match = ENV_PATTERN.findall(value) # to find all env variables in line
if match:
full_value = value
for g in match:
env_variable = os.environ.get(
g,
)
if env_variable is not None:
full_value = full_value.replace(f"${{{g}}}", env_variable)
else:
return None
return full_value
return value
def constructor_now(loader, node):
return datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
tags = {
'!env': constructor_env_variables,
'!now': now,
}
# the tag will be used to mark where to start searching for the pattern
# e.g. somekey: !env somestring${MYENVVAR}blah blah blah
LOADER.add_implicit_resolver("!env", ENV_PATTERN, None)
LOADER.add_constructor("!env", constructor_env_variables)
# this tag resolves !now to datetime.now
LOADER.add_constructor("!now", constructor_now)
def load_config(input_file: TextIO) -> dict:
loader = yaml.SafeLoader
# the tag will be used to mark where to start searching for the pattern
# e.g. somekey: !env somestring${MYENVVAR}blah blah blah
for tag, func in tags.items():
if tag == '!env':
loader.add_implicit_resolver(tag, ENV_PATTERN, None)
loader.add_constructor(tag, func)
return yaml.load(input_file, Loader=loader)
def load_config(input_file: str) -> dict:
return yaml.load(input_file, Loader=LOADER)
def validate_config(config):
with open(
os.path.join(os.path.dirname(__file__), "config-schema.yaml"), encoding="utf-8"
) as file:
schema = yaml.load(file, LOADER)
jsonschema.validate(config, schema)
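The `somekey: !env somestring${MYENVVAR}blah blah blah` example from the comment above works end to end. A minimal sketch, with the variable name taken from that comment and everything else invented:

```
import io
import os

from harvester.config import load_config  # import path assumed

os.environ["MYENVVAR"] = "value-from-env"
config = load_config(io.StringIO(
    "somekey: somestring${MYENVVAR}blah\n"  # implicitly resolved via !env
    "harvested: !now\n"                     # replaced by the current UTC time
))
# config["somekey"] == "somestringvalue-from-envblah"
# config["harvested"] is an ISO-8601 timestamp string
```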
"""
daemon.py
==========
Contains functionality related to running the daemon
"""
import logging
import redis
from redis import Redis
from .harvester import main
logger = logging.getLogger(__name__)
def run_daemon(config: dict, host: str, port: str, listen_queue: str):
""" Run the harvester daemon, listening on a redis queue
for harvest jobs.
def init_client(host: str, port: int) -> Redis:
redis = Redis(host=host, port=port, charset="utf-8", decode_responses=True)
return redis
def run_daemon(config: dict, client: Redis, listen_queue: str):
"""Run the harvester daemon, listening on a redis queue
for harvest jobs.
"""
# initialize the queue client
client = redis.Redis(
host=host, port=port, charset="utf-8", decode_responses=True
)
logger.debug("waiting for redis queue '%s'..." % listen_queue)
logger.debug("waiting for redis queue '%s'", listen_queue)
while True:
# fetch an item from the queue to be harvested
_, value = client.brpop(listen_queue)  # first param is which queue the result came from
_, value = client.brpop(listen_queue)
# start the harvesting
try:
main(config, value, client)
except Exception as e:
logger.exception(e)
main(config, value, client)
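A producer-side sketch of the queue handshake (queue and harvester names invented): `init_client` builds the decoding Redis client, `lpush` enqueues a harvester name, and `run_daemon` pops it via `brpop` and hands it to `main`.

```
from harvester.daemon import init_client, run_daemon  # import path assumed

client = init_client("localhost", 6379)
client.lpush("harvest_jobs", "my-harvester")  # enqueue a job by name
# run_daemon(config, client, "harvest_jobs")  # blocks, popping jobs forever
```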
from datetime import datetime
import logging
from typing import Type, List
from pygeofilter.parsers.cql_json import parse as json_parse
from pygeofilter.backends.native.evaluate import NativeEvaluator
from ..query import Query
logger = logging.getLogger(__name__)
class Endpoint:
def __init__(self, url: str, query: dict, filter: dict, *args, **kwargs):
self.url = url
self.query = Query(**query)
self.filter = json_parse(filter)
def harvest(self) -> list:
# All endpoints should extend function to do following
# 1. prepare query
# 2. query endpoint
# 3. convert to stac items
# 4. filter data if necessary
# 5. return list of stac items
raise NotImplementedError()
@classmethod
def from_config(cls, endpoint_config: dict) -> Type['Endpoint']:
subclass_map = {subclass.type: subclass for subclass in cls.__subclasses__()}
endpoint_type = endpoint_config.pop('type', None)
SubClass = subclass_map[endpoint_type]
return SubClass(**endpoint_config)
def filter_data(self, data: List[dict]) -> List[dict]:
attr_map = {
'point_attr': 'geometry',
'*': 'properties.*'
}
e = NativeEvaluator(attribute_map=attr_map, use_getattr=False)
evaluator = e.evaluate(self.filter)
result = list(filter(evaluator, data))
return result
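A hypothetical subclass makes the numbered steps and the `from_config` dispatch concrete: the config's `type` key selects the subclass via its `type` attribute, and the remaining keys become constructor arguments. A sketch, not part of the repository:

```
from harvester.endpoint.Endpoint import Endpoint  # import path assumed


class DummyEndpoint(Endpoint):
    type = 'Dummy'

    def harvest(self) -> list:
        # 1./2. prepare the query and fetch raw records (static here)
        records = [{'id': 'a', 'geometry': None, 'properties': {}}]
        # 3. a real endpoint would convert the raw records to STAC items
        # 4. apply the configured CQL filter
        records = self.filter_data(records)
        # 5. return the list of STAC item dicts
        return records


# Endpoint.from_config({'type': 'Dummy', 'url': '...', 'query': {...},
#                       'filter': {...}}) would now return a DummyEndpoint.
```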
from .Endpoint import Endpoint
class FTPEndpoint(Endpoint):
type = 'FTP'
from .Endpoint import Endpoint
class OADSEndpoint(Endpoint):
type = 'OADS'
from .Endpoint import Endpoint
class OGCAPIEndpoint(Endpoint):
type = 'OGCAPI'
import logging
from typing import List, Optional, Tuple, Type
from dataclasses import dataclass
import requests
import lxml.etree as ET
import pystac
from .Endpoint import Endpoint
from ..stac import STACItemComposer
logger = logging.getLogger(__name__)
SearchPage = Tuple[List[dict], Optional[str]]
@dataclass
class SearchPage:
records: List[dict]
index: int
total: int
class OpenSearchFormat:
mimetype = None
@classmethod
def from_config(cls, config: dict) -> Type['OpenSearchFormat']:
subclass_map = {
subclass.mimetype: subclass for subclass in cls.__subclasses__()
}
type_ = config.pop('type', None)
SubClass = subclass_map[type_]
return SubClass(**config)
class GeoJSONFormat(OpenSearchFormat):
mimetype = 'application/json'
def __init__(self, property_mapping):
self.property_mapping = property_mapping
def parse(self, response: requests.Response) -> SearchPage:
data = response.json()
features = [
self._parse_feature(feature) for feature in data['features']
]
return SearchPage(
features,
data['properties']['startIndex'],
data['properties']['totalResults'],
)
def _parse_feature(self, feature: dict) -> dict:
properties = {
property_name: feature['properties'][property_path]
for property_name, property_path in self.property_mapping.items()
}
return pystac.Item(
feature['id'],
geometry=feature['geometry'],
bbox=None,
datetime=None,
properties=properties,
).to_dict()
class AtomFormat(OpenSearchFormat):
mimetype = 'application/atom+xml'
def __init__(self):
# TODO: maybe add mapping from XML -> properties in output STAC
pass
class OpenSearchEndpoint(Endpoint):
type = 'OpenSearch'
NS = {
'': 'http://www.w3.org/2005/Atom',
'opensearch': 'http://a9.com/-/spec/opensearch/1.1/',
'parameters': 'http://a9.com/-/spec/opensearch/extensions/parameters/1.0/',
'georss': 'http://www.georss.org/georss/',
'media': 'http://search.yahoo.com/mrss/',
'owc': 'http://www.opengis.net/owc/1.0/',
'eo': 'http://a9.com/-/opensearch/extensions/eo/1.0/',
'geo': 'http://a9.com/-/opensearch/extensions/geo/1.0/',
'time': 'http://a9.com/-/opensearch/extensions/time/1.0/',
'cql': 'http://a9.com/-/opensearch/extensions/cql/1.0/',
'dc': 'http://purl.org/dc/elements/1.1/',
}
def __init__(self, format_config, *args, **kwargs):
super(OpenSearchEndpoint, self).__init__(*args, **kwargs)
self.format = OpenSearchFormat.from_config(format_config)
def harvest(self) -> list:
logger.info("Starting OpenSearch harvesting")
# prepare query
parser = ET.XMLParser(recover=True)
data = ET.fromstring(requests.get(self.url).content, parser)
urls = self._get_urls_params(data)
result = []
url, search_params, (index, index_val), _ = self.query.opensearch_params(
urls, self.format.mimetype
)
while True:
response = requests.get(url, params=search_params)
response.raise_for_status()