From 01c45828a0771863d3ebd2de62141c9c6562177b Mon Sep 17 00:00:00 2001
From: Lubomir Bucek <lubomir.bucek@eox.at>
Date: Wed, 11 Nov 2020 16:32:38 +0100
Subject: [PATCH] cleanup

---
 README.md         |   2 +-
 core/registrar.py | 498 ----------------------------------------------
 core/setup.py     |   2 +-
 3 files changed, 2 insertions(+), 500 deletions(-)
 delete mode 100644 core/registrar.py

diff --git a/README.md b/README.md
index dd15e52f..abd02d86 100644
--- a/README.md
+++ b/README.md
@@ -253,7 +253,7 @@ docker stack deploy -c docker-compose.emg.yml -c docker-compose.emg.dev.yml -c d
 Deploy base & logging stack in production environment:
 ```
 docker stack deploy -c docker-compose.base.ops.yml base-pvs
-docker stack deploy -c docker-compose.logging.yml docker-compose.logging.ops.yml logging
+docker stack deploy -c docker-compose.logging.yml -c docker-compose.logging.ops.yml logging
 ```
 Deploy the stack in production environment:
 Please note that in order to reuse existing database volumes, <stack-name> needs to be the same. Here we use `vhr18-pvs` but in operational service `vhr18-pdas` is used.
diff --git a/core/registrar.py b/core/registrar.py
deleted file mode 100644
index 60b805e3..00000000
--- a/core/registrar.py
+++ /dev/null
@@ -1,498 +0,0 @@
-#!/usr/bin/env python
-# -----------------------------------------------------------------------------
-#
-# Project: registrar.py
-# Authors: Stephan Meissl <stephan.meissl@eox.at>
-#
-# -----------------------------------------------------------------------------
-# Copyright (c) 2019 EOX IT Services GmbH
-#
-# Python script to register products.
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to
-# deal in the Software without restriction, including without limitation the
-# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
-# sell copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies of this Software or works derived from this Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-# -----------------------------------------------------------------------------
-
-
-import sys
-import os
-import argparse
-import textwrap
-import logging
-import traceback
-from xml.sax.saxutils import escape
-import subprocess
-
-import redis
-import lxml.etree
-from swiftclient.service import SwiftService
-
-import django
-from django.db import transaction
-from django.contrib.gis.geos import GEOSGeometry
-from osgeo import gdal
-
-path = os.path.join(os.getenv('INSTALL_DIR', "/var/www/pvs"), "pvs_instance")
-if path not in sys.path:
-    sys.path.append(path)
-
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "pvs_instance.settings")
-django.setup()
-
-from eoxserver.backends import access
-from eoxserver.contrib import vsi
-from eoxserver.backends import models as backends
-from eoxserver.core.util.timetools import isoformat
-from eoxserver.resources.coverages import models
-from eoxserver.resources.coverages.registration.product import (
-    ProductRegistrator
-)
-from eoxserver.resources.coverages.registration.registrators.gdal import (
-    GDALRegistrator
-)
-
-logger = logging.getLogger(__name__)
-
-def setup_logging(verbosity):
-    # start logging setup
-    # get command line level
-    verbosity = verbosity
-    if verbosity == 0:
-        level = logging.CRITICAL
-    elif verbosity == 1:
-        level = logging.ERROR
-    elif verbosity == 2:
-        level = logging.WARNING
-    elif verbosity == 3:
-        level = logging.INFO
-    else:
-        level = logging.DEBUG
-    logger.setLevel(level)
-    sh = logging.StreamHandler()
-    sh.setLevel(level)
-    formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
-    sh.setFormatter(formatter)
-    logger.addHandler(sh)
-    # finished logging setup
-
-
-def set_gdal_swift_auth():
-    # parsing command line output of swift auth
-    auth_keys = subprocess.check_output(["swift", "auth"]).decode(sys.stdout.encoding).split("\n")
-    storage_url = auth_keys[0].split("OS_STORAGE_URL=")[1]
-    auth_token = auth_keys[1].split("OS_AUTH_TOKEN=")[1]
-    # setting gdal config
-    gdal.SetConfigOption("SWIFT_STORAGE_URL", storage_url)
-    gdal.SetConfigOption("SWIFT_AUTH_TOKEN", auth_token)
-
-
-def add_mask(product):
-    metadata_item = product.metadata_items.all()[0]
-    with access.vsi_open(metadata_item) as f:
-        tree = lxml.etree.parse(f)
-        root = tree.getroot()
-    wkt = tree.xpath(
-        '//gsc:opt_metadata/gml:metaDataProperty/gsc:EarthObservationMetaData/eop:vendorSpecific/eop:SpecificInformation[eop:localAttribute/text() = "CF_POLY"]/eop:localValue/text()',
-        namespaces=root.nsmap
-    )[0]
-    geometry = GEOSGeometry(wkt)
-    mask_type = models.MaskType.objects.get(product_type=product.product_type)
-    logger.debug("Adding mask")
-    models.Mask.objects.create(
-        product=product,
-        mask_type=mask_type,
-        geometry=geometry,
-    )
-
-
-def get_product_type_and_level(metadata_item):
-    level = None
-    with access.vsi_open(metadata_item) as f:
-        tree = lxml.etree.parse(f)
-        root = tree.getroot()
-
-    try:
-        xp = '//gml:using/eop:EarthObservationEquipment/eop:platform/eop:Platform/eop:shortName/text()'
-        product_type_name = tree.xpath(xp, namespaces=root.nsmap)[0]
-    except Exception as e:
-        logger.debug(
-            'Failed to determine product type of %s, error was %s'
-            % (metadata_item.location, e)
-        )
-
-    try:
-        xp = '//gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text()'
-        parent_identifier = tree.xpath(xp, namespaces=root.nsmap)[0]
-
-        if parent_identifier.endswith('Level_1'):
-            level = 'Level_1'
-        if parent_identifier.endswith('Level_3'):
-            level = 'Level_3'
-        else:
-            raise Exception('Invalid parent identifier type name %s' % parent_identifier)
-    except Exception as e:
-        logger.debug(
-            'Failed to determine product level of %s, error was %s'
-            % (metadata_item.location, e)
-        )
-
-    return product_type_name, level
-
-
-def get_product_collection(metadata_file):
-    # in case collection needs to be determined from metadata
-    try:
-        if metadata_file.startswith("/vsiswift"):
-            set_gdal_swift_auth()
-        with vsi.open(metadata_file, "r") as f:
-            tree = lxml.etree.parse(f)
-            root = tree.getroot()
-            xp = '//gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text()'
-            product_type_name = tree.xpath(xp, namespaces=root.nsmap)
-            extracted = product_type_name[0].split('/')[0]
-            return extracted
-    except Exception as e:
-        logger.debug(
-            'Failed to determine product collection for metadata file %s, error was %s'
-            % (metadata_file, e)
-        )
-
-
-def get_product_type_from_band_count(product_type_name, file_path):
-    # get raster band count via gdal
-    logger.debug("Opening file using GDAL: %s" % file_path)
-    if file_path.startswith("/vsiswift"):
-        set_gdal_swift_auth()
-    src_ds = gdal.Open(file_path)
-    if src_ds is None:
-        raise RegistrationError("Band check: failed to open dataset: %s " % file_path)
-    # try to fetch product model with _bandcount
-    product_type_name_upd = "%s_%s" % (product_type_name, src_ds.RasterCount)
-    try:
-        product_type_model = models.ProductType.objects.get(name=product_type_name_upd)
-        return product_type_model
-    except models.ProductType.DoesNotExist:
-        raise RegistrationError("Product Type: '%s' was not found" % product_type_name_upd)
-
-
-class RegistrationError(Exception):
-    pass
-
-
-@transaction.atomic
-def registrar(
-    collection_stack,
-    objects_prefix, upload_container=None, replace=False, client=None, registered_set_key=None,
-    reporting_dir=None, service_url=None
-
-):
-    logger.info("Starting registration of product '%s'." % objects_prefix)
-
-    metadata_package, data_package = None, None
-    if not upload_container:
-        # assuming objects_prefix = bucket/itemname
-        upload_container = objects_prefix.partition("/")[0]
-        objects_prefix = objects_prefix.partition("/")[2]
-    with SwiftService() as swift:
-        list_parts_gen = swift.list(
-            container=upload_container, options={"prefix": objects_prefix},
-        )
-        for page in list_parts_gen:
-            if page["success"]:
-                for item in page["listing"]:
-                    if item["name"].endswith(".xml"):
-                        metadata_package = item["name"]
-                    elif item["name"].endswith(".TIF") or \
-                            item["name"].endswith(".tif"):
-                        data_package = item["name"]
-                    elif not item["name"].endswith(".tar"):
-                        raise RegistrationError(
-                            "Product with objects prefix '%s' has "
-                            "wrong content '%s'."
-                            % (objects_prefix, item["name"])
-                        )
-            else:
-                logger.error(page["error"])
-                raise RegistrationError(
-                    "No product found with objects prefix '%s'."
-                    % objects_prefix
-                )
-
-    if metadata_package is None or data_package is None:
-        raise RegistrationError(
-            "Product with objects prefix '%s' has missing content."
-            % objects_prefix
-        )
-    logger.debug("Found objects '%s' and '%s'." % (data_package, metadata_package))
-
-    storage = backends.Storage.objects.get(name=upload_container)
-    metadata_item = models.MetaDataItem(storage=storage, location=metadata_package)
-
-    product_type, level = get_product_type_and_level(metadata_item)
-    if collection_stack == 'DEM':
-        # special for DEM files, collection name === product_type
-        gdal_metadata_file_path = "/vsiswift/%s/%s" % (upload_container, metadata_package)
-        product_type = get_product_collection(gdal_metadata_file_path)
-    logger.debug("Registering product")
-    product_type_name = "%s_Product_%s" % (collection_stack, product_type)
-
-    try:
-        # first find product type by name from path
-        product_type_model = models.ProductType.objects.get(name=product_type_name)
-    except models.ProductType.DoesNotExist:
-        # if not found, maybe there are more product types with _bandcount suffix
-        gdal_file_path = "/vsiswift/%s/%s" % (upload_container, data_package)
-        product_type_model = get_product_type_from_band_count(product_type_name, gdal_file_path)
-        product_type_name = product_type_model.name
-    coverage_type_names = product_type_model.allowed_coverage_types.all()
-    if len(coverage_type_names) > 1:
-        logger.warning("More available 'CoverageType' found, selecting the first one.")
-    coverage_type_name = coverage_type_names[0].name
-
-    product, replaced = ProductRegistrator().register(
-        metadata_locations=[[upload_container,
-                             metadata_package, ], ],
-        type_name=product_type_name,
-        replace=replace,
-        extended_metadata=True,
-        mask_locations=None,
-        package_path=None,
-        simplify_footprint_tolerance=0.0001,  # ~10meters
-        overrides={},
-    )
-    if product.footprint.empty:
-        product.delete()
-        raise RegistrationError("No footprint was extracted. full product: %s" % product)
-
-    collection = models.Collection.objects.get(
-        identifier=collection_stack
-    )
-    logger.debug("Inserting product into collection %s" % collection_stack)
-    models.collection_insert_eo_object(collection, product)
-
-    if collection_stack == "DEM":
-        # also insert it to its own collection
-        collection_own = models.Collection.objects.get(
-            identifier="%s_%s" % (collection, product_type)
-        )
-        logger.debug("Inserting product to collection %s_%s" % (collection, product_type))
-        models.collection_insert_eo_object(collection_own, product)
-
-    if level == 'Level_1':
-        collection_level_1 = models.Collection.objects.get(
-            identifier="%s_Level_1" % collection
-        )
-        logger.debug("Inserting product to collection %s_Level_1" % collection)
-        models.collection_insert_eo_object(collection_level_1, product)
-    elif level == 'Level_3':
-        collection_level_3 = models.Collection.objects.get(
-            identifier="%s_Level_3" % collection
-        )
-        logger.debug("Inserting product to collection %s_Level_3" % collection)
-        models.collection_insert_eo_object(collection_level_3, product)
-
-    logger.debug("Registering coverage")
-    report = GDALRegistrator().register(
-        data_locations=[[upload_container, data_package, ], ],
-        metadata_locations=[[upload_container,
-                             metadata_package, ], ],
-        coverage_type_name=coverage_type_name,
-        overrides={
-            "identifier": "%s__coverage" % product.identifier,
-            "footprint": None,
-        },
-        replace=replace,
-    )
-    logger.debug("Adding coverage to product")
-    models.product_add_coverage(product, report.coverage)
-
-    try:
-        add_mask(product)
-    except Exception as e:
-        logger.debug("Couldn't add mask.")
-        logger.debug(traceback.format_exc())
-        logger.debug("%s: %s\n" % (type(e).__name__, str(e)))
-
-    if client is not None:
-        logger.debug(
-            "Storing times in redis queue '%s" % registered_set_key
-        )
-        client.sadd(
-            registered_set_key, "%s/%s"
-            % (
-                product.begin_time.strftime("%Y%m%dT%H%M%S"),
-                product.end_time.strftime("%Y%m%dT%H%M%S")
-            )
-        )
-
-    timestamp = product.inserted.strftime("%Y%m%dT%H%M%S")
-
-    if reporting_dir is not None:
-        with open(os.path.join(reporting_dir, 'item_%s_%s.xml' % (timestamp, product.identifier)),'w') as f:
-            f.write(textwrap.dedent("""
-                <?xml version="1.0" encoding="UTF-8"?>
-                <DataAccessItem
-                    xsi:schemaLocation="http://www.telespazio.com/CSCDA/CDD/PDAS PDAS_interfaces%2020190924_1916.xsd"
-                    xmlns="http://www.telespazio.com/CSCDA/CDD/PDAS"
-                    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-                    <identifier>{identifier}</identifier>
-                    <BROWSE_AVAILABILITY_DATETIME>{availability_time}</BROWSE_AVAILABILITY_DATETIME>
-                    <URL>
-                        <Service>WCS</Service>
-                        <URL>{wms_capabilities_url}</URL>
-                    </URL>
-                    <URL>
-                        <Service>WMS</Service>
-                        <URL>{wcs_capabilities_url}</URL>
-                    </URL>
-                </DataAccessItem>
-            """.format(
-                identifier=escape(product.identifier),
-                availability_time=escape(isoformat(product.inserted)),
-                wcs_capabilities_url=escape(
-                    '%s/ows?service=wcs&request=GetCapabilities&cql=identifier="%s"'
-                    % (service_url, product.identifier)
-                ),
-                wms_capabilities_url=escape(
-                    '%s/ows?service=wms&request=GetCapabilities&cql=identifier="%s"'
-                    % (service_url, product.identifier)
-                ),
-            )))
-
-    logger.info(
-        "Successfully finished registration of product '%s'." % objects_prefix
-    )
-
-
-def registrar_redis_wrapper(
-    collection,
-    upload_container,
-    replace=False, host="localhost", port=6379,
-    register_queue_key="register_queue",
-    registered_set_key="registered_set",
-    reporting_dir=None,
-    service_url=None,
-):
-    client = redis.Redis(
-        host=host, port=port, charset="utf-8", decode_responses=True
-    )
-    while True:
-        logger.debug("waiting for redis queue '%s'..." % register_queue_key)
-        value = client.brpop(register_queue_key)
-        try:
-            registrar(
-                collection,
-                value[1],
-                upload_container,
-                replace=replace,
-                client=client,
-                registered_set_key=registered_set_key,
-                reporting_dir=reporting_dir,
-                service_url=service_url,
-            )
-        except Exception as e:
-            logger.debug(traceback.format_exc())
-            logger.error("%s: %s\n" % (type(e).__name__, str(e)))
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.description = textwrap.dedent("""\
-        Register products.
-    """)
-
-    parser.add_argument(
-        "--mode", default="standard", choices=["standard", "redis"],
-        help=(
-            "The mode to run the registrar. Either one-off (standard) or "
-            "reading from a redis queue."
-        )
-    )
-    parser.add_argument(
-        "--objects-prefix", default=None,
-        help=(
-            "Prefix to objects holding the metadata and data of product."
-        )
-    )
-    parser.add_argument(
-        "--replace", action="store_true",
-        help=(
-            "Replace existing products instead of skipping the registration."
-        )
-    )
-    parser.add_argument(
-        "--redis-register-queue-key", default="register_queue"
-    )
-    parser.add_argument(
-        "--redis-registered-set-key", default="registered_set"
-    )
-    parser.add_argument(
-        "--redis-host", default="localhost"
-    )
-    parser.add_argument(
-        "--redis-port", type=int, default=6379
-    )
-    parser.add_argument(
-        "--reporting-dir",
-    )
-    parser.add_argument(
-        "--service-url",
-    )
-
-    parser.add_argument(
-        "-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4],
-        help=(
-            "Set verbosity of log output "
-            "(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
-        )
-    )
-
-    arg_values = parser.parse_args()
-
-    setup_logging(arg_values.verbosity)
-
-    collection = os.environ.get('COLLECTION')
-    if collection is None:
-        logger.critical("Collection environment variable not set.")
-        sys.exit(1)
-
-    upload_container = os.environ.get('UPLOAD_CONTAINER')
-    if upload_container is None:
-        logger.warn("UPLOAD_CONTAINER environment variable not set. Assuming part of path bucket/item")
-
-    if arg_values.mode == "standard":
-        registrar(
-            collection,
-            arg_values.objects_prefix,
-            upload_container,
-            replace=arg_values.replace,
-            reporting_dir=arg_values.reporting_dir,
-            service_url=arg_values.service_url,
-        )
-    else:
-        registrar_redis_wrapper(
-            collection,
-            upload_container,
-            replace=arg_values.replace,
-            host=arg_values.redis_host,
-            port=arg_values.redis_port,
-            register_queue_key=arg_values.redis_register_queue_key,
-            registered_set_key=arg_values.redis_registered_set_key,
-            reporting_dir=arg_values.reporting_dir,
-            service_url=arg_values.service_url,
-        )
diff --git a/core/setup.py b/core/setup.py
index f64ba39c..b44d89b2 100644
--- a/core/setup.py
+++ b/core/setup.py
@@ -9,7 +9,7 @@ setup(
     version="0.0.1",
     author="",
     author_email="",
-    description="preprocessor for PVS",
+    description="registrar for PVS",
     long_description=long_description,
     long_description_content_type="text/markdown",
    url="https://gitlab.eox.at/esa/prism/vs/-/tree/master/core",
--
GitLab