diff --git a/README.md b/README.md index 1f7ada3e887394d841c9b93114f804a9b0829e42..a50a21e7080e757ec3634d84069e5dabaebaef34 100644 --- a/README.md +++ b/README.md @@ -209,11 +209,9 @@ docker stack rm vhr18-pvs # stop stack docker volume rm vhr18-pvs_db-data # delete volumes docker volume rm vhr18-pvs_redis-data docker volume rm vhr18-pvs_traefik-data -docker volume rm vhr18-pvs_cache-db docker volume rm vhr18-pvs_instance-data ``` - ### Setup logging To access the logs, navigate to http://localhost:5601 . Ignore all of the fancy enterprise capabilities and select Kibana > Discover in the hamburger menu. @@ -223,6 +221,7 @@ Since we only have fluentd, you can just use `*` as index pattern. Select `@timestamp` as time field ([see also](https://www.elastic.co/guide/en/kibana/current/tutorial-define-index.html)). + # Documentation ## Installation diff --git a/cache/configure.sh b/cache/configure.sh index d2d3c3ceb60556161c26a26f033e4a59afe2045c..e362050f8d11d2ed6805113b927d90cae05667c6 100755 --- a/cache/configure.sh +++ b/cache/configure.sh @@ -25,8 +25,6 @@ cd - chown -R www-data:www-data "${INSTALL_DIR}" -mkdir -p "/cache-db/${COLLECTION}" - if [ ! -f "${APACHE_CONF}" ] ; then echo "Adding Apache configuration" diff --git a/config/dem_preprocessor-config.yml b/config/dem_preprocessor-config.yml new file mode 100644 index 0000000000000000000000000000000000000000..3ae30fe3a7b0cf59ba735aabe6e05db529ff4fb1 --- /dev/null +++ b/config/dem_preprocessor-config.yml @@ -0,0 +1,55 @@ +source: + type: swift + kwargs: + username: !env '${OS_USERNAME_DOWNLOAD}' + password: !env '${OS_PASSWORD_DOWNLOAD}' + tenant_name: !env '${OS_TENANT_NAME_DOWNLOAD}' + tenant_id: !env '${OS_TENANT_ID_DOWNLOAD}' + region_name: !env '${OS_REGION_NAME_DOWNLOAD}' + auth_url: !env '${OS_AUTH_URL_DOWNLOAD}' + auth_version: !env '${ST_AUTH_VERSION_DOWNLOAD}' + user_domain_name: !env '${OS_USER_DOMAIN_NAME_DOWNLOAD}' +target: + type: swift + replace: false + kwargs: + username: !env '${OS_USERNAME}' + password: !env '${OS_PASSWORD}' + tenant_name: !env '${OS_TENANT_NAME}' + tenant_id: !env '${OS_TENANT_ID}' + region_name: !env '${OS_REGION_NAME}' + auth_version: !env '${ST_AUTH_VERSION}' + auth_url: !env '${OS_AUTH_URL}' + user_domain_name: !env '${OS_USER_DOMAIN_NAME}' + container: !env '${UPLOAD_CONTAINER}' +workdir: /tmp +keep_temp: false +metadata_glob: '*GSC*.xml' +type_extractor: + xpath: + - /gsc:report/gsc:sar_metadata/gml:metaDataProperty/gsc:EarthObservationMetaData/eop:productType/text() +level_extractor: + # xpath can also be a list of xpaths to be tried one after another + xpath: substring-after(substring-after(/gsc:report/gsc:opt_metadata/gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text(), '/'), '/') +preprocessing: + defaults: + move_files: true + data_file_globs: # needs to be taken only from DEM sub-folder, otherwise previews get in + - '**/DEM/*.dt2' + - '**/DEM/*.dt1' + - '**/DEM/*.dt0' + - '**/DEM/*.tif' + output: + options: + format: COG + dstSRS: 'EPSG:4326' + dstNodata: 0 + creationOptions: + - BLOCKSIZE=512 + - COMPRESS=DEFLATE + - NUM_THREADS=8 + - BIGTIFF=IF_SAFER + - OVERVIEWS=AUTO + types: + SAR_DGE_30: # just to pass validation + nested: true diff --git a/config/emg_preprocessor-config.yml b/config/emg_preprocessor-config.yml new file mode 100644 index 0000000000000000000000000000000000000000..dc6fc7b08763e5069c0839ea9409f36c6336c271 --- /dev/null +++ b/config/emg_preprocessor-config.yml @@ -0,0 +1,173 @@ +source: + type: swift + kwargs: + username: 
!env{{OS_USERNAME_DOWNLOAD}} + password: "!env{{OS_PASSWORD_DOWNLOAD}}" + tenant_name: "!env{{OS_TENANT_NAME_DOWNLOAD}}" + tenant_id: "!env{{OS_TENANT_ID_DOWNLOAD}}" + region_name: "!env{{OS_REGION_NAME_DOWNLOAD}}" + auth_url: "!env{{OS_AUTH_URL_DOWNLOAD}}" + auth_version: "!env{{ST_AUTH_VERSION_DOWNLOAD}}" + user_domain_name: !env{{OS_USER_DOMAIN_NAME_DOWNLOAD}} +# target: +# type: swift +# kwargs: +# auth_version: !env{{ST_AUTH_VERSION}} +# auth_url: "!env{{OS_AUTH_URL}}" +# username: "!env{{OS_USERNAME}}" +# password: "!env{{OS_PASSWORD}}" +# tenant_name: !env{{OS_TENANT_NAME}} +# tenant_id: !env{{OS_TENANT_ID}} +# region_name: !env{{OS_REGION_NAME}} +# user_domain_name: !env{{OS_USER_DOMAIN_NAME}} +target: + type: local + replace: true + kwargs: + storage_path: /mnt/data/target + +workdir: /mnt/data/workdir +keep_temp: true + +# metadata file to look for in downloaded tar/zip file +metadata_glob: "*GSC*.xml" + +# extractors for Product type / level +type_extractor: + # xpath can also be a list of xpaths to be tried one after another + xpath: + - /gsc:report/gsc:opt_metadata/gml:using/eop:EarthObservationEquipment/eop:platform/eop:Platform/eop:shortName/text() + - /gsc:report/gsc:sar_metadata/gml:using/eop:EarthObservationEquipment/eop:platform/eop:Platform/eop:shortName/text() + map: # optional mapping from extracted type name to used product type name + PHR_FUS__3: PH00 + +level_extractor: + # xpath can also be a list of xpaths to be tried one after another + xpath: substring-after(substring-after(/gsc:report/gsc:opt_metadata/gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text(), '/'), '/') + map: # optional mapping + + +preprocessing: + defaults: + output: + options: + # WarpOptions (see https://gdal.org/python/osgeo.gdal-module.html#WarpOptions) + format: COG + dstSRS: "EPSG:4326" + dstNodata: 0 + creationOptions: + - BLOCKSIZE=512 + - COMPRESS=DEFLATE + - NUM_THREADS=8 + - BIGTIFF=IF_SAFER + - OVERVIEWS=AUTO + types: + KS03: + nested: true + data_file_globs: + - "*.tif" + additional_file_globs: + - "*.rpc" + georeference: + stack_bands: + # stack all bands for each scene in the product + group_by: ".*/(.*)_P..tif" + sort_by: ".*_P(R|G|B|N).tif" + order: + - R + - G + - B + - N + + + + + RS02: # as extracted/translated above + # whether the package can contain sub-packages of TARs/ZIPs + nested: true + # glob selectors to look for source images in the source package + data_file_globs: + - "*.TIF" + additional_file_globs: + - "*.rpc" + + # a custom preprocessor function to be called on all selected files + # custom_preprocessor: + # path: "path.to.some.module:attribute" + # # TODO: specify args/kwargs and pass meaningful parameters + + georeference: + # georeference each file individually + # - type: geotransform # one of geotransform, RPC, GCP, world file + # - type: GCP + + + stack_bands: + # stack all bands for each scene in the product + group_by: # TODO: figure out a way to get a grouping. e.g: part of the filename using regex? 
+ + output: + + # define a custom postprocessor function to be called on the processed file + # custom_postprocessor: + # path: "path.to.some.module:attribute" + # # TODO: specify args/kwargs and pass meaningful parameters + + CS00: + nested: true + data_file_globs: + - "*.h5" + + subdatasets: + data_file_glob: '*/*/*.h5' + subdataset_types: + '//S01/SBI': 'S01_SBI' + + georeference: + type: corners + corner_names: ["S01_SBI_Bottom_Left_Geodetic_Coordinates", "S01_SBI_Bottom_Right_Geodetic_Coordinates", "S01_SBI_Top_Left_Geodetic_Coordinates", "S01_SBI_Top_Right_Geodetic_Coordinates"] + orbit_direction_name: Orbit_Direction + force_north_up: false + # gcp_srid: + + calc: + formulas: + - inputs: + A: + glob: '*.tif' + band: 1 + B: + glob: '*.tif' + band: 2 + data_type: Float32 + formula: sqrt(A.astype(float)*A.astype(float)+B.astype(float)*B.astype(float)) + output_postfix: _proc + nodata_value: 0 + output: + options: + # WarpOptions (see https://gdal.org/python/osgeo.gdal-module.html#WarpOptions) + format: "COG" + dstSRS: "EPSG:3857" + dstNodata: 0 + creationOptions: + - BLOCKSIZE=512 + - COMPRESS=DEFLATE + - LEVEL=6 + - OVERVIEWS=AUTO + - NUM_THREADS=8 + - BIGTIFF=IF_SAFER + - RESAMPLING=CUBIC + CS01: + nested: true + data_file_globs: + - "*.h5" + + subdatasets: + data_file_glob: '*/*.h5' + subdataset_types: + '//S01/SBI': 'S01_SBI' + + georeference: + type: corners + +# this configuration is still a stub diff --git a/config/vhr18_preprocessor-config.yml b/config/vhr18_preprocessor-config.yml new file mode 100644 index 0000000000000000000000000000000000000000..359c52da54a21f67d2b38ab45d88671dbe404d23 --- /dev/null +++ b/config/vhr18_preprocessor-config.yml @@ -0,0 +1,53 @@ +source: + type: swift + kwargs: + username: !env '${OS_USERNAME_DOWNLOAD}' + password: !env '${OS_PASSWORD_DOWNLOAD}' + tenant_name: !env '${OS_TENANT_NAME_DOWNLOAD}' + tenant_id: !env '${OS_TENANT_ID_DOWNLOAD}' + region_name: !env '${OS_REGION_NAME_DOWNLOAD}' + auth_url: !env '${OS_AUTH_URL_DOWNLOAD}' + auth_version: !env '${ST_AUTH_VERSION_DOWNLOAD}' + user_domain_name: !env '${OS_USER_DOMAIN_NAME_DOWNLOAD}' +target: + type: swift + replace: false + kwargs: + username: !env '${OS_USERNAME}' + password: !env '${OS_PASSWORD}' + tenant_name: !env '${OS_TENANT_NAME}' + tenant_id: !env '${OS_TENANT_ID}' + region_name: !env '${OS_REGION_NAME}' + auth_version: !env '${ST_AUTH_VERSION}' + auth_url: !env '${OS_AUTH_URL}' + user_domain_name: !env '${OS_USER_DOMAIN_NAME}' + container: !env '${UPLOAD_CONTAINER}' +workdir: /tmp +keep_temp: false +metadata_glob: '*GSC*.xml' +type_extractor: + xpath: + - /gsc:report/gsc:opt_metadata/gml:using/eop:EarthObservationEquipment/eop:platform/eop:Platform/eop:shortName/text() +level_extractor: + # xpath can also be a list of xpaths to be tried one after another + xpath: substring-after(substring-after(/gsc:report/gsc:opt_metadata/gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text(), '/'), '/') +preprocessing: + defaults: + move_files: true + data_file_globs: + - '*.tif' + - '*.jp2' + output: + options: + format: COG + dstSRS: 'EPSG:4326' + dstNodata: 0 + creationOptions: + - BLOCKSIZE=512 + - COMPRESS=DEFLATE + - NUM_THREADS=8 + - BIGTIFF=IF_SAFER + - OVERVIEWS=AUTO + types: + PH1B: # just to pass validation + nested: true diff --git a/core/Dockerfile b/core/Dockerfile index 85a0d289043ed7fcbaf7437332fc68598051f381..33baf310685828ebc93ededfc5b9816786491945 100644 --- a/core/Dockerfile +++ b/core/Dockerfile @@ -70,7 +70,8 @@ ENV INSTANCE_ID="prism-view-server_core" 
\ REDIS_REGISTER_QUEUE_KEY= \ REDIS_REGISTERED_SET_KEY= \ INIT_SCRIPTS="/configure.sh" \ - COLLECT_STATIC="false" + COLLECT_STATIC="false" \ + REGISTRAR_REPLACE= ADD rgbnir_definition.json \ configure.sh \ diff --git a/core/registrar.py b/core/registrar.py index d6e4de5d99be9a291d8d8c81e3763a666f1eef58..7a1f097b0d50096a85fa93dedae9b69021f7cb89 100644 --- a/core/registrar.py +++ b/core/registrar.py @@ -36,6 +36,7 @@ import textwrap import logging import traceback from xml.sax.saxutils import escape +import subprocess import redis import lxml.etree @@ -44,6 +45,7 @@ from swiftclient.service import SwiftService import django from django.db import transaction from django.contrib.gis.geos import GEOSGeometry +from osgeo import gdal path = os.path.join(os.getenv('INSTALL_DIR', "/var/www/pvs"), "pvs_instance") if path not in sys.path: @@ -53,6 +55,7 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "pvs_instance.settings") django.setup() from eoxserver.backends import access +from eoxserver.contrib import vsi from eoxserver.backends import models as backends from eoxserver.core.util.timetools import isoformat from eoxserver.resources.coverages import models @@ -65,7 +68,6 @@ from eoxserver.resources.coverages.registration.registrators.gdal import ( logger = logging.getLogger(__name__) - def setup_logging(verbosity): # start logging setup # get command line level @@ -89,6 +91,16 @@ def setup_logging(verbosity): # finished logging setup +def set_gdal_swift_auth(): + # parsing command line output of swift auth + auth_keys = subprocess.check_output(["swift", "auth"]).decode(sys.stdout.encoding).split("\n") + storage_url = auth_keys[0].split("OS_STORAGE_URL=")[1] + auth_token = auth_keys[1].split("OS_AUTH_TOKEN=")[1] + # setting gdal config + gdal.SetConfigOption("SWIFT_STORAGE_URL", storage_url) + gdal.SetConfigOption("SWIFT_AUTH_TOKEN", auth_token) + + def add_mask(product): metadata_item = product.metadata_items.all()[0] with access.vsi_open(metadata_item) as f: @@ -100,6 +112,7 @@ def add_mask(product): )[0] geometry = GEOSGeometry(wkt) mask_type = models.MaskType.objects.get(product_type=product.product_type) + logger.debug("Adding mask") models.Mask.objects.create( product=product, mask_type=mask_type, @@ -117,7 +130,7 @@ def get_product_type_and_level(metadata_item): xp = '//gml:using/eop:EarthObservationEquipment/eop:platform/eop:Platform/eop:shortName/text()' product_type_name = tree.xpath(xp, namespaces=root.nsmap)[0] except Exception as e: - logger.warning( + logger.debug( 'Failed to determine product type of %s, error was %s' % (metadata_item.location, e) ) @@ -126,7 +139,6 @@ def get_product_type_and_level(metadata_item): xp = '//gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text()' parent_identifier = tree.xpath(xp, namespaces=root.nsmap)[0] - print("parent identifier --->", parent_identifier) if parent_identifier.endswith('Level_1'): level = 'Level_1' if parent_identifier.endswith('Level_3'): @@ -134,7 +146,7 @@ def get_product_type_and_level(metadata_item): else: raise Exception('Invalid parent identifier type name %s' % parent_identifier) except Exception as e: - logger.warning( + logger.debug( 'Failed to determine product level of %s, error was %s' % (metadata_item.location, e) ) @@ -142,37 +154,76 @@ def get_product_type_and_level(metadata_item): return product_type_name, level +def get_product_collection(metadata_file): + # in case collection needs to be determined from metadata + try: + if metadata_file.startswith("/vsiswift"): + set_gdal_swift_auth() + 
with vsi.open(metadata_file, "r") as f: + tree = lxml.etree.parse(f) + root = tree.getroot() + xp = '//gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text()' + product_type_name = tree.xpath(xp, namespaces=root.nsmap) + extracted = product_type_name[0].split('/')[0] + return extracted + except Exception as e: + logger.debug( + 'Failed to determine product collection for metadata file %s, error was %s' + % (metadata_file, e) + ) + + +def get_product_type_from_band_count(product_type_name, file_path): + # get raster band count via gdal + logger.debug("Opening file using GDAL: %s" % file_path) + if file_path.startswith("/vsiswift"): + set_gdal_swift_auth() + src_ds = gdal.Open(file_path) + if src_ds is None: + raise RegistrationError("Band check: failed to open dataset: %s " % file_path) + # try to fetch product model with _bandcount + product_type_name_upd = "%s_%s" % (product_type_name, src_ds.RasterCount) + try: + product_type_model = models.ProductType.objects.get(name=product_type_name_upd) + return product_type_model + except models.ProductType.DoesNotExist: + raise RegistrationError("Product Type: '%s' was not found" % product_type_name_upd) + + class RegistrationError(Exception): pass @transaction.atomic def registrar( - collection, - objects_prefix, upload_container, replace=False, client=None, - registered_set_key=None, reporting_dir=None, service_url=None + collection_stack, + objects_prefix, upload_container=None, replace=False, client=None, + registered_set_key=None, reporting_dir=None, service_url=None ): logger.info("Starting registration of product '%s'." % objects_prefix) - metadata_package, data_package, has_vrt = None, None, None - + metadata_package, data_package = None, None + if not upload_container: + # assuming objects_prefix = bucket/itemname + upload_container = objects_prefix.partition("/")[0] + objects_prefix = objects_prefix.partition("/")[2] with SwiftService() as swift: list_parts_gen = swift.list( container=upload_container, options={"prefix": objects_prefix}, ) for page in list_parts_gen: - print(page) if page["success"]: for item in page["listing"]: if item["name"].endswith(".xml"): metadata_package = item["name"] elif item["name"].endswith(".TIF") or \ item["name"].endswith(".tif"): - if has_vrt is not True: - data_package = item["name"] - elif item["name"].endswith(".vrt"): data_package = item["name"] - has_vrt = True elif not item["name"].endswith(".tar"): raise RegistrationError( "Product with objects prefix '%s' has " @@ -190,58 +241,95 @@ def registrar( "Product with objects prefix '%s' has missing content." % objects_prefix ) + logger.debug("Found objects '%s' and '%s'."
% (data_package, metadata_package)) storage = backends.Storage.objects.get(name=upload_container) metadata_item = models.MetaDataItem(storage=storage, location=metadata_package) product_type, level = get_product_type_and_level(metadata_item) + if collection_stack == 'DEM': + # special for DEM files, collection name === product_type + gdal_metadata_file_path = "/vsiswift/%s/%s" % (upload_container, metadata_package) + product_type = get_product_collection(gdal_metadata_file_path) + logger.debug("Registering product") + product_type_name = "%s_Product_%s" % (collection_stack, product_type) + + try: + # first find product type by name from path + product_type_model = models.ProductType.objects.get(name=product_type_name) + except models.ProductType.DoesNotExist: + # if not found, maybe there are more product types with _bandcount suffix + gdal_file_path = "/vsiswift/%s/%s" % (upload_container, data_package) + product_type_model = get_product_type_from_band_count(product_type_name, gdal_file_path) + product_type_name = product_type_model.name + coverage_type_names = product_type_model.allowed_coverage_types.all() + if len(coverage_type_names) > 1: + logger.warning("More available 'CoverageType' found, selecting the first one.") + coverage_type_name = coverage_type_names[0].name product, replaced = ProductRegistrator().register( metadata_locations=[[upload_container, metadata_package, ], ], - type_name="%s_Product_%s" % (collection, product_type), + type_name=product_type_name, replace=replace, extended_metadata=True, mask_locations=None, package_path=None, + simplify_footprint_tolerance=0.0001, # ~10meters overrides={}, ) + if product.footprint.empty: + product.delete() + raise RegistrationError("No footprint was extracted. full product: %s" % product) collection = models.Collection.objects.get( - identifier=collection + identifier=collection_stack ) + logger.debug("Inserting product into collection %s" % collection_stack) models.collection_insert_eo_object(collection, product) + if collection_stack == "DEM": + # also insert it to its own collection + collection_own = models.Collection.objects.get( + identifier="%s_%s" % (collection, product_type) + ) + logger.debug("Inserting product to collection %s_%s" % (collection, product_type)) + models.collection_insert_eo_object(collection_own, product) + if level == 'Level_1': collection_level_1 = models.Collection.objects.get( identifier="%s_Level_1" % collection ) + logger.debug("Inserting product to collection %s_Level_1" % collection) models.collection_insert_eo_object(collection_level_1, product) elif level == 'Level_3': collection_level_3 = models.Collection.objects.get( identifier="%s_Level_3" % collection ) + logger.debug("Inserting product to collection %s_Level_3" % collection) models.collection_insert_eo_object(collection_level_3, product) + logger.debug("Registering coverage") report = GDALRegistrator().register( data_locations=[[upload_container, data_package, ], ], metadata_locations=[[upload_container, metadata_package, ], ], - coverage_type_name="RGBNir", + coverage_type_name=coverage_type_name, overrides={ "identifier": "%s__coverage" % product.identifier, "footprint": None, }, replace=replace, ) + logger.debug("Adding coverage to product") models.product_add_coverage(product, report.coverage) try: add_mask(product) except Exception as e: - logger.info("Couldn't add mask.") + logger.debug("Couldn't add mask.") logger.debug(traceback.format_exc()) - logger.warning("%s: %s\n" % (type(e).__name__, str(e))) + logger.debug("%s: %s\n" % 
(type(e).__name__, str(e))) if client is not None: logger.debug( @@ -388,8 +476,7 @@ if __name__ == "__main__": upload_container = os.environ.get('UPLOAD_CONTAINER') if upload_container is None: - logger.critical("UPLOAD_CONTAINER environment variable not set.") - sys.exit(1) + logger.warn("UPLOAD_CONTAINER environment variable not set. Assuming part of path bucket/item") if arg_values.mode == "standard": registrar( diff --git a/core/run-registrar.sh b/core/run-registrar.sh index 86e1ee4d285500b456b6217f28cd697ff47410ee..0ff25e8ee82e582f73d11fd9fac8b83fcde5c73e 100644 --- a/core/run-registrar.sh +++ b/core/run-registrar.sh @@ -1,6 +1,10 @@ #!/bin/sh echo "Running registrar" >&2 +replace="" +if test "$REGISTRAR_REPLACE" = true; then + replace="--replace" +fi python3 /registrar.py \ --mode redis \ @@ -10,4 +14,5 @@ python3 /registrar.py \ --redis-registered-set-key ${REDIS_REGISTERED_SET_KEY} \ --redis-registered-set-key ${REDIS_REGISTERED_SET_KEY} \ --reporting-dir ${REPORTING_DIR} \ - --service-url ${SERVICE_URL} >&2 + --service-url ${SERVICE_URL} \ + ${replace} >&2 diff --git a/docker-compose.dem.yml b/docker-compose.dem.yml index 309ceac515f716acd0a646ff2539238e839fd642..2016ed58c5a7856819b8c125d7b245b8f789721a 100644 --- a/docker-compose.dem.yml +++ b/docker-compose.dem.yml @@ -28,9 +28,6 @@ services: volumes: - type: tmpfs target: /tmp - - type: volume - source: cache-db - target: /cache-db - type: volume source: instance-data target: /var/www/pvs @@ -60,9 +57,6 @@ services: volumes: - type: tmpfs target: /tmp - - type: volume - source: cache-db - target: /cache-db env_file: - env/dem.env - env/dem_db.env @@ -83,9 +77,6 @@ services: volumes: - type: tmpfs target: /tmp - - type: volume - source: cache-db - target: /cache-db env_file: - env/dem.env - env/dem_obs.env @@ -109,6 +100,9 @@ services: environment: INSTANCE_ID: "prism-view-server_preprocessor" WAIT_SERVICES: "redis:6379" + configs: + - source: preprocessor-config + target: /config.yaml deploy: replicas: 1 networks: @@ -154,6 +148,16 @@ services: image: registry.gitlab.eox.at/esa/prism/vs/pvs_client:latest deploy: replicas: 1 + ingestor: + image: registry.gitlab.eox.at/esa/prism/vs/pvs_ingestor:latest + env_file: + - env/dem_redis.env + environment: + INSTANCE_ID: "prism-view-server_ingestor" + deploy: + replicas: 1 + networks: + - intnet configs: init-db: file: ./config/dem_init-db.sh @@ -165,10 +169,11 @@ configs: file: ./config/dem_index-dev.html client-ops: file: ./config/dem_index-ops.html + preprocessor-config: + file: ./config/dem_preprocessor-config.yml volumes: db-data: redis-data: - cache-db: instance-data: report-data: networks: diff --git a/docker-compose.emg.yml b/docker-compose.emg.yml index 76706c207db8241481742ed6362fbfc7529cc918..1b1c7be8494a19161dfcc57113cebddc05e3d0ee 100644 --- a/docker-compose.emg.yml +++ b/docker-compose.emg.yml @@ -28,9 +28,6 @@ services: volumes: - type: tmpfs target: /tmp - - type: volume - source: cache-db - target: /cache-db - type: volume source: instance-data target: /var/www/pvs @@ -60,9 +57,6 @@ services: volumes: - type: tmpfs target: /tmp - - type: volume - source: cache-db - target: /cache-db env_file: - env/emg.env - env/emg_db.env @@ -83,9 +77,6 @@ services: volumes: - type: tmpfs target: /tmp - - type: volume - source: cache-db - target: /cache-db env_file: - env/emg.env - env/emg_obs.env @@ -119,6 +110,9 @@ services: environment: INSTANCE_ID: "prism-view-server_preprocessor" WAIT_SERVICES: "redis:6379" + configs: + - source: preprocessor-config + target: /config.yaml deploy: 
replicas: 1 networks: @@ -175,10 +169,11 @@ configs: file: ./config/emg_index-dev.html client-ops: file: ./config/emg_index-ops.html + preprocessor-config: + file: ./config/emg_preprocessor-config.yml volumes: db-data: redis-data: - cache-db: instance-data: report-data: networks: diff --git a/docker-compose.vhr18.yml b/docker-compose.vhr18.yml index 61a98bdb4c83469b4e2d140540e3780dcbe2d4d0..e567e41ab0ad8dfb54ef30be8124bbf4f78cfa90 100644 --- a/docker-compose.vhr18.yml +++ b/docker-compose.vhr18.yml @@ -28,9 +28,6 @@ services: volumes: - type: tmpfs target: /tmp - - type: volume - source: cache-db - target: /cache-db - type: volume source: instance-data target: /var/www/pvs @@ -60,9 +57,6 @@ services: volumes: - type: tmpfs target: /tmp - - type: volume - source: cache-db - target: /cache-db env_file: - env/vhr18.env - env/vhr18_db.env @@ -86,9 +80,6 @@ services: volumes: - type: tmpfs target: /tmp - - type: volume - source: cache-db - target: /cache-db env_file: - env/vhr18.env - env/vhr18_obs.env @@ -170,10 +161,6 @@ services: image: registry.gitlab.eox.at/esa/prism/vs/pvs_client:latest deploy: replicas: 1 - ingestor: - image: registry.gitlab.eox.at/esa/prism/vs/pvs_ingestor:latest - deploy: - replicas: 1 configs: init-db: file: ./config/vhr18_init-db.sh @@ -190,7 +177,6 @@ configs: volumes: db-data: redis-data: - cache-db: instance-data: report-data: networks: diff --git a/documentation/operator-guide/configuration.rst b/documentation/operator-guide/configuration.rst index 0fb9b12f11dbf48dba4679564eb6027f7435ea3f..578bd680b47cbe2e61596c6357ca9edbf89c2866 100644 --- a/documentation/operator-guide/configuration.rst +++ b/documentation/operator-guide/configuration.rst @@ -117,8 +117,6 @@ The following ``.env`` files are typically used: django admin user to be used with the admin GUI. * ``<stack-name>_obs.env``: This contains access parameters for the object storage(s). -* ``<stack-name>_preprocessor.env``: Preprocessor related environment - variables * ``<stack-name>_redis.env``: Redis access credentials and queue names @@ -173,6 +171,7 @@ retrieve the original product files: * ``OS_REGION_NAME_DOWNLOAD`` * ``OS_AUTH_URL_DOWNLOAD`` * ``ST_AUTH_VERSION_DOWNLOAD`` +* ``OS_USER_DOMAIN_NAME_DOWNLOAD`` VS Environment Variables ^^^^^^^^^^^^^^^^^^^^^^^^ @@ -293,6 +292,10 @@ metadata_glob This file glob is used to determine the main metadata file to extract the product type from. This file will be searched in the downloaded package. +glob_case + + If all globs will be used in a case-sensitive way. + type_extractor This setting configures how the product type is extracted from the previously diff --git a/env/dem_redis.env b/env/dem_redis.env index 45dcfdf80c5dbf4b1dcdfccaba2671ee2b57d7ef..3eff4afd5a700d498d26fadb791632909d8b5f30 100644 --- a/env/dem_redis.env +++ b/env/dem_redis.env @@ -4,6 +4,7 @@ REDIS_PORT=6379 REDIS_QUEUE_KEY=seed_queue REDIS_PREPROCESS_QUEUE_KEY=preprocess_queue +REDIS_PREPROCESS_MD_QUEUE_KEY=preprocess-md_queue REDIS_REGISTER_QUEUE_KEY=register_queue REDIS_REGISTERED_SET_KEY=registered_set REDIS_SET_KEY=registered_set diff --git a/preprocessor/Dockerfile b/preprocessor/Dockerfile index a081f9a91bae56b53125e698c26177498ab95a15..1d3cb90f96f4273318e6ef21d24ab061decf677a 100644 --- a/preprocessor/Dockerfile +++ b/preprocessor/Dockerfile @@ -25,7 +25,6 @@ # IN THE SOFTWARE. 
#----------------------------------------------------------------------------- -# FROM osgeo/gdal:ubuntu-full-latest FROM osgeo/gdal:ubuntu-full-3.1.2 MAINTAINER EOX @@ -51,22 +50,26 @@ ENV INSTANCE_ID="prism-data-access-server_preprocessor" \ COLLECTION= \ UPLOAD_CONTAINER= \ ST_AUTH_VERSION=3 \ - OS_AUTH_URL="https://auth.cloud.ovh.net/v3/" \ + OS_AUTH_URL= \ OS_USERNAME= \ OS_PASSWORD= \ OS_TENANT_NAME= \ OS_TENANT_ID= \ OS_REGION_NAME= \ + OS_USER_DOMAIN_NAME= \ OS_AUTH_URL_DOWNLOAD= \ ST_AUTH_VERSION_DOWNLOAD= \ OS_USERNAME_DOWNLOAD= \ OS_PASSWORD_DOWNLOAD= \ OS_TENANT_NAME_DOWNLOAD= \ OS_REGION_NAME_DOWNLOAD= \ + OS_USER_DOMAIN_NAME_DOWNLOAD= \ REDIS_HOST= \ REDIS_PORT= \ REDIS_PREPROCESS_QUEUE_KEY= \ - REDIS_REGISTER_QUEUE_KEY= + REDIS_PREPROCESS_MD_QUEUE_KEY= \ + REDIS_REGISTER_QUEUE_KEY= \ + PREPROCESSOR_DEBUG= ADD run-preprocessor.sh \ entrypoint.sh \ diff --git a/preprocessor/preprocessor/archive.py b/preprocessor/preprocessor/archive.py index 7dedcd5f4a040384a2d224c65a6573c763fed3ff..57ccb973939d2c6799cc387b8759bc43f64372ad 100644 --- a/preprocessor/preprocessor/archive.py +++ b/preprocessor/preprocessor/archive.py @@ -4,17 +4,24 @@ import io from typing import List, Union, BinaryIO import tarfile import zipfile -from fnmatch import fnmatch, fnmatchcase +import logging +from fnmatch import translate +import re +logger = logging.getLogger(__name__) ARCHIVE_EXTENSIONS = ['ZIP', 'zip', 'TAR', 'tar', 'TAR.BZ2', 'tar.bz2', 'TAR.GZ', 'tar.gz'] -def filter_filenames(filenames: List[PathLike], glob: str, case=False) -> List[PathLike]: - cmp = fnmatchcase if case else fnmatch +def filter_filenames(filenames: List[PathLike], glob: str, case: bool=False) -> List[PathLike]: + regex = translate(glob) + if case: + reobj = re.compile(regex) + else: + reobj = re.compile(regex, re.IGNORECASE) return [ filename for filename in filenames - if cmp(filename, glob) + if reobj.match(filename) ] @@ -41,7 +48,7 @@ def open_tarfile(archive_file: Union[PathLike, BinaryIO]) -> tarfile.TarFile: return tarfile.open(archive_file) -def unpack_files(archive_path: Union[PathLike, BinaryIO], target_dir: PathLike, glob=None, filenames=None, recursive=False) -> List[PathLike]: +def unpack_files(archive_path: Union[PathLike, BinaryIO], target_dir: PathLike, glob=None, case=None, filenames=None, recursive=False) -> List[PathLike]: """ Unpacks the contents of the specified ZIP or TAR archive to the given target directory. Optionally, only a given list of filenames will be extracted. 
@@ -67,7 +74,7 @@ def unpack_files(archive_path: Union[PathLike, BinaryIO], target_dir: PathLike, # filter the filenames when a glob is passed if glob: - filenames = filter_filenames(filenames, glob) + filenames = filter_filenames(filenames, glob, case) extracted_filenames = [] @@ -113,6 +120,7 @@ def unpack_files(archive_path: Union[PathLike, BinaryIO], target_dir: PathLike, sub_archive_filename, os.path.join(target_dir, sub_archive), glob, + case, filenames, recursive, ) diff --git a/preprocessor/preprocessor/cli.py b/preprocessor/preprocessor/cli.py index acfcb1a62b038c02e04c068dfdf7446de418ab07..9836ffdc5f02928766a66ba7d043c2f5c0b07763 100644 --- a/preprocessor/preprocessor/cli.py +++ b/preprocessor/preprocessor/cli.py @@ -43,7 +43,7 @@ def validate_config(config): @click.group() def cli(): - setup_logging(True) + pass @cli.command(help='Run the preprocess daemon, attaching to a Redis queue') @@ -55,7 +55,9 @@ def cli(): @click.option('--listen-queue', type=str) @click.option('--listen-md-queue', type=str) @click.option('--write-queue', type=str) -def daemon(config_file=None, use_dir=None, validate=False, host=None, port=None, listen_queue=None, listen_md_queue=None, write_queue=None): +@click.option('--debug/--no-debug', default=False) +def daemon(config_file=None, use_dir=None, validate=False, host=None, port=None, listen_queue=None, listen_md_queue=None, write_queue=None, debug=False): + setup_logging(debug) config = load_config(config_file) if validate: validate_config(config) @@ -68,7 +70,9 @@ def daemon(config_file=None, use_dir=None, validate=False, host=None, port=None, @click.option('--use-dir', type=str) # TODO: check dir @click.option('--validate/--no-validate', default=False) @click.option('--browse-report/--no-browse-report', default=False) -def preprocess(file_path, config_file=None, use_dir=None, validate=False, browse_report=False): +@click.option('--debug/--no-debug', default=False) +def preprocess(file_path, config_file=None, use_dir=None, validate=False, browse_report=False, debug=False): + setup_logging(debug) config = load_config(config_file) if validate: validate_config(config) @@ -79,10 +83,9 @@ def preprocess(file_path, config_file=None, use_dir=None, validate=False, browse browse_type = browse_report_data['browse_type'] for browse in browse_report_data['browses']: - preprocess_browse(config, browse_type, browse_report, browse, use_dir) + preprocess_browse(config, browse_type, browse_report_data, browse, use_dir) else: preprocess_file(config, file_path, use_dir) - if __name__ == '__main__': cli() diff --git a/preprocessor/preprocessor/config-schema.yaml b/preprocessor/preprocessor/config-schema.yaml index cff6590e90804ed0e3b4332aacb8a8053beda667..93f3f9b4f972556cf362bdc5eb24a163772d6fe9 100644 --- a/preprocessor/preprocessor/config-schema.yaml +++ b/preprocessor/preprocessor/config-schema.yaml @@ -26,6 +26,10 @@ properties: description: Extra arguments. Use depends on actual implementation. type: object # required: [type] + replace: + description: If set to true, output replaces already existing files on target. If no existing are present, preprocessing does not start. + type: boolean + default: false workdir: description: The local directory, where intermediary files are to be stored. type: string @@ -35,6 +39,9 @@ properties: metadata_glob: description: A file glob to select metadata files from the downloaded archive. type: string + glob_case: + description: If all file globs will use case-sensitive match. 
+ type: boolean type_extractor: description: How the product type is to be extracted from the metadata file. type: object diff --git a/preprocessor/preprocessor/daemon.py b/preprocessor/preprocessor/daemon.py index 4e4490f1759c9d09f2b8274ace682775abec5f93..e44a3c9079f341b59fcf3384be2bd97d7f811ca1 100644 --- a/preprocessor/preprocessor/daemon.py +++ b/preprocessor/preprocessor/daemon.py @@ -20,16 +20,18 @@ def run_daemon(config, host, port, listen_queue, listen_md_queue, write_queue): logger.debug("waiting for redis queue '%s'..." % listen_queue) while True: # fetch an item from the queue to be preprocessed - value, queue = client.brpop([listen_queue, listen_md_queue]) - + queue, value = client.brpop([listen_queue, listen_md_queue]) + file_paths = [] # start the preprocessing on that file if queue == listen_queue: - filenames = preprocess_file(config, value) + filename, file_path = preprocess_file(config, value) + file_paths.append(file_path) elif queue == listen_md_queue: - browse_report = json.loads(value) - filenames = preprocess_browse(config, browse_report['browse_type'], browse_report, browse) - + browse_report_data = json.loads(value) + browse_type = browse_report_data['browse_type'] + for browse in browse_report_data['browses']: + filename, file_path = preprocess_browse(config, browse_type, browse_report_data, browse) + file_paths.append(file_path) # TODO: convert to string, list, .... - - # write the filenames to the queue - client.lpush(write_queue, filenames) + for item in file_paths: + client.lpush(write_queue, item) diff --git a/preprocessor/preprocessor/preprocess.py b/preprocessor/preprocessor/preprocess.py index ae5a3efb7a8e31e150f94105a207531f1b54a72b..626099a5ab25ec001f907644fb1e856276bde68a 100644 --- a/preprocessor/preprocessor/preprocess.py +++ b/preprocessor/preprocessor/preprocess.py @@ -6,7 +6,6 @@ import logging import shutil from typing import List from pprint import pformat -from time import time from urllib.parse import urlparse from .transfer import get_downloader, get_uploader @@ -16,7 +15,7 @@ from .steps import ( georeference_step, extract_subdataset_step, calc_step, stack_bands_step, output_step ) from .steps.browse_report import browse_georeference -from .util import workdir +from .util import workdir, Timer logging.basicConfig() @@ -25,17 +24,20 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- -def copy_files(source, target): +def copy_files(source, target, move=False): for item in os.listdir(source): src_path = os.path.join(source, item) dst_path = os.path.join(target, item) - if os.path.isdir(src_path): - shutil.copytree( - src_path, - dst_path - ) + if move: + shutil.move(src_path, dst_path) else: - shutil.copy(src_path, dst_path) + if os.path.isdir(src_path): + shutil.copytree( + src_path, + dst_path + ) + else: + shutil.copy(src_path, dst_path) def custom_preprocessor(source_dir, target_dir, path, args=None, kwargs=None): @@ -69,25 +71,6 @@ def flatten(l): return [item for sublist in l for item in sublist] -class Timer: - """ Helper timer class to allow logging of timing values - """ - def __init__(self): - self.start = None - self.end = None - - def __enter__(self): - self.start = time() - return self - - def __exit__(self, *args, **kwargs): - self.end = time() - - @property - def elapsed(self): - return (self.end if self.end is not None else time()) - self.start - - # ----------------------------------------------------------------------------- @@ -114,7 +97,7 @@ def 
preprocess_internal(preprocess_config, previous_step='unpack'): preprocessor(previous_step, step, **step_config) logger.info( - 'Finished preprocessing step %s after %f seconds.' + 'Finished preprocessing step %s after %.3f seconds.' % (step, step_timer.elapsed) ) force_refresh = True @@ -124,11 +107,14 @@ def preprocess_internal(preprocess_config, previous_step='unpack'): previous_step = step - if not os.path.isdir('upload'): - os.mkdir('upload') + if not os.path.isdir('upload') or force_refresh: + try: + os.mkdir('upload') + except FileExistsError: + logger.debug('Upload folder already exists.') - # copy files from previous step directory to upload directory - copy_files(previous_step, 'upload') + # copy or move files from previous step directory to upload directory + copy_files(previous_step, 'upload', move=preprocess_config.get('move_files', False)) @@ -138,7 +124,16 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N """ with workdir(config, use_dir) as dirname, Timer() as preprocess_timer: logger.info('Preprocessing %s in %s' % (file_path, dirname)) - + target_config = config['target'] + # check if target.replace is configured and if not, check storage if files there + if not target_config['replace']: + uploader = get_uploader( + target_config['type'], target_config.get('args'), target_config.get('kwargs') + ) + if uploader.product_exists(file_path): + raise Exception('Target.replace configuration is not set to true and objects already exist in target %s.' % file_path) + else: + logger.debug('Product does not yet exist on target') # check if we can reuse a previous download if not os.path.isdir('download'): os.mkdir('download') @@ -153,7 +148,7 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N source_archive_path = downloader.download(file_path, 'download') logger.info( - 'Downloaded file %s in %f seconds' + 'Downloaded file %s in %.3f seconds' % (file_path, download_timer.elapsed) ) @@ -162,11 +157,11 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N logger.info('Download dir already exists, skipping...') # fetch the metadata XML file from the downloaded archive - metadata_files = unpack_files(source_archive_path, 'extra', glob=config['metadata_glob']) + metadata_files = unpack_files(source_archive_path, 'extra', glob=config['metadata_glob'], case=config.get('glob_case', False)) # open the XML to retrieve the product type and level product_type, product_level = extract_product_type_and_level(metadata_files, config) - logger.info('Detected %s/%s' % (product_type, product_level)) + logger.info('Detected product_type/level_type %s/%s' % (product_type, product_level)) # get a concrete configuration for the type, filled with the defaults default_config = dict(config['preprocessing'].get('defaults', {})) @@ -186,6 +181,7 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N source_archive_path, 'unpack', glob=glob, + case=config.get('glob_case', False), recursive=preprocess_config.get('nested', False), ) for glob in preprocess_config['data_file_globs'] @@ -195,13 +191,14 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N source_archive_path, 'unpack', glob=glob, + case=config.get('glob_case', False), recursive=preprocess_config.get('nested', False), ) for glob in preprocess_config.get('additional_file_globs', []) ]) logger.info( - 'Unpacked files: %s in %f seconds' + 'Unpacked files: %s in %.3f seconds' % (', 
'.join(metadata_files + data_files), unpack_timer.elapsed) ) else: @@ -211,16 +208,21 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N preprocess_internal(preprocess_config, 'unpack') # get an uploader for the finalized images - target_config = config['target'] uploader = get_uploader( target_config['type'], target_config.get('args'), target_config.get('kwargs') ) - - upload_filenames = [ - os.path.join(dirpath, filename) - for dirpath, _, filenames in os.walk('upload') - for filename in filenames - ] + if len(os.listdir('upload')) == 0: + # end here, so not only metadata file is uploaded + raise Exception('No data files to upload, aborting.') + + paths_for_upload = ['upload', 'extra'] + upload_filenames = [] + for path_to_upload in paths_for_upload: + upload_filenames.extend([ + os.path.join(dirpath, filename) + for dirpath, _, filenames in os.walk(path_to_upload) + for filename in filenames + ]) # send all files in the upload directory to the target storage logger.info( @@ -231,16 +233,16 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N uploader.upload(upload_filenames, file_path) logger.info( - 'Finished uploading after %f seconds.' + 'Finished uploading after %.3f seconds.' % (upload_timer.elapsed) ) logger.info( - 'Finished preprocessing of %s after %f seconds.' + 'Finished preprocessing of %s after %.3f seconds.' % (file_path, preprocess_timer.elapsed) ) - return upload_filenames + return upload_filenames, file_path def preprocess_browse(config: dict, browse_type: str, browse_report: dict, browse: dict, use_dir: os.PathLike=None): @@ -265,7 +267,7 @@ def preprocess_browse(config: dict, browse_type: str, browse_report: dict, brows source_filename_path = downloader.download(filename, 'download') logger.info( - 'Downloaded file %s in %f seconds' + 'Downloaded file %s in %.3f seconds' % (filename, download_timer.elapsed) ) @@ -277,39 +279,42 @@ def preprocess_browse(config: dict, browse_type: str, browse_report: dict, brows # TODO: check if allowed and download from there raise NotImplementedError - os.mkdir('unpack') + if not os.path.isdir('unpack'): + os.mkdir('unpack') + if not os.path.isdir('extra'): + os.mkdir('extra') logger.info('Applying browse georeference to browse %s' % filename) - browse_georeference('download', 'unpack', browse_type, browse) - + browse_georeference('download', 'unpack', 'extra', browse_report, browse) # fetch the product type from the browse_type product_type = config.get('browse_type_mapping', {}).get(browse_type, browse_type) - logger.info('Detected %s' % (product_type)) + logger.info('Detected product_type %s' % (product_type)) # get a concrete configuration for the type, filled with the defaults default_config = dict(config['preprocessing'].get('defaults', {})) - preprocess_config = dict(config['preprocessing']['types'].get(product_type, {})) - default_config.update(preprocess_config) + type_based_config = dict(config['preprocessing']['types'].get(product_type, {})) + default_config.update(type_based_config) + preprocess_config = default_config logger.debug('Using preprocessing config %s' % pformat(preprocess_config)) preprocess_internal(preprocess_config) - # get an uploader for the finalized images target_config = config['target'] uploader = get_uploader( target_config['type'], target_config.get('args'), target_config.get('kwargs') ) + paths_for_upload = ['upload', 'extra'] + upload_filenames = [] + for path_to_upload in paths_for_upload: + upload_filenames.extend([ + 
os.path.join(dirpath, filename) + for dirpath, _, filenames in os.walk(path_to_upload) + for filename in filenames + ]) - upload_filenames = [ - os.path.join(dirpath, filename) - for dirpath, _, filenames in os.walk('upload') - for filename in filenames - ] - - # TODO: upload location? - file_path = '' + file_path = browse['browse_identifier'] or upload_filenames[0] # send all files in the upload directory to the target storage logger.info( @@ -320,14 +325,13 @@ def preprocess_browse(config: dict, browse_type: str, browse_report: dict, brows uploader.upload(upload_filenames, file_path) logger.info( - 'Finished uploading after %f seconds.' + 'Finished uploading after %.3f seconds.' % (upload_timer.elapsed) ) logger.info( - 'Finished preprocessing of browse "%s" after %f seconds.' + 'Finished preprocessing of browse "%s" after %.3f seconds.' % (filename, preprocess_timer.elapsed) ) - return upload_filenames - + return upload_filenames, file_path diff --git a/preprocessor/preprocessor/steps/browse_report.py b/preprocessor/preprocessor/steps/browse_report.py index ef6a9068b7a8fc604b128cd2f334fcb59738c9fd..bd377b6cc3dde156079952c1288fd1e7da01df86 100644 --- a/preprocessor/preprocessor/steps/browse_report.py +++ b/preprocessor/preprocessor/steps/browse_report.py @@ -6,12 +6,13 @@ from textwrap import dedent from ..util import replace_ext, pairwise, gdal, osr -def browse_georeference(source_dir: os.PathLike, target_dir: os.PathLike, browse_report: dict, browse: dict): +def browse_georeference(source_dir: os.PathLike, target_dir_data: os.PathLike, target_dir_meta: os.PathLike, browse_report: dict, browse: dict): for filename in glob(join(source_dir, '*')): - target_filename = join(target_dir, replace_ext(basename(filename), '.tif')) - apply_browse_report_georeference(filename, target_filename, browse) + target_filename_data = join(target_dir_data, replace_ext(basename(filename), '.tif')) + target_filename_meta = join(target_dir_meta, replace_ext(basename(filename), '.xml')) + apply_browse_report_georeference(filename, target_filename_data, browse) - generate_gsc(filename, replace_ext(target_filename, '.xml'), browse_report, browse) + generate_gsc(filename, target_filename_meta, browse_report, browse) def apply_browse_report_georeference(input_filename: os.PathLike, target_filename: os.PathLike, browse: dict): diff --git a/preprocessor/preprocessor/steps/georeference.py b/preprocessor/preprocessor/steps/georeference.py index 498786093646ef5452320d1c9014454f4c7495c7..129074815f84b8f10720d367d09606a155677251 100644 --- a/preprocessor/preprocessor/steps/georeference.py +++ b/preprocessor/preprocessor/steps/georeference.py @@ -5,13 +5,12 @@ from glob import glob import shutil from typing import List, Tuple -from ..util import gdal, osr +from ..util import gdal, osr, replace_ext logger = logging.getLogger(__name__) - def georeference_step(source_dir: os.PathLike, target_dir: os.PathLike, type: str, **options: dict): type_name = type.lower() if type_name == 'gcp': @@ -24,8 +23,7 @@ def georeference_step(source_dir: os.PathLike, target_dir: os.PathLike, type: st georef_func = corner_georef else: raise Exception('Invalid georeference type %s' % type_name) - - for filename in glob(join(source_dir, '*.tif')): + for filename in [path for path in glob(join(source_dir, '**'), recursive=True) if not os.path.isdir(path)]: target_filename = join(target_dir, basename(filename)) georef_func(filename, target_filename, **options) @@ -35,7 +33,11 @@ def gcp_georef(input_filename: os.PathLike, target_filename: 
os.PathLike, order: succeded = False # simple case: get the geotransform from some GCPs - ds = gdal.Open(input_filename, gdal.GA_Update) + try: + ds = gdal.Open(input_filename, gdal.GA_Update) + except RuntimeError: + logger.warn('Can not open file by GDAL %s' % (input_filename)) + return if ds.GetGCPCount() <= 4: try: gcps = ds.GetGCPs() @@ -75,13 +77,14 @@ def rpc_georef(input_filename: os.PathLike, target_filename: os.PathLike, rpc_fi filename=input_filename, fileroot=fileroot, extension=extension, ) + rpc_filename = None try: - rpc_filename = glob(rpc_file_glob)[0] + rpc_filename = glob(rpc_file_glob, recursive=True)[0] except IndexError: logger.warn('No RPC filename found with glob %s' % rpc_file_glob) - # rename RPC filename to be compatible with GDAL - shutil.move(rpc_filename, '%s.rpc' % input_filename) + if rpc_filename: + shutil.move(rpc_filename, replace_ext(input_filename, '.rpc')) gdal.Warp( target_filename, diff --git a/preprocessor/preprocessor/steps/output.py b/preprocessor/preprocessor/steps/output.py index 5d093c6f24b4e55c485792f42749335af2d803b6..d90c53435419f999964899080a7fa7fb6f277ef3 100644 --- a/preprocessor/preprocessor/steps/output.py +++ b/preprocessor/preprocessor/steps/output.py @@ -1,9 +1,12 @@ import os from os.path import join, basename -from glob import glob from uuid import uuid4 +from glob import glob from ..util import replace_ext, gdal +import logging + +logger = logging.getLogger(__name__) def output_step(source_dir: os.PathLike, target_dir: os.PathLike, options: dict=None): @@ -16,8 +19,9 @@ def output_step(source_dir: os.PathLike, target_dir: os.PathLike, options: dict= extension = driver.GetMetadata().get('DMD_EXTENSIONS', 'tif').split(' ')[0] # warp each individual file warped_files = [] - for filename in glob(join(source_dir, '*.tif')): + for filename in [path for path in glob(join(source_dir, '**'), recursive=True) if not os.path.isdir(path)]: target_filename = join(target_dir, replace_ext(basename(filename), extension)) + logger.debug('Warping file %s' % filename) gdal.Warp(target_filename, filename, options=gdal.WarpOptions( **options )) @@ -25,6 +29,7 @@ def output_step(source_dir: os.PathLike, target_dir: os.PathLike, options: dict= if len(warped_files) > 1: tmp_filename = join(target_dir, '%s.%s' % (uuid4().hex, extension)) + logger.debug('Warping files %s' % warped_files) gdal.Warp(tmp_filename, warped_files, options=gdal.WarpOptions( **options )) diff --git a/preprocessor/preprocessor/steps/stack.py b/preprocessor/preprocessor/steps/stack.py index 89eb059089dfebc35a5972b7976120fca4837387..809f3ce7e574e31e3788cb3e15cab5fd1c489186 100644 --- a/preprocessor/preprocessor/steps/stack.py +++ b/preprocessor/preprocessor/steps/stack.py @@ -11,7 +11,7 @@ from ..util import replace_ext, gdal def stack_bands_step(source_dir: os.PathLike, target_dir: os.PathLike, group_by: str=None, sort_by: str=None, order: List[str]=None): """ Stack bands of the individual images """ - filenames = glob(join(source_dir, '*.tif'), recursive=True) + filenames = [path for path in glob(join(source_dir, '**'), recursive=True) if not os.path.isdir(path)] # check if we have a group_by regex. If yes, use the first # re-group to group by. 
# Fallback is basename of file as groupname diff --git a/preprocessor/preprocessor/transfer/abc.py b/preprocessor/preprocessor/transfer/abc.py index 409a2ea39cb145f14fc4b036c03d6d9859ff2496..9cc9818dcd1393b73f2c60a1fe78ec0cce98aad5 100644 --- a/preprocessor/preprocessor/transfer/abc.py +++ b/preprocessor/preprocessor/transfer/abc.py @@ -30,3 +30,7 @@ class Uploader(ABC): @abstractmethod def upload(self, local_path: Union[PathLike, List[PathLike]], remote_dir: PathLike) -> List[PathLike]: pass + + @abstractmethod + def product_exists(self, remote_dir: PathLike) -> bool: + pass diff --git a/preprocessor/preprocessor/transfer/local.py b/preprocessor/preprocessor/transfer/local.py index 2ee3da83f9a9f588bb03281d7b5844d2596f8172..f1450f7d6b18d2e0a98232023d5bda5cb6db667a 100644 --- a/preprocessor/preprocessor/transfer/local.py +++ b/preprocessor/preprocessor/transfer/local.py @@ -39,3 +39,10 @@ class Uploader(Base): shutil.copy2(local_path, remote_path) return remote_paths + + def product_exists(self, remote_dir: os.PathLike) -> bool: + remote_path = os.path.join(self.storage_path, remote_dir) + for r, d, f in os.walk(remote_path): + if len(f) >= 2: + return True + return False diff --git a/preprocessor/preprocessor/transfer/swift.py b/preprocessor/preprocessor/transfer/swift.py index b5178397d73463763c1a8f0b0f4d73894a884fc4..a58f7fb146530860a1b7af61961b38ea6b6849dc 100644 --- a/preprocessor/preprocessor/transfer/swift.py +++ b/preprocessor/preprocessor/transfer/swift.py @@ -6,7 +6,8 @@ import logging from swiftclient.multithreading import OutputManager from swiftclient.service import SwiftError, SwiftService, SwiftUploadObject - +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("swiftclient").setLevel(logging.WARNING) logger = logging.getLogger(__name__) class Base: @@ -38,15 +39,24 @@ class Base: "os_user_domain_name": self.user_domain_name, }) + def validate_container(self, remote_dir): + if self.container: + # container was specified, use it + return self.container, remote_dir + # container needs to be extracted from path + # paths needs to be updated + return remote_dir.partition('/')[0], remote_dir.partition('/')[2] + class Downloader(Base): """ Downloader for OpenStack swift object storages """ def download(self, remote_path: os.PathLike, local_path: os.PathLike) -> os.PathLike: + container, remote_path = self.validate_container(remote_path) target_filename = os.path.join(local_path, os.path.basename(remote_path)) with self.get_service() as swift: results = swift.download( - self.container, + container, [remote_path], options={ 'out_file': target_filename @@ -65,6 +75,7 @@ class Uploader(Base): """ def upload(self, local_path: Union[os.PathLike, List[os.PathLike]], remote_dir: os.PathLike) -> List[os.PathLike]: paths = local_path if isinstance(local_path, List) else [local_path] + container, remote_dir = self.validate_container(remote_dir) remote_paths = [ os.path.join( remote_dir, @@ -89,8 +100,7 @@ class Uploader(Base): options['use_slo'] = True with self.get_service() as swift: - # use container first part of path of container as upload container - container = self.container or paths[0].partition('/')[0] + # use container or first part of path results = swift.upload(container=container, objects=objects, options=options) for result in results: @@ -111,3 +121,15 @@ class Uploader(Base): raise Exception('Failed to upload %s' % result["error"]) return remote_paths + + def product_exists(self, remote_dir: os.PathLike) -> bool: + with self.get_service() as swift: + 
container, remote_dir = self.validate_container(remote_dir) + list_parts_gen = swift.list( + container=container, options={"prefix": remote_dir}, + ) + for page in list_parts_gen: + if page["success"] and len(page["listing"]) >= 2: + # at least two files present -> pass validation + return True + return False diff --git a/preprocessor/preprocessor/util.py b/preprocessor/preprocessor/util.py index 11bdd5db94adf3f1759ab81624c34f829240e6d6..249176c5643c3f58d5fedbfed94a478992679dee 100644 --- a/preprocessor/preprocessor/util.py +++ b/preprocessor/preprocessor/util.py @@ -2,6 +2,7 @@ import os from os.path import splitext from contextlib import contextmanager from tempfile import TemporaryDirectory, mkdtemp +from time import time try: from osgeo import gdal @@ -46,3 +47,22 @@ def pairwise(col): yield (next(iterator), next(iterator)) except StopIteration: break + + +class Timer: + """ Helper timer class to allow logging of timing values + """ + def __init__(self): + self.start = None + self.end = None + + def __enter__(self): + self.start = time() + return self + + def __exit__(self, *args, **kwargs): + self.end = time() + + @property + def elapsed(self): + return (self.end if self.end is not None else time()) - self.start diff --git a/preprocessor/run-preprocessor.sh b/preprocessor/run-preprocessor.sh index 674cad023e9e61c50f25880bf6e6a2793172e554..70a8aee6572806b30e158b706fc65fbd95ad427c 100644 --- a/preprocessor/run-preprocessor.sh +++ b/preprocessor/run-preprocessor.sh @@ -1,6 +1,10 @@ #!/bin/sh echo "Running preprocessor" +debug="--no-debug" +if test "$PREPROCESSOR_DEBUG" = true; then + debug="--debug" +fi preprocessor daemon \ --config-file /config.yaml \ @@ -8,4 +12,5 @@ preprocessor daemon \ --port ${REDIS_PORT} \ --listen-queue ${REDIS_PREPROCESS_QUEUE_KEY} \ --listen-md-queue ${REDIS_PREPROCESS_MD_QUEUE_KEY} \ - --write-queue ${REDIS_REGISTER_QUEUE_KEY} + --write-queue ${REDIS_REGISTER_QUEUE_KEY} \ + ${debug} \ No newline at end of file
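
Note on the new case handling for file globs: `preprocessor/preprocessor/archive.py` now compiles the glob into a regular expression via `fnmatch.translate` and matches case-insensitively unless the new `glob_case` option is set. A minimal standalone sketch of that behaviour (the function body mirrors the patched `filter_filenames`; the example file names are invented for illustration):

```python
import re
from fnmatch import translate
from typing import List


def filter_filenames(filenames: List[str], glob: str, case: bool = False) -> List[str]:
    # translate the shell-style glob to a regex; ignore case unless `case` is requested
    regex = translate(glob)
    reobj = re.compile(regex) if case else re.compile(regex, re.IGNORECASE)
    return [filename for filename in filenames if reobj.match(filename)]


# default (case-insensitive) matching keeps both extension spellings
print(filter_filenames(["scene/B1.TIF", "scene/b2.tif", "scene/preview.jpg"], "*.tif"))
# -> ['scene/B1.TIF', 'scene/b2.tif']

# with case-sensitive matching (glob_case: true) only the exact-case name is kept
print(filter_filenames(["scene/B1.TIF", "scene/b2.tif"], "*.tif", case=True))
# -> ['scene/b2.tif']
```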