EOX GitLab Instance

Commit a408f8db authored by Mussab Abdalla's avatar Mussab Abdalla
Browse files

Merge branch 'dev_tools' into 'main'

added development tools

See merge request !1
parents 1e3c4cd1 7b31290b
Pipeline #19011 passed with stages
in 1 minute and 46 seconds
......@@ -12,7 +12,11 @@ test:
script:
- pip3 install -r requirements.txt
- pip3 install -r requirements-test.txt
- pytest
- pip3 install -r requirements-dev.txt
- flake8
- mypy .
- pytest --cov ingestor --cov-report term-missing
coverage: "/TOTAL.+ ([0-9]{1,3}%)/"
publish_latest:
image: docker:20.10.8
......
#!/usr/bin/env python
#------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
#
# Project: view server
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
#------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# Copyright (C) 2020 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
......@@ -24,65 +24,71 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#-----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
import os
import logging
import logging.config
import json
from flask import Flask, request, Response
from redis import Redis
from vsq.queue import Queue
from ingestor.browse_report import parse_browse_report
from ingestor.util import save_endpoint_report, browse_name

application = Flask(__name__)

logger = logging.getLogger(__name__)

# Route everything through the root logger at DEBUG level, writing to the
# console in a verbose "[time][module] LEVEL: message" format.
logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "simple": {
                "format": "%(levelname)s: %(message)s",
            },
            "verbose": {
                "format": "[%(asctime)s][%(module)s] %(levelname)s: %(message)s",
            },
        },
        "handlers": {
            "console": {
                "level": "DEBUG",
                "class": "logging.StreamHandler",
                "formatter": "verbose",
            }
        },
        "loggers": {
            "": {
                "handlers": ["console"],
                "level": "DEBUG",
                "propagate": False,
            }
        },
    }
)

# Queue configuration comes from the environment: REDIS_PREPROCESS_MD_QUEUE_KEY
# and REDIS_HOST are required; REDIS_PORT defaults to 6379.
queue_name = os.environ["REDIS_PREPROCESS_MD_QUEUE_KEY"]
queue = Queue(
    queue_name,
    Redis(
        host=os.environ["REDIS_HOST"],
        port=int(os.environ.get("REDIS_PORT", "6379")),
        charset="utf-8",
        decode_responses=True,
    ),
)
@application.route('/', methods=['POST'])
@application.route("/", methods=["POST"])
def ingest():
try:
request.get_data()
if isinstance(request.data, (bytes, bytearray)):
data_from_request = request.data.decode('utf-8')
data_from_request = request.data.decode("utf-8")
elif isinstance(request.data, str):
data_from_request = request.data
else:
......@@ -94,10 +100,12 @@ def ingest():
filename = browse_report
else:
filename = browse_name(browse_report)
task = queue.put({
'type': 'message',
'payload': browse_report,
})
task = queue.put(
{
"type": "message",
"payload": browse_report,
}
)
logger.debug("%s %s %s" % (task.id, task.status, task.created))
save_endpoint_report(filename, data_from_request, True)
......
......@@ -10,15 +10,15 @@ metadata:
name: {{ $fullName }}
labels:
{{- include "vs-ingestor.labels" . | nindent 4 }}
{{- with .Values.global.ingress.annotations }}
{{- with .Values.ingress.annotations | default .Values.global.ingress.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
nginx.ingress.kubernetes.io/rewrite-target: /$1
{{- end }}
spec:
{{- if .Values.global.ingress.tls }}
{{- if .Values.ingress.tls | default .Values.global.ingress.tls }}
tls:
{{- range .Values.global.ingress.tls }}
{{- range .Values.ingress.tls | default .Values.global.ingress.tls }}
- hosts:
{{- range .hosts }}
- {{ . | quote }}
......@@ -27,7 +27,7 @@ spec:
{{- end }}
{{- end }}
rules:
{{- range .Values.global.ingress.hosts }}
{{- range .Values.ingress.hosts | default .Values.global.ingress.hosts }}
{{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }}
- host: {{ .host | quote }}
http:
......
......@@ -29,7 +29,6 @@
import os
import logging
import logging.config
import json
import functools
import operator
......@@ -38,66 +37,75 @@ from redis import Redis
from vsq.queue import Queue
from ingestor.browse_report import parse_browse_report
from ingestor.util import converter, save_mount_report
from ingestor.util import save_mount_report
logger = logging.getLogger(__name__)

# Route everything through the root logger at DEBUG level, writing to the
# console in a verbose "[time][module] LEVEL: message" format.
logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "simple": {
                "format": "%(levelname)s: %(message)s",
            },
            "verbose": {
                "format": "[%(asctime)s][%(module)s] %(levelname)s: %(message)s",
            },
        },
        "handlers": {
            "console": {
                "level": "DEBUG",
                "class": "logging.StreamHandler",
                "formatter": "verbose",
            }
        },
        "loggers": {
            "": {
                "handlers": ["console"],
                "level": "DEBUG",
                "propagate": False,
            }
        },
    }
)

# Required environment: queue key and the directory to watch. INOTIFY_MASKS is
# a comma-separated list of pyinotify event names, defaulting to file-move and
# close-after-write events.
queue_name = os.environ["REDIS_PREPROCESS_MD_QUEUE_KEY"]
watch_dir = os.environ["INOTIFY_WATCH_DIR"]
inotify_modes = os.environ.get("INOTIFY_MASKS", "IN_MOVED_TO,IN_CLOSE_WRITE").split(",")

# bitwise or on all elements of configured inotify modes to watch for;
# unknown mode names resolve to 0 and are effectively ignored
inotify_mask = functools.reduce(
    operator.or_, [getattr(pyinotify, mode, 0) for mode in inotify_modes], 0
)

queue = Queue(
    queue_name,
    Redis(
        host=os.environ["REDIS_HOST"],
        port=int(os.environ.get("REDIS_PORT", "6379")),
        charset="utf-8",
        decode_responses=True,
    ),
)

watchmanager = pyinotify.WatchManager()
class EventHandler(pyinotify.ProcessEvent):
def handle_file(self, event):
logger.info(f'Parsing browse file: {event.pathname}')
logger.info(f"Parsing browse file: {event.pathname}")
try:
with open(event.pathname) as f:
browse_report = parse_browse_report(f)
logger.debug(browse_report)
task = queue.put({
'type': 'message',
'payload': browse_report,
})
task = queue.put(
{
"type": "message",
"payload": browse_report,
}
)
logger.debug("%s %s %s" % (task.id, task.status, task.created))
save_mount_report(event.pathname, True)
......
#------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
#
# Project: view server
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
#------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# Copyright (C) 2020 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
......@@ -23,7 +23,7 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#-----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
import io
......@@ -37,132 +37,138 @@ class BrowseReportParserError(ValueError):
pass
# XML namespaces used by the various browse report dialects we accept.
NS_REP = "http://ngeo.eo.esa.int/ngEO/browseReport/1.0"
NS_REP_OLD = "http://ngeo.eo.esa.int/schema/browseReport"
NS_BSI = "http://ngeo.eo.esa.int/schema/browse/ingestion"
NS_TPZ = "http://www.telespazio.com/CSCDA/CDD/PDAS"


def rep(tag):
    """Return *tag* qualified with the ngEO browse report 1.0 namespace."""
    return f"{{{NS_REP}}}{tag}"


def rep_old(tag):
    """Return *tag* qualified with the legacy ngEO browse report namespace."""
    return f"{{{NS_REP_OLD}}}{tag}"


def bsi(tag):
    """Return *tag* qualified with the browse ingestion namespace."""
    return f"{{{NS_BSI}}}{tag}"


def tpz(tag):
    """Return *tag* qualified with the Telespazio PDAS namespace."""
    return f"{{{NS_TPZ}}}{tag}"


# Root elements of documents we are willing to parse.
ALLOWED_ROOT_TAGS = {
    rep("browseReport"),
    rep_old("browseReport"),
    bsi("ingestBrowse"),
    tpz("OnLineArchiveItem"),
}
def parse_browse_report(input_file):
    """Parse a browse report or ingest browse instruction XML document.

    :param input_file: a file-like object or raw ``bytes`` containing XML
    :returns: a dict with the report metadata and its list of browses, or —
        for TPZ online-archive items — the observation path as a string
    :raises BrowseReportParserError: on malformed XML, an unexpected root
        element, or a TPZ status other than "added"/"updated"
    """
    if isinstance(input_file, bytes):
        input_file = io.BytesIO(input_file)

    try:
        tree = etree.parse(input_file)
    except etree.XMLSyntaxError as e:
        raise BrowseReportParserError("Failed to parse XML.") from e
    root = tree.getroot()

    if root.tag not in ALLOWED_ROOT_TAGS:
        raise BrowseReportParserError(
            "Document is not a browse report or an ingest browse instruction."
        )

    if root.tag == rep_old("browseReport"):
        used_rep = rep_old
    elif root.tag == tpz("OnLineArchiveItem"):
        # TPZ archive items carry a single object path instead of browses.
        if root.findtext(tpz("status")) in ["added", "updated"]:
            path_el = root.find(tpz("path")).findtext(tpz("URL"))
            # drop embedded newlines and the single leading slash
            obs_path = path_el.replace("\n", "").replace("/", "", 1)
            return obs_path
        raise BrowseReportParserError('Status not in "added" or "updated", aborting.')
    else:
        used_rep = rep

    return {
        "responsible_org_name": root.findtext(used_rep("responsibleOrgName")),
        "date_time": dateutil.parser.parse(root.findtext(used_rep("dateTime"))),
        "browse_type": root.findtext(used_rep("browseType")),
        "browses": [
            parse_browse(elem, used_rep) for elem in root.iterfind(used_rep("browse"))
        ],
    }
def parse_browse(elem, used_rep):
    """Parse a single ``browse`` element into a dict.

    :param elem: the ``browse`` XML element
    :param used_rep: namespace helper (``rep`` or ``rep_old``) matching the
        document's report dialect
    :returns: a dict with the browse metadata; ``type`` identifies which of
        the geometry variants (rectified / footprint / model-in-geotiff /
        regular grid) was present, and the matching geometry key is filled in
    """
    browse = {
        "type": "",
        "browse_identifier": elem.findtext(used_rep("browseIdentifier")),
        "filename": elem.findtext(used_rep("fileName")),
        "image_type": elem.findtext(used_rep("imageType")),
        "reference_system_identifier": elem.findtext(
            used_rep("referenceSystemIdentifier")
        ),
        "start_time": dateutil.parser.parse(elem.findtext(used_rep("startTime"))),
        "end_time": dateutil.parser.parse(elem.findtext(used_rep("endTime"))),
    }

    # Exactly one of these geometry elements is expected per browse.
    rectified_elem = elem.find(used_rep("rectifiedBrowse"))
    footprint_elem = elem.find(used_rep("footprint"))
    geotiff_elem = elem.find(used_rep("modelInGeotiff"))
    regular_grid_browse = elem.find(used_rep("regularGrid"))

    if rectified_elem is not None:
        browse["type"] = "rectified_browse"
        browse["rectified"] = {
            "coord_list": [
                (float(x), float(y))
                for x, y in pairwise(
                    rectified_elem.findtext(used_rep("coordList")).split()
                )
            ],
        }

    elif footprint_elem is not None:
        browse["type"] = "footprint_browse"
        browse["footprint"] = {
            "col_row_list": [
                (int(x), int(y))
                for x, y in pairwise(
                    footprint_elem.findtext(used_rep("colRowList")).split()
                )
            ],
            "coord_list": [
                (float(x), float(y))
                for x, y in pairwise(
                    footprint_elem.findtext(used_rep("coordList")).split()
                )
            ],
        }

    elif geotiff_elem is not None:
        browse["type"] = "model_in_geotiff_browse"

    elif regular_grid_browse is not None:
        browse["type"] = "regular_grid_browse"
        browse["regular_grid"] = {
            "col_node_number": int(
                regular_grid_browse.findtext(used_rep("colNodeNumber"))
            ),
            "row_node_number": int(
                regular_grid_browse.findtext(used_rep("rowNodeNumber"))
            ),
            "col_step": float(regular_grid_browse.findtext(used_rep("colStep"))),
            "row_step": float(regular_grid_browse.findtext(used_rep("rowStep"))),
            "coord_lists": [
                [(float(x), float(y)) for x, y in pairwise(elem.text.split())]
                for elem in regular_grid_browse.iterfind(used_rep("coordList"))
            ],
        }

    return browse
......@@ -43,9 +43,9 @@ def converter(o):
def save_mount_report(data, success: bool):
    """Move the report file *data* into the success or failure directory.

    :param data: path of the report file to archive
    :param success: chooses INGESTOR_SUCCESS_DIR or INGESTOR_FAIL_DIR from
        the environment as the destination
    """
    if success:
        save_dir = os.environ["INGESTOR_SUCCESS_DIR"]
    else:
        save_dir = os.environ["INGESTOR_FAIL_DIR"]
    # copy + remove rather than os.rename so the move works across filesystems
    shutil.copy(data, save_dir)
    os.remove(data)
......@@ -53,13 +53,13 @@ def save_mount_report(data, success: bool):
def save_endpoint_report(filename: str, data, success: bool):
    """Write the raw report *data* to ``<dir>/<filename>.xml``.

    :param filename: base name (without extension) for the saved report
    :param data: report contents as text
    :param success: chooses INGESTOR_SUCCESS_DIR or INGESTOR_FAIL_DIR from
        the environment as the destination directory
    """
    if success:
        save_dir = os.environ["INGESTOR_SUCCESS_DIR"]
    else:
        save_dir = os.environ["INGESTOR_FAIL_DIR"]
    with open(os.path.join(save_dir, "%s.xml" % filename), "w") as outfile:
        outfile.write(data)
def browse_name(report):
    """Join all browse identifiers of *report* with underscores into one name."""
    return "_".join(browse["browse_identifier"] for browse in report["browses"])
black==21.10b0
flake8==4.0.1
mypy==0.910
mypy-boto3-s3
types-redis
types-pyyaml
types-python-dateutil
types-requests
[flake8]
max-line-length = 90
exclude = .venv, build, tests
ignore = W503,E203
[mypy]
exclude = (.venv|build|tests)
[mypy-jsonschema.*]
ignore_missing_imports = True
[mypy-setuptools.*]
ignore_missing_imports = True
[mypy-pystac.*]
ignore_missing_imports = True
[mypy-pygeofilter.*]
ignore_missing_imports = True
[mypy-boto3.*]
ignore_missing_imports = True
[mypy-botocore.*]
ignore_missing_imports = True
[mypy-lxml.*]
ignore_missing_imports = True
[mypy-pyinotify.*]
ignore_missing_imports = True
[mypy-vsq.*]
ignore_missing_imports = True
This diff is collapsed.
def test_sample():
    # Placeholder test keeping the pytest CI stage green until real tests exist.
    assert True
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment