EOX GitLab Instance

Commit c4498947 authored by Mussab Abdalla's avatar Mussab Abdalla
Browse files

initial commit

parent e60b716f
Pipeline #17404 canceled with stages
#------------------------------------------------------------------------------
#
# Project: prism view server
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
#------------------------------------------------------------------------------
# Copyright (C) 2020 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#-----------------------------------------------------------------------------
FROM ubuntu:18.04
LABEL name="prism view server ingestor" \
vendor="EOX IT Services GmbH <https://eox.at>" \
license="MIT Copyright (C) 2020 EOX IT Services GmbH <https://eox.at>" \
type="prism view server ingestor"
USER root
ADD install.sh requirements.txt \
/
RUN /install.sh
RUN mkdir /ingestor
RUN mkdir -p /var/ingestor/fail /var/ingestor/success
COPY app.py config.py filedaemon.py /
COPY ingestor/ /ingestor
ENV INSTANCE_ID="prism-view-server_ingestor" \
REDIS_HOST="redis" \
REDIS_PORT="6379" \
REDIS_PREPROCESS_MD_QUEUE_KEY="preprocess-md_queue" \
INOTIFY_WATCH_DIR="/mnt/data" \
INOTIFY_MASKS="IN_MOVED_TO,IN_CLOSE_WRITE"
CMD ["gunicorn3", "-c", "config.py", "app"]
LABEL version="1.5.2"
# Ingestor
## Set up
```bash
python3 -m venv .
source bin/activate
pip install -r requirements.txt
pip install pytest
```
## Testing
```bash
pytest ingestor/
```
#!/usr/bin/env python
#------------------------------------------------------------------------------
#
# Project: prism view server
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
#------------------------------------------------------------------------------
# Copyright (C) 2020 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#-----------------------------------------------------------------------------
import os
import logging
import logging.config
import json
from flask import Flask, request, Response
import redis
from ingestor.browse_report import parse_browse_report
from ingestor.util import converter, save_endpoint_report, browse_name
application = Flask(__name__)
logger = logging.getLogger(__name__)
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'simple': {
'format': '%(levelname)s: %(message)s',
},
'verbose': {
'format': '[%(asctime)s][%(module)s] %(levelname)s: %(message)s',
}
},
'handlers': {
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'verbose',
}
},
'loggers': {
'': {
'handlers': ['console'],
'level': 'DEBUG',
'propagate': False,
}
}
})
client = redis.Redis(
host=os.environ['REDIS_HOST'],
port=int(os.environ.get('REDIS_PORT', '6379')),
charset="utf-8",
decode_responses=True,
)
queue_name = os.environ['REDIS_PREPROCESS_MD_QUEUE_KEY']
@application.route('/', methods=['POST'])
def ingest():
try:
request.get_data()
browse_report = parse_browse_report(request.data)
logger.debug(browse_report)
if isinstance(browse_report, str):
queue_content = browse_report
filename = queue_content
else:
queue_content = json.dumps(
browse_report, default=converter
)
filename = browse_name(browse_report)
client.lpush(queue_name, queue_content)
save_endpoint_report(filename, request.data, True)
return Response(status=202)
except Exception as e:
save_endpoint_report(filename, request.data, False)
return Response(str(e), status=400)
bind = ['0.0.0.0:8000']
#!/usr/bin/env python
# ------------------------------------------------------------------------------
#
# Project: prism view server
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
# ------------------------------------------------------------------------------
# Copyright (C) 2020 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# -----------------------------------------------------------------------------
import os
import logging
import logging.config
import json
import functools
import operator
import pyinotify
import redis
from ingestor.browse_report import parse_browse_report
from ingestor.util import converter, save_mount_report
logger = logging.getLogger(__name__)
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'simple': {
'format': '%(levelname)s: %(message)s',
},
'verbose': {
'format': '[%(asctime)s][%(module)s] %(levelname)s: %(message)s',
}
},
'handlers': {
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'verbose',
}
},
'loggers': {
'': {
'handlers': ['console'],
'level': 'DEBUG',
'propagate': False,
}
}
})
queue_name = os.environ['REDIS_PREPROCESS_MD_QUEUE_KEY']
watch_dir = os.environ['INOTIFY_WATCH_DIR']
inotify_modes = os.environ.get('INOTIFY_MASKS', 'IN_MOVED_TO,IN_CLOSE_WRITE').split(',')
# bitwise or on all elements of configured inotify modes to watch for
inotify_mask = functools.reduce(operator.or_, [getattr(pyinotify, mode, 0) for mode in inotify_modes], 0)
client = redis.Redis(
host=os.environ['REDIS_HOST'],
port=int(os.environ.get('REDIS_PORT', '6379')),
charset='utf-8',
decode_responses=True,
)
watchmanager = pyinotify.WatchManager()
class EventHandler(pyinotify.ProcessEvent):
def handle_file(self, event):
logger.info(f'Parsing browse file: {event.pathname}')
try:
with open(event.pathname) as f:
browse_report = parse_browse_report(f)
logger.debug(browse_report)
if isinstance(browse_report, str):
queue_content = browse_report
else:
queue_content = json.dumps(
browse_report, default=converter
)
client.lpush(queue_name, queue_content)
save_mount_report(event.pathname, True)
except Exception as e:
save_mount_report(event.pathname, False)
logger.exception(e)
def process_IN_CLOSE_WRITE(self, event):
self.handle_file(event)
def process_IN_MOVED_TO(self, event):
self.handle_file(event)
handler = EventHandler()
notifier = pyinotify.Notifier(watchmanager, handler)
watchmanager.add_watch(watch_dir, inotify_mask, rec=True)
notifier.loop()
#------------------------------------------------------------------------------
#
# Project: prism view server
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
#------------------------------------------------------------------------------
# Copyright (C) 2020 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#-----------------------------------------------------------------------------
import io
from lxml import etree
import dateutil.parser
from .util import pairwise
class BrowseReportParserError(ValueError):
pass
NS_REP = 'http://ngeo.eo.esa.int/ngEO/browseReport/1.0'
NS_REP_OLD = 'http://ngeo.eo.esa.int/schema/browseReport'
NS_BSI = 'http://ngeo.eo.esa.int/schema/browse/ingestion'
NS_TPZ = 'http://www.telespazio.com/CSCDA/CDD/PDAS'
def rep(tag):
return f'{{{NS_REP}}}{tag}'
def rep_old(tag):
return f'{{{NS_REP_OLD}}}{tag}'
def bsi(tag):
return f'{{{NS_BSI}}}{tag}'
def tpz(tag):
return f'{{{NS_TPZ}}}{tag}'
ALLOWED_ROOT_TAGS = {rep('browseReport'), rep_old('browseReport'), bsi('ingestBrowse'), tpz('OnLineArchiveItem')}
def parse_browse_report(input_file):
"""
:returns: list of browses
"""
if isinstance(input_file, bytes):
input_file = io.BytesIO(input_file)
try:
tree = etree.parse(input_file)
except etree.XMLSyntaxError as e:
raise BrowseReportParserError('Failed to parse XML.') from e
root = tree.getroot()
if not root.tag in ALLOWED_ROOT_TAGS:
raise BrowseReportParserError(
'Document is not a browse report or an ingest browse instruction.'
)
if root.tag == rep_old('browseReport'):
used_rep = rep_old
elif root.tag == tpz('OnLineArchiveItem'):
if root.findtext(tpz('status')) in ['added', 'updated']:
path_el = root.find(tpz('path')).findtext(tpz('URL'))
obs_path = path_el.replace('\n', '').replace('/', '', 1)
return obs_path
raise BrowseReportParserError('Status not in "added" or "updated", aborting.')
else:
used_rep = rep
return {
'responsible_org_name': root.findtext(used_rep('responsibleOrgName')),
'date_time': dateutil.parser.parse(root.findtext(used_rep('dateTime'))),
'browse_type': root.findtext(used_rep('browseType')),
'browses': [
parse_browse(elem, used_rep)
for elem in root.iterfind(used_rep('browse'))
],
}
def parse_browse(elem, used_rep):
browse = {
'type': '',
'browse_identifier': elem.findtext(used_rep('browseIdentifier')),
'filename': elem.findtext(used_rep('fileName')),
'image_type': elem.findtext(used_rep('imageType')),
'reference_system_identifier': elem.findtext(
used_rep('referenceSystemIdentifier')
),
'start_time': dateutil.parser.parse(elem.findtext(used_rep('startTime'))),
'end_time': dateutil.parser.parse(elem.findtext(used_rep('endTime'))),
}
rectified_elem = elem.find(used_rep('rectifiedBrowse'))
footprint_elem = elem.find(used_rep('footprint'))
geotiff_elem = elem.find(used_rep('modelInGeotiff'))
regular_grid_browse = elem.find(used_rep('regularGrid'))
if rectified_elem is not None:
browse['type'] = 'rectified_browse'
browse['rectified'] = {
'coord_list': [
(float(x), float(y))
for x, y in pairwise(
rectified_elem.findtext(used_rep('coordList')).split()
)
],
}
elif footprint_elem is not None:
browse['type'] = 'footprint_browse'
browse['footprint'] = {
'col_row_list': [
(int(x), int(y))
for x, y in pairwise(
footprint_elem.findtext(used_rep('colRowList')).split()
)
],
'coord_list': [
(float(x), float(y))
for x, y in pairwise(
footprint_elem.findtext(used_rep('coordList')).split()
)
],
}
elif geotiff_elem is not None:
browse['type'] = 'model_in_geotiff_browse'
elif regular_grid_browse is not None:
browse['type'] = 'regular_grid_browse'
browse['regular_grid'] = {
'col_node_number': int(regular_grid_browse.findtext(used_rep('colNodeNumber'))),
'row_node_number': int(regular_grid_browse.findtext(used_rep('rowNodeNumber'))),
'col_step': float(regular_grid_browse.findtext(used_rep('colStep'))),
'row_step': float(regular_grid_browse.findtext(used_rep('rowStep'))),
'coord_lists': [
[
(float(x), float(y))
for x, y in pairwise(elem.text.split())
] for elem in regular_grid_browse.iterfind(used_rep('coordList'))
]
}
return browse
#------------------------------------------------------------------------------
#
# Project: prism view server
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
#------------------------------------------------------------------------------
# Copyright (C) 2020 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#-----------------------------------------------------------------------------
from os.path import dirname, join
from datetime import datetime
from dateutil import tz
from ingestor.browse_report import parse_browse_report
TEST_DATA_DIR = join(dirname(dirname(__file__)), 'tests/data')
def test_parse_footprint_browse():
with open(join(TEST_DATA_DIR, 'footprint_browse.xml')) as f:
browse_report = parse_browse_report(f)
assert browse_report == {
'responsible_org_name': 'Generated by Eoli 2 ngEO Converter V1.2.0',
'date_time': datetime(2013, 9, 25, 14, 54, 38, 0, tz.UTC),
'browse_type': 'SAR',
'browses': [{
'type': 'footprint_browse',
'browse_identifier': 'ERS-2-11040113373745-1507.SAR_IM0_0P.BP',
'filename': 'ERS-2-11040113373745-1507.SAR_IM0_0P.BP.jpg',
'image_type': 'Jpeg',
'reference_system_identifier': 'EPSG:4326',
'footprint': {
'col_row_list': [
(0, 0),
(500, 0),
(500, 250),
(0, 250),
(0, 0),
],
'coord_list': [
(83.66, 42.31),
(84.53, 42.42),
(84.48, 51.28),
(83.61, 50.32),
(83.66, 42.31),
]
},
'start_time': datetime(2011, 4, 1, 13, 37, 37, 0, tz.UTC),
'end_time': datetime(2011, 4, 1, 13, 37, 52, 0, tz.UTC),
}]
}
def test_parse_model_in_geotiff_browse():
with open(join(TEST_DATA_DIR, 'model_in_geotiff_browse.xml')) as f:
browse_report = parse_browse_report(f)
assert browse_report == {
'responsible_org_name': 'DMI',
'date_time': datetime(2012, 7, 13, 11, 54, 26, 0, tz.UTC),
'browse_type': 'SAR',
'browses': [{
'type': 'model_in_geotiff_browse',
'browse_identifier': 'ID_DODWH_MG2_CORE_09DM010001_1',
'filename': 'ID_DEIMOS01-v2_DE0028bfp_L3R.tif',
'image_type': 'TIFF',
'reference_system_identifier': 'EPSG:4326',
'start_time': datetime(2011, 2, 1, 11, 48, 1, 0, tz.UTC),
'end_time': datetime(2011, 2, 1, 11, 48, 27, 0, tz.UTC),
}]
}
def test_parse_rectified_browse():
with open(join(TEST_DATA_DIR, 'rectified_browse.xml')) as f:
browse_report = parse_browse_report(f)
assert browse_report == {
'responsible_org_name': 'SLAP 03.03',
'date_time': datetime(2014, 7, 24, 11, 58, 24, 0, tz.UTC),
'browse_type': 'NGEO-LIGHT',
'browses': [{
'type': 'rectified_browse',
'browse_identifier': 'LS05_RFUI_TM__GTC_1P_19910928T071939_19910928T072007_040292_0172_0031_B10D',
'filename': 'http://landsat-ds.eo.esa.int/metadata/LandsatTMCloudFreeCoverage/1991/09/28/LS05_RFUI_TM__GTC_1P_19910928T071939_19910928T072007_040292_0172_0031_B10D.BP.PNG',
'image_type': 'PNG',
'reference_system_identifier': 'EPSG:4326',
'rectified': {
'coord_list': [
(40.8395, 40.1005),
(42.6645, 42.7907),
]
},
'start_time': datetime(1991, 9, 28, 7, 19, 39, 0, tz.UTC),
'end_time': datetime(1991, 9, 28, 7, 20, 7, 0, tz.UTC),
}]
}
def test_parse_regular_grid_browse():
with open(join(TEST_DATA_DIR, 'regular_grid_browse.xml')) as f:
browse_report = parse_browse_report(f)
assert browse_report == {
'responsible_org_name': 'Sentinel 1 PDGS',
'date_time': datetime(2012, 11, 8, 17, 25, 49, tzinfo=tz.tzoffset(None, 3600)),
'browse_type': 'SAR',
'browses': [{
'type': 'regular_grid_browse',
'browse_identifier': 'a20120101T043724405923',
'filename': 'quick-look.png',
'image_type': 'PNG',
'reference_system_identifier': 'EPSG:4326',
'regular_grid': {
'col_node_number': 6,
'row_node_number': 20,
'col_step': 78.8,
'row_step': 29.157894737,
'coord_lists': [