#!/usr/bin/env python
# -----------------------------------------------------------------------------
#
# Project: preprocessor.py
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
# -----------------------------------------------------------------------------
# Copyright (c) 2019 EOX IT Services GmbH
#
# Python script to preprocess product data.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# -----------------------------------------------------------------------------

import sys
import os
import os.path
from os.path import join, basename, splitext
import argparse
import textwrap
import logging
import traceback
import redis
import tempfile
import tarfile
import re
import subprocess
from urllib.parse import urlparse
from urllib.request import urlretrieve
from datetime import datetime
import json

from swiftclient.multithreading import OutputManager
from swiftclient.service import SwiftError, SwiftService, SwiftUploadObject

import transform_chain
import gsc_generator


# collection: [name]
COLLECTION_MAP = {
    "VHR_IMAGE_2018": ["VHR IMAGE 2018", ],
    "Emergency": ["Emergency", ],
}

logger = logging.getLogger(__name__)


def setup_logging(verbosity):
    # start logging setup
    # map the command line verbosity to a log level
    if verbosity == 0:
        level = logging.CRITICAL
    elif verbosity == 1:
        level = logging.ERROR
    elif verbosity == 2:
        level = logging.WARNING
    elif verbosity == 3:
        level = logging.INFO
    else:
        level = logging.DEBUG
    logger.setLevel(level)
    sh = logging.StreamHandler()
    sh.setLevel(level)
    formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    # finished logging setup


def swift_download_file(swift, container, object_name, local_path):
    """ Download a single file from a swift object storage
    """
    options = {"out_file": local_path}
    for download in swift.download(container, [object_name], options):
        if download["success"]:
            logger.debug(
                "'%s' downloaded" % download["object"]
            )
        else:
            raise Exception('Failed to download object %s' % object_name)
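
# A minimal usage sketch for the download helper above. The container and
# object names are hypothetical; SwiftService picks up credentials from the
# usual OS_* environment variables when created without options:
#
#   with SwiftService() as swift:
#       swift_download_file(
#           swift, "my-container", "path/to/package.tar", "/tmp/package.tar"
#       )
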
def swift_upload_files(local_dir, swift, container, filenames, base_path=None):
    """ Upload multiple files
    """
    # get a path for each file relative to the local directory;
    # we use this as the object path (below base_path) on the upload
    relpaths = [
        os.path.relpath(filename, local_dir)
        for filename in filenames
    ]

    # create upload objects defining the source and target of each upload,
    # potentially using segmented files when the upload size is bigger than 5 GB
    objects = [
        SwiftUploadObject(
            filename,
            os.path.join(base_path, relpath) if base_path else relpath,
            # check if 5GB swift upload limit is exceeded, if yes, use segmentation
            {
                'segment_size': 1024 * 1024 * 1024,  # 1GB segments
            } if os.stat(filename).st_size > 1024 * 1024 * 1024 * 5 else None
        )
        for filename, relpath in zip(filenames, relpaths)
    ]

    # perform the upload
    succeeded = []
    failed = []
    for upload in swift.upload(container, objects):
        if upload["success"]:
            succeeded.append(upload)
            if "object" in upload:
                logger.info(
                    "'%s' successfully uploaded." % upload["object"]
                )
            elif "for_object" in upload:
                logger.debug(
                    "Successfully uploaded '%s' segment '%s'."
                    % (upload["for_object"], upload["segment_index"])
                )
        else:
            failed.append(upload)

    # cleanup already uploaded objects if any upload failed
    if failed:
        for upload in succeeded:
            for _ in swift.delete(container, [upload['object']]):
                pass
        raise Exception('Failed to upload objects: %s' % (
            ', '.join(upload['object'] for upload in failed)
        ))


class BasePreprocessor:
    def __init__(self, replace):
        self.replace = replace

    def preprocess(self):
        options = {
            "os_username": "",  # TODO: Use from environment variable
            "os_password": "",  # TODO: Use from environment variable
            "os_tenant_name": "",  # TODO: Use from environment variable
            "os_tenant_id": "",  # TODO: Use from environment variable
        }
        with SwiftService(options=options) as input_swift, \
                SwiftService() as output_swift, \
                tempfile.TemporaryDirectory() as tmpdir, \
                OutputManager():
            for container, path, extra in self.get_input_container_and_filenames():
                if not self.replace and self.check_output_exists(output_swift, container, path):
                    logger.critical(
                        "Aborting, package '%s' already exists at "
                        "target container '%s'." % (path, container)
                    )
                    return
                source_file = self.download_source(input_swift, container, path, tmpdir)
                unpacked_files = self.unpack_source(source_file, tmpdir)
                processed_files = self.process_image(unpacked_files, extra, tmpdir)
                self.upload_files(output_swift, container, path, processed_files, tmpdir)

    def check_output_exists(self, swift, container, path):
        list_parts_gen = swift.list(
            container=container, options={"prefix": path},
        )
        for page in list_parts_gen:
            if page["success"]:
                return True
        return False

    def get_input_container_and_filenames(self):
        raise NotImplementedError

    def download_source(self, swift, container, object_name, tmpdir):
        # construct local path
        local_path = os.path.join(tmpdir, os.path.basename(object_name))
        swift_download_file(swift, container, object_name, local_path)
        return local_path

    def unpack_source(self, downloaded, tmpdir):
        raise NotImplementedError

    def cleanup_source(self, filename):
        os.unlink(filename)

    def process_image(self, files, extra, tmpdir):
        raise NotImplementedError

    def upload_files(self, swift, container, base_path, filenames, tmpdir):
        swift_upload_files(tmpdir, swift, container, filenames, base_path)


class PackagePreprocessor(BasePreprocessor):
    def __init__(self, tar_object_path, replace=False):
        super().__init__(replace)
        self.tar_object_path = tar_object_path

    def get_input_container_and_filenames(self):
        container = self.tar_object_path.split("/")[1]
        package = "/".join(self.tar_object_path.split("/")[2:])
        return [(container, package, None)]

    def unpack_source(self, filename, tmpdir):
        tf = tarfile.open(filename, mode="r")
        data_files_members = [
            m for m in tf.getmembers()
            if re.search(r"IMG.+\.(TIF|JP2)", m.name)
        ]
        metadata_file_member = next(
            m for m in tf.getmembers()
            if re.search(r"GSC.+\.xml", m.name)
        )
        data_files = [
            member.name
            for member in data_files_members
        ]
        metadata_file = metadata_file_member.name
        members = data_files_members + [metadata_file_member]
        if not data_files or not metadata_file:
            logger.error(
                "Aborting, not all needed files found in package."
            )
            raise Exception("Not all needed files found in package.")
        tf.extractall(path=tmpdir, members=members)

        # cleanup after use to save space
        tf.close()

        return data_files + [metadata_file]
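
    # Package layout this unpacking step expects, sketched with hypothetical
    # names (the regexes above only require "IMG*.TIF/JP2" data files plus a
    # single "GSC*.xml" metadata file):
    #
    #   package.tar
    #     IMG_ABC_..._R1C1.TIF
    #     IMG_ABC_..._R2C1.TIF
    #     GSC_..._metadata.xml
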
    def open_source_dataset(self, data_files, tmpdir):
        source_name = os.path.join(tmpdir, data_files[0])

        # if there is more than one file, make a VRT to mosaic them
        if len(data_files) > 1:
            logger.debug("More files found, creating a VRT")
            source_name = os.path.join(tmpdir, 'tmp.vrt')
            subprocess.run(
                ['gdalbuildvrt', source_name] + [
                    os.path.join(tmpdir, data_file)
                    for data_file in data_files
                ],
                timeout=600, check=True
            )

        # open file using gdal
        logger.debug("Opening file using GDAL.")
        return transform_chain.open_gdal_dataset(source_name)

    def process_image(self, files, extra, tmpdir):
        data_files, metadata_file = files[:-1], files[-1]

        # get initial source dataset
        src_ds = self.open_source_dataset(data_files, tmpdir)

        # perform transformation steps as necessary
        logger.debug("Changing geotransform.")
        src_ds = transform_chain.correct_geo_transform(src_ds)

        # save file with given options - should use ENV
        creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6",
                            "NUM_THREADS=8", "BIGTIFF=IF_SAFER",
                            "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
        logger.debug("Saving resulting file.")
        output_filename = os.path.join(tmpdir, "%s.tmp" % data_files[0])
        transform_chain.write_gdal_dataset(src_ds, "COG", output_filename, creation_options)
        src_ds = None
        local_data_file = os.path.join(tmpdir, data_files[0])
        os.rename(output_filename, local_data_file)

        return (local_data_file, os.path.join(tmpdir, metadata_file))


class BrowseReportPreprocessor(BasePreprocessor):
    def __init__(self, browse_report, replace=False):
        super().__init__(replace)
        self.browse_report = browse_report

    def get_input_container_and_filenames(self):
        def _get_browse_container_filename(filename, browse):
            parsed = urlparse(filename)
            if parsed.scheme:
                return (None, filename, browse)
            container = filename.split("/")[1]
            filename = "/".join(filename.split("/")[2:])
            return (container, filename, browse)

        return [
            _get_browse_container_filename(browse['filename'], browse)
            for browse in self.browse_report['browses']
        ]

    def download_source(self, swift, container, object_name, tmpdir):
        # construct local path
        local_path = os.path.join(tmpdir, os.path.basename(object_name))
        # download source from either swift or HTTP
        if container:
            swift_download_file(swift, container, object_name, local_path)
        else:
            urlretrieve(object_name, local_path)
        return local_path

    def unpack_source(self, filename, tmpdir):
        # should already be a simple file
        return [filename]

    def process_image(self, files, browse, tmpdir):
        data_file = files[0]
        src_ds = transform_chain.open_gdal_dataset(data_file)

        # TODO: preprocessing from ngeo
        # # perform transformation steps as necessary
        # logger.debug("Changing geotransform.")
        # src_ds = transform_chain.correct_geo_transform(src_ds)

        # save file with given options - should use ENV
        creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6",
                            "NUM_THREADS=8", "BIGTIFF=IF_SAFER",
                            "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
        logger.debug("Saving resulting file.")
        output_filename = os.path.join(tmpdir, "%s.tmp" % data_file)
        transform_chain.write_gdal_dataset(src_ds, "COG", output_filename, creation_options)
        src_ds = None
        os.rename(output_filename, data_file)

        # generate GSC metadata
        metadata_file = self.generate_metadata_file(data_file, browse, tmpdir)

        return (data_file, metadata_file)

    def generate_metadata_file(self, data_file, browse, tmpdir):
        now_time = datetime.now().isoformat().rpartition('.')[0] + 'Z'
        metadata = {
            'identifier': browse['browse_identifier'],
            'now_time': now_time,
            'archiving_time': now_time,
            'delivery_time': now_time,
            'begin_time': browse['start_time'],
            'end_time': browse['end_time'],
            'product_type': browse['browse_type'],
            'footprint': gsc_generator.get_footprint_from_browse(data_file, browse)
        }

        out_filename = join(tmpdir, splitext(basename(browse['filename']))[0] + '.xml')
        with open(out_filename, 'w') as f:
            f.write(
                gsc_generator.generate_gsc_metadata(metadata)
            )
        return out_filename
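
# Shape of a browse report as consumed by BrowseReportPreprocessor, sketched
# from the keys accessed above (all values are illustrative):
#
#   {
#       "browses": [{
#           "browse_identifier": "some-id",
#           "browse_type": "SOME_TYPE",
#           "filename": "/container/path/to/browse.tif",
#           "start_time": "2019-01-01T00:00:00Z",
#           "end_time": "2019-01-01T00:05:00Z"
#       }]
#   }
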

def preprocessor(
    collection, tar_object_path, replace=False, client=None,
    register_queue_key=None
):
    logger.info("Starting preprocessing of '%s'." % (tar_object_path))

    try:
        container = tar_object_path.split("/")[1]
        package = "/".join(tar_object_path.split("/")[2:])

        with SwiftService() as swift, OutputManager(), \
                tempfile.TemporaryDirectory() as tmpdirname:
            if not replace:
                try:
                    list_parts_gen = swift.list(
                        container=container, options={"prefix": package},
                    )
                    for page in list_parts_gen:
                        if page["success"]:
                            logger.critical(
                                "Aborting, package '%s' already exists at "
                                "target container '%s'." % (package, container)
                            )
                            return 1
                except SwiftError as e:
                    logger.debug(traceback.format_exc())
                    logger.error("%s: %s\n" % (type(e).__name__, str(e)))
                    return 1

            tmpfilename = os.path.join(tmpdirname, "tmp.tar")

            options = {
                "os_username": "",  # TODO: Use from environment variable
                "os_password": "",  # TODO: Use from environment variable
                "os_tenant_name": "",  # TODO: Use from environment variable
                "os_tenant_id": "",  # TODO: Use from environment variable
            }
            with SwiftService(options=options) as swift_down:
                for down_res in swift_down.download(
                    container=container,
                    objects=[package, ],
                    options={"out_file": tmpfilename},
                ):
                    if down_res["success"]:
                        logger.debug(
                            "'%s' downloaded" % down_res["object"]
                        )
                    else:
                        logger.error(
                            "'%s' download failed" % down_res["object"]
                        )
                        return 1

            tf = tarfile.open(tmpfilename, mode="r")
            data_files_ti = [
                m for m in tf.getmembers()
                if re.search(r"IMG.+\.(TIF|JP2)", m.name)
            ]
            metadata_file_ti = next(
                m for m in tf.getmembers()
                if re.search(r"GSC.+\.xml", m.name)
            )
            data_files = [
                member.name
                for member in data_files_ti
            ]
            metadata_file = metadata_file_ti.name
            members = data_files_ti + [metadata_file_ti]

            if not data_files or not metadata_file:
                logger.error(
                    "Aborting, not all needed files found in package."
                )
                return 1

            tf.extractall(path=tmpdirname, members=members)

            # cleanup after use to save space
            tf.close()
            os.remove(tmpfilename)

            source_name = os.path.join(tmpdirname, data_files[0])
            tmp_name = os.path.join(tmpdirname, "%s.tmp" % data_files[0])
            dest_name = os.path.join(
                package, "%s.tif" % os.path.splitext(data_files[0])[0]
            )
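
            # The mosaicking step below shells out to the GDAL CLI; the
            # equivalent manual call would be roughly (paths illustrative):
            #
            #   gdalbuildvrt /tmp/work/tmp.vrt \
            #       /tmp/work/IMG_..._R1C1.TIF /tmp/work/IMG_..._R2C1.TIF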
            # if there is more than one file, make a VRT to mosaic them
            if len(data_files) > 1:
                logger.debug("More files found, creating a VRT")
                source_name = os.path.join(tmpdirname, 'tmp.vrt')
                subprocess.run(
                    ['gdalbuildvrt', source_name] + [
                        os.path.join(tmpdirname, data_file)
                        for data_file in data_files
                    ],
                    timeout=600, check=True
                )

            # open file using gdal
            logger.debug("Opening file using GDAL.")
            dst = transform_chain.open_gdal_dataset(source_name)

            # perform transformation steps as necessary
            logger.debug("Changing geotransform.")
            dst = transform_chain.correct_geo_transform(dst)

            # save file with given options - should use ENV
            creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6",
                                "NUM_THREADS=8", "BIGTIFF=IF_SAFER",
                                "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
            logger.debug("Saving resulting file.")
            transform_chain.write_gdal_dataset(dst, "COG", tmp_name, creation_options)
            dst = None

            # check if 5GB swift upload limit is exceeded, if yes, use segmentation
            size = os.stat(tmp_name).st_size
            upload_params = {}
            if size > 1024 * 1024 * 1024 * 5:
                upload_params["segment_size"] = 1024 * 1024 * 1024  # 1GB segments

            objects = [
                SwiftUploadObject(tmp_name, object_name=dest_name),
                SwiftUploadObject(
                    os.path.join(tmpdirname, metadata_file),
                    object_name=os.path.join(package, metadata_file)
                )
            ]

            for upload in swift.upload(
                container=container,
                objects=objects,
                options=upload_params,
            ):
                if upload["success"]:
                    if "object" in upload:
                        logger.info(
                            "'%s' successfully uploaded." % upload["object"]
                        )
                    elif "for_object" in upload:
                        logger.debug(
                            "Successfully uploaded '%s' segment '%s'."
                            % (upload["for_object"], upload["segment_index"])
                        )
                else:
                    logger.error(
                        "'%s' upload failed" % upload["error"]
                    )
                    return 1

            if client is not None:
                logger.debug(
                    "Storing paths in redis queue '%s'" % register_queue_key
                )
                client.lpush(
                    register_queue_key, "%s" % tar_object_path
                )

    except Exception as e:
        logger.debug(traceback.format_exc())
        logger.error("%s: %s\n" % (type(e).__name__, str(e)))
        return 1

    logger.info(
        "Successfully finished preprocessing of '%s'." % (tar_object_path)
    )


def preprocessor_redis_wrapper(
    collection, replace=False, host="localhost", port=6379,
    preprocess_queue_key="preprocess_queue",
    preprocess_md_queue_key="preprocess-md_queue",
    register_queue_key="register_queue"
):
    client = redis.Redis(
        host=host, port=port, charset="utf-8", decode_responses=True
    )
    while True:
        logger.debug("waiting for redis queue '%s'..." % preprocess_queue_key)
        queue, value = client.brpop([preprocess_queue_key, preprocess_md_queue_key])
        if queue == preprocess_md_queue_key:
            preprocessor = BrowseReportPreprocessor(json.loads(value), replace=replace)
        else:
            preprocessor = PackagePreprocessor(value, replace=replace)
        preprocessor.preprocess()
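
# Sketch of the queue payloads the wrapper consumes (queue names are the
# defaults above; values are illustrative): the preprocess queue carries plain
# object paths, the preprocess-md queue carries browse reports as JSON.
#
#   redis-cli LPUSH preprocess_queue "/my-container/path/to/package.tar"
#   redis-cli LPUSH preprocess-md_queue '{"browses": [...]}'
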

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.description = textwrap.dedent("""\
        Preprocess product data.
    """)
    parser.add_argument(
        "collection", default=None,
        help=(
            "Collection the preprocessor is run for."
        )
    )
    parser.add_argument(
        "--mode", default="standard", choices=["standard", "redis"],
        help=(
            "The mode to run the preprocessor. Either one-off (standard) or "
            "reading from a redis queue."
        )
    )
    parser.add_argument(
        "--tar-object-path", default=None,
        help=(
            "Path to object holding tar archive file of product."
        )
    )
    parser.add_argument(
        "--replace", action="store_true",
        help=(
            "Replace existing products instead of skipping the preprocessing."
        )
    )
    parser.add_argument(
        "--redis-preprocess-queue-key", default="preprocess_queue"
    )
    parser.add_argument(
        "--redis-register-queue-key", default="register_queue"
    )
    parser.add_argument(
        "--redis-host", default="localhost"
    )
    parser.add_argument(
        "--redis-port", type=int, default=6379
    )
    parser.add_argument(
        "-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4],
        help=(
            "Set verbosity of log output "
            "(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
        )
    )
    arg_values = parser.parse_args()

    setup_logging(arg_values.verbosity)

    collection = arg_values.collection
    if collection not in COLLECTION_MAP:
        logger.critical("Provided collection '%s' is not valid." % collection)
        sys.exit(1)

    if arg_values.mode == "standard":
        preprocessor(
            collection,
            arg_values.tar_object_path,
            replace=arg_values.replace,
        )
    else:
        preprocessor_redis_wrapper(
            collection,
            replace=arg_values.replace,
            host=arg_values.redis_host,
            port=arg_values.redis_port,
            preprocess_queue_key=arg_values.redis_preprocess_queue_key,
            register_queue_key=arg_values.redis_register_queue_key,
        )
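
# Example invocations (values are illustrative; the collection must be a key
# of COLLECTION_MAP):
#
#   python preprocessor.py VHR_IMAGE_2018 \
#       --tar-object-path /my-container/path/to/package.tar -v 4
#
#   python preprocessor.py Emergency --mode redis --redis-host redis -v 4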