Commit 19acfdf9 authored by Fabian Schindler

Initial commit with contents of PRISM / VS

parent 6968b6d5
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# venv stuff
bin
include
pyvenv.cfg
#------------------------------------------------------------------------------
#
# Project: prism view server
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
#------------------------------------------------------------------------------
# Copyright (C) 2019 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#------------------------------------------------------------------------------
FROM python:3.8
LABEL name="prism view server harvester" \
vendor="EOX IT Services GmbH <https://eox.at>" \
license="MIT Copyright (C) 2021 EOX IT Services GmbH <https://eox.at>" \
type="prism view server harvester"
WORKDIR /opt/harvester
COPY . .
RUN python3 setup.py install
RUN pip3 install boto3 botocore pystac pygeofilter redis
CMD run-harvester.sh
LABEL version="2.0.0-alpha.2"
from .cli import cli
cli(prog_name="harvester")
from os.path import join, dirname
import logging.config
import click
import yaml
import jsonschema
from .daemon import run_daemon
from .config import load_config
from .harvester import main
from .utils import get_params
def setup_logging(debug=False):
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'brief': {
'format': '%(levelname)s %(name)s: %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG' if debug else 'INFO',
'formatter': 'brief',
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG' if debug else 'INFO',
}
})
def validate_config(config):
with open(join(dirname(__file__), 'config-schema.yaml')) as f:
        schema = yaml.safe_load(f)
jsonschema.validate(config, schema)
@click.group()
def cli():
pass
@cli.command(help='Run the harvester daemon, attaching to a Redis queue')
@click.option('--config-file', type=click.File('r'))
@click.option('--validate/--no-validate', default=False)
@click.option('--host', type=str)
@click.option('--port', type=int)
@click.option('--listen-queue', type=str)
@click.option('--debug/--no-debug', default=False)
def daemon(config_file=None, validate=False, host=None, port=None, listen_queue=None, debug=False):
setup_logging(debug)
config = load_config(config_file)
if validate:
validate_config(config)
run_daemon(config, host, port, listen_queue)
@cli.command(help='Run a single, one-off harvest')
@click.argument('harvester_name', type=str)
@click.option('--config-file', type=click.File('r'))
@click.option('--validate/--no-validate', default=False)
@click.option('--debug/--no-debug', default=False)
@click.option('--param', '-p', multiple=True)
def harvest(harvester_name, config_file: str = None, validate: bool = False, debug: bool = False, param: tuple = ()):
setup_logging(debug)
config = load_config(config_file)
if validate:
validate_config(config)
kwargs = get_params(param)
kwargs['name'] = harvester_name
main(config, harvester_name, **kwargs)
if __name__ == '__main__':
cli()
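
For reference, the two commands above can be exercised in-process with click's testing utilities. This is a minimal sketch, assuming the package is importable as harvester (matching the prog_name above) and that a config.yaml exists; both names are illustrative:

from click.testing import CliRunner
from harvester.cli import cli  # assumed package name, not fixed by this commit

runner = CliRunner()
result = runner.invoke(
    cli,
    ['harvest', 'my-harvester', '--config-file', 'config.yaml', '--validate'],
)
print(result.exit_code, result.output)
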
$id: https://example.com/address.schema.json
$schema: http://json-schema.org/draft-07/schema#
type: object
properties:
redis:
description: Redis configuration
type: object
properties:
host:
description: Host address for Redis
type: string
port:
description: Port for Redis
type: integer
harvesters:
description: List of harvesters
type: array
items:
description: A harvester configuration
type: object
properties:
name:
description: Name of the harvester. Should be unique
type: string
queue:
description: Name of queue to send queried data to
type: string
endpoint:
description: Endpoint to be queried
type: object
collection:
            description: Name of the collection
type: string
properties:
url:
              description: URL of the collection
type: string
type:
              description: Type of the endpoint
type: string
enum:
- STACAPI
- STACCatalog
- OpenSearch
- FTP
- S3
- Swift
- OGCAPI
- OADS
time_property:
          description: Which timestamp to extract from queried results
type: string
enum:
- sensed
- updated
- created
- modified
bbox:
description: Bounding box to be queried
type: string
time:
description: Sensing times to be used when querying
type: object
properties:
begin:
              description: Beginning of sensing time
end:
description: End of sensing time
required:
- begin
required:
- queue
- name
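
The schema above can be applied to a hand-written configuration with jsonschema, mirroring what validate_config does in the CLI module. A minimal sketch; all names and values are illustrative assumptions:

import yaml
import jsonschema

config = {
    'redis': {'host': 'localhost', 'port': 6379},
    'harvesters': [
        {
            'name': 'my-harvester',
            'queue': 'register',
            'endpoint': {'url': 'https://example.com/search', 'type': 'OpenSearch'},
        },
    ],
}

with open('config-schema.yaml') as f:
    schema = yaml.safe_load(f)

jsonschema.validate(config, schema)  # raises jsonschema.ValidationError on mismatch
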
import os
import re
import datetime
from typing import TextIO
import yaml
ENV_PATTERN = re.compile(r'.*?\${(\w+)}.*?')
def constructor_env_variables(loader, node):
"""
Extracts the environment variable from the node's value
:param yaml.Loader loader: the yaml loader
:param node: the current node in the yaml
:return: the parsed string that contains the value of the environment
variable
"""
value = loader.construct_scalar(node)
match = ENV_PATTERN.findall(value) # to find all env variables in line
if match:
full_value = value
for g in match:
            env_variable = os.environ.get(g)
if env_variable is not None:
full_value = full_value.replace(
f'${{{g}}}', env_variable
)
else:
return None
return full_value
return value
def now(loader, node):
return datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
tags = {
'!env': constructor_env_variables,
'!now': now,
}
def load_config(input_file: TextIO) -> dict:
loader = yaml.SafeLoader
# the tag will be used to mark where to start searching for the pattern
# e.g. somekey: !env somestring${MYENVVAR}blah blah blah
for tag, func in tags.items():
if tag == '!env':
loader.add_implicit_resolver(tag, ENV_PATTERN, None)
loader.add_constructor(tag, func)
return yaml.load(input_file, Loader=loader)
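
load_config wires two custom YAML tags into a SafeLoader: !env interpolates ${VAR} references from the environment, and !now stamps the current UTC time. A minimal sketch of both, using an assumed MY_HOST environment variable purely for illustration:

import io
import os

os.environ['MY_HOST'] = 'redis'
config = load_config(io.StringIO(
    'redis:\n'
    '  host: !env ${MY_HOST}\n'
    'harvested_at: !now\n'
))
# config == {'redis': {'host': 'redis'}, 'harvested_at': '<current UTC ISO timestamp>'}
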
import logging
import redis
from .harvester import main
logger = logging.getLogger(__name__)
def run_daemon(config: dict, host: str, port: int, listen_queue: str):
""" Run the harvester daemon, listening on a redis queue
for harvest jobs.
"""
# initialize the queue client
client = redis.Redis(
host=host, port=port, charset="utf-8", decode_responses=True
)
    logger.debug("waiting for redis queue '%s'...", listen_queue)
while True:
# fetch an item from the queue to be harvested
        _, value = client.brpop(listen_queue)  # the first element is the queue the result came from
# start the harvesting
try:
main(config, value, client)
except Exception as e:
logger.exception(e)
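
The daemon blocks on BRPOP, so a harvest is triggered by pushing a harvester name onto the queue it listens on. A minimal sketch; the queue name 'harvester' is an assumption and must match the --listen-queue the daemon was started with:

import redis

client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# brpop in run_daemon pops from the right, so lpush gives FIFO ordering
client.lpush('harvester', 'my-harvester')
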
import logging
from typing import List
from pygeofilter.parsers.cql_json import parse as json_parse
from pygeofilter.backends.native.evaluate import NativeEvaluator
from ..query import Query
logger = logging.getLogger(__name__)
class Endpoint:
def __init__(self, url: str, query: dict, filter: dict, *args, **kwargs):
self.url = url
self.query = Query(**query)
self.filter = json_parse(filter)
def harvest(self) -> list:
# All endpoints should extend function to do following
# 1. prepare query
# 2. query endpoint
# 3. convert to stac items
# 4. filter data if necessary
# 5. return list of stac items
raise NotImplementedError()
@classmethod
    def from_config(cls, endpoint_config: dict) -> 'Endpoint':
subclass_map = {subclass.type: subclass for subclass in cls.__subclasses__()}
endpoint_type = endpoint_config.pop('type', None)
SubClass = subclass_map[endpoint_type]
return SubClass(**endpoint_config)
def filter_data(self, data: List[dict]) -> List[dict]:
attr_map = {
'point_attr': 'geometry',
'*': 'properties.*'
}
e = NativeEvaluator(attribute_map=attr_map, use_getattr=False)
evaluator = e.evaluate(self.filter)
result = list(filter(evaluator, data))
return result
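
from_config resolves the concrete endpoint class by matching the config's type field against the class-level type attribute of each Endpoint subclass, discovered via __subclasses__(). A standalone sketch of that registry pattern, with illustrative names:

class Base:
    @classmethod
    def from_config(cls, config: dict) -> 'Base':
        # map each subclass's class-level 'type' marker to the subclass itself
        subclass_map = {sub.type: sub for sub in cls.__subclasses__()}
        return subclass_map[config.pop('type')](**config)

class A(Base):
    type = 'A'

    def __init__(self, url):
        self.url = url

instance = Base.from_config({'type': 'A', 'url': 'https://example.com'})
assert isinstance(instance, A)

Note that __subclasses__() only sees classes that have already been imported, so each endpoint module must be imported before from_config can dispatch to it.
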
from .Endpoint import Endpoint
class FTPEndpoint(Endpoint):
type = 'FTP'
from .Endpoint import Endpoint
class OADSEndpoint(Endpoint):
type = 'OADS'
from .Endpoint import Endpoint
class OGCAPIEndpoint(Endpoint):
type = 'OGCAPI'
import logging
from typing import List
from dataclasses import dataclass
import requests
import lxml.etree as ET
import pystac
from .Endpoint import Endpoint
from ..stac import STACItemComposer
logger = logging.getLogger(__name__)
@dataclass
class SearchPage:
records: List[dict]
index: int
total: int
class OpenSearchFormat:
mimetype = None
@classmethod
    def from_config(cls, config: dict) -> 'OpenSearchFormat':
subclass_map = {
subclass.mimetype: subclass for subclass in cls.__subclasses__()
}
type_ = config.pop('type', None)
SubClass = subclass_map[type_]
return SubClass(**config)
class GeoJSONFormat(OpenSearchFormat):
mimetype = 'application/json'
def __init__(self, property_mapping):
self.property_mapping = property_mapping
def parse(self, response: requests.Response) -> SearchPage:
data = response.json()
features = [
self._parse_feature(feature) for feature in data['features']
]
return SearchPage(
features,
data['properties']['startIndex'],
data['properties']['totalResults'],
)
def _parse_feature(self, feature: dict) -> dict:
properties = {
property_name: feature['properties'][property_path]
for property_name, property_path in self.property_mapping.items()
}
return pystac.Item(
feature['id'],
geometry=feature['geometry'],
bbox=None,
datetime=None,
properties=properties,
).to_dict()
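# Example (sketch): GeoJSONFormat turns one raw feature into a STAC item dict
# via property_mapping. The upstream property names ('startDate', 'endDate')
# are assumptions for illustration. Since the item is built with datetime=None,
# pystac requires 'start_datetime' and 'end_datetime' to end up in the mapped
# properties:
#
#     fmt = GeoJSONFormat(property_mapping={
#         'start_datetime': 'startDate',
#         'end_datetime': 'endDate',
#     })
#     feature = {
#         'id': 'item-1',
#         'geometry': {'type': 'Point', 'coordinates': [16.3, 48.2]},
#         'properties': {
#             'startDate': '2021-01-01T00:00:00Z',
#             'endDate': '2021-01-01T00:10:00Z',
#         },
#     }
#     item_dict = fmt._parse_feature(feature)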
class AtomFormat(OpenSearchFormat):
mimetype = 'application/atom+xml'
def __init__(self):
# TODO: maybe add mapping from XML -> properties in output STAC
pass
class OpenSearchEndpoint(Endpoint):
type = 'OpenSearch'
NS = {
'': 'http://www.w3.org/2005/Atom',
'opensearch': 'http://a9.com/-/spec/opensearch/1.1/',
'parameters': 'http://a9.com/-/spec/opensearch/extensions/parameters/1.0/',
'georss': 'http://www.georss.org/georss/',
'media': 'http://search.yahoo.com/mrss/',
'owc': 'http://www.opengis.net/owc/1.0/',
'eo': 'http://a9.com/-/opensearch/extensions/eo/1.0/',
'geo': 'http://a9.com/-/opensearch/extensions/geo/1.0/',
'time': 'http://a9.com/-/opensearch/extensions/time/1.0/',
'cql': 'http://a9.com/-/opensearch/extensions/cql/1.0/',
'dc': 'http://purl.org/dc/elements/1.1/',
}
def __init__(self, format_config, *args, **kwargs):
super(OpenSearchEndpoint, self).__init__(*args, **kwargs)
self.format = OpenSearchFormat.from_config(format_config)
def harvest(self) -> list:
logger.info("Starting OpenSearch harvesting")
# prepare query
parser = ET.XMLParser(recover=True)
        response = requests.get(self.url)
        response.raise_for_status()
        data = ET.fromstring(response.content, parser)
urls = self._get_urls_params(data)
result = []
url, search_params, (index, index_val), _ = self.query.opensearch_params(
urls, self.format.mimetype
)
        # page through the results until an empty page signals the end
        while True:
            response = requests.get(url, params=search_params)
            response.raise_for_status()
            # advance the paging parameter for the next request
            search_params[index] += index_val
            page = self.format.parse(response)
            if page.records:
                result.extend(page.records)
            else:
                break
return result
def _get_urls_params(self, data: ET._Element) -> dict:
urls = {}
for url in data.findall('opensearch:Url', self.NS):
            type_ = url.get('type')
            urls[type_] = {}