From c0422814f883072ee3cd309f83fb0eaccd2f278d Mon Sep 17 00:00:00 2001
From: Fabian Schindler <fabian.schindler.strauss@gmail.com>
Date: Tue, 25 Feb 2020 09:48:10 +0100
Subject: [PATCH] Split the preprocessor into Package and Browse variants

---
 preprocessor/preprocessor.py | 255 ++++++++++++++++++++++++++++++++++-
 1 file changed, 248 insertions(+), 7 deletions(-)

diff --git a/preprocessor/preprocessor.py b/preprocessor/preprocessor.py
index e0c2f83e..9beaa250 100644
--- a/preprocessor/preprocessor.py
+++ b/preprocessor/preprocessor.py
@@ -31,6 +31,7 @@
 import sys
 import os
+import os.path
 import argparse
 import textwrap
 import logging
@@ -40,6 +41,8 @@
 import tempfile
 import tarfile
 import re
 import subprocess
+from urllib.parse import urlparse
+from urllib.request import urlretrieve
 
 from swiftclient.multithreading import OutputManager
 from swiftclient.service import SwiftError, SwiftService, SwiftUploadObject
@@ -78,6 +81,243 @@ def setup_logging(verbosity):
     # finished logging setup
 
 
+def swift_download_file(swift, container, object_name, local_path):
+    """ Download a single file from a swift object storage
+    """
+    options = {"out_file": local_path}
+    for download in swift.download(container, [object_name], options):
+        if download["success"]:
+            logger.debug(
+                "'%s' downloaded" % download["object"]
+            )
+        else:
+            raise Exception('Failed to download object %s' % object_name)
+
+
+def swift_upload_files(local_dir, swift, container, filenames, base_path=None):
+    """ Upload multiple files
+    """
+
+    # get a path relative to the local directory for each file;
+    # we use this as the base path on the upload swift
+    relpaths = [
+        os.path.relpath(filename, local_dir)
+        for filename in filenames
+    ]
+
+    # create upload objects stating what is uploaded to where, switching
+    # to a segmented upload when a file exceeds the 5 GB swift upload limit
+    objects = [
+        SwiftUploadObject(
+            filename,
+            os.path.join(base_path, relpath) if base_path else relpath,
+            {
+                'segment_size': 1024 * 1024 * 1024,  # 1 GB segments
+            } if os.stat(filename).st_size > 1024 * 1024 * 1024 * 5 else None
+        )
+        for filename, relpath in zip(filenames, relpaths)
+    ]
+
+    # perform the upload
+    succeeded = []
+    failed = []
+    for upload in swift.upload(container, objects):
+        if upload["success"]:
+            succeeded.append(upload)
+            if "object" in upload:
+                logger.info(
+                    "'%s' successfully uploaded." % upload["object"]
+                )
+            elif "for_object" in upload:
+                logger.debug(
+                    "Successfully uploaded '%s' segment '%s'."
+                    % (upload["for_object"], upload["segment_index"])
+                )
+        else:
+            failed.append(upload)
+
+    # if any upload failed, delete the objects that already succeeded
+    if failed:
+        for upload in succeeded:
+            for _ in swift.delete(container, [upload['object']]):
+                pass
+
+        raise Exception('Failed to upload objects: %s' % (
+            ', '.join(upload['object'] for upload in failed)
+        ))
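+
+# Example (hypothetical paths and container name) of how the helper above
+# might be called; a sketch for illustration, not part of the processing flow:
+#
+#     with SwiftService() as swift:
+#         swift_upload_files(
+#             '/tmp/work', swift, 'preprocessed',
+#             ['/tmp/work/IMG_001.TIF', '/tmp/work/GSC_report.xml'],
+#             base_path='packages/example',
+#         )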
+
+
+class BasePreprocessor:
+    def __init__(self, replace):
+        self.replace = replace
+
+    def preprocess(self):
+        options = {
+            "os_username": "",  # TODO: Use from environment variable
+            "os_password": "",  # TODO: Use from environment variable
+            "os_tenant_name": "",  # TODO: Use from environment variable
+            "os_tenant_id": "",  # TODO: Use from environment variable
+        }
+        with SwiftService(options=options) as input_swift, \
+                SwiftService() as output_swift, \
+                tempfile.TemporaryDirectory() as tmpdir, \
+                OutputManager():
+
+            for container, path in self.get_input_container_and_filenames():
+                if not self.replace and self.check_output_exists(output_swift, container, path):
+                    logger.critical(
+                        "Aborting, package '%s' already exists at "
+                        "target container '%s'." % (path, container)
+                    )
+                    return
+
+                source_file = self.download_source(input_swift, container, path, tmpdir)
+                unpacked_files = self.unpack_source(source_file, tmpdir)
+                processed_files = self.process_image(unpacked_files, tmpdir)
+                self.upload_files(output_swift, container, path, processed_files, tmpdir)
+
+    def check_output_exists(self, swift, container, path):
+        list_parts_gen = swift.list(
+            container=container, options={"prefix": path},
+        )
+        for page in list_parts_gen:
+            if page["success"]:
+                return True
+        return False
+
+    def get_input_container_and_filenames(self):
+        raise NotImplementedError
+
+    def download_source(self, swift, container, object_name, tmpdir):
+        # construct local path
+        local_path = os.path.join(tmpdir, os.path.basename(object_name))
+        swift_download_file(swift, container, object_name, local_path)
+        return local_path
+
+    def unpack_source(self, downloaded, tmpdir):
+        raise NotImplementedError
+
+    def cleanup_source(self, filename):
+        os.unlink(filename)
+
+    def process_image(self, files, tmpdir):
+        raise NotImplementedError
+
+    def upload_files(self, swift, container, base_path, filenames, tmpdir):
+        swift_upload_files(tmpdir, swift, container, filenames, base_path)
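+
+# preprocess() acts as a template method: for each (container, path) pair
+# returned by get_input_container_and_filenames() it runs the hook sequence
+#
+#     download_source() -> unpack_source() -> process_image() -> upload_files()
+#
+# and the subclasses below only implement these hooks.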
+
+
+class PackagePreprocessor(BasePreprocessor):
+    def __init__(self, tar_object_path, replace=False):
+        super().__init__(replace)
+        self.tar_object_path = tar_object_path
+
+    def get_input_container_and_filenames(self):
+        # e.g: '/container/path/to/package.tar' -> ('container', 'path/to/package.tar')
+        container = self.tar_object_path.split("/")[1]
+        package = "/".join(self.tar_object_path.split("/")[2:])
+        return [(container, package)]
+
+    def unpack_source(self, filename, tmpdir):
+        tf = tarfile.open(filename, mode="r")
+
+        data_files_members = [
+            m for m in tf.getmembers()
+            if re.search(r"IMG.+\.(TIF|JP2)", m.name)
+        ]
+        # pass a default to avoid a StopIteration when no metadata file exists
+        metadata_file_member = next(
+            (m for m in tf.getmembers() if re.search(r"GSC.+\.xml", m.name)),
+            None
+        )
+
+        if not data_files_members or metadata_file_member is None:
+            logger.error(
+                "Aborting, not all needed files found in package."
+            )
+            raise Exception("Not all needed files found in package.")
+
+        data_files = [
+            member.name
+            for member in data_files_members
+        ]
+        metadata_file = metadata_file_member.name
+        members = data_files_members + [metadata_file_member]
+
+        tf.extractall(path=tmpdir, members=members)
+
+        # close the archive after use
+        tf.close()
+        # the metadata file needs to be wrapped in a list before concatenation
+        return data_files + [metadata_file]
+
+    def open_source_dataset(self, files, tmpdir):
+        data_files = files[:-1]
+        source_name = os.path.join(tmpdir, data_files[0])
+        # if there is more than one data file, create a VRT to mosaic them
+        if len(data_files) > 1:
+            logger.debug("Multiple files found, creating a VRT")
+            source_name = os.path.join(tmpdir, 'tmp.vrt')
+            subprocess.run(
+                ['gdalbuildvrt', source_name] + [
+                    os.path.join(tmpdir, data_file)
+                    for data_file in data_files
+                ],
+                timeout=600, check=True
+            )
+
+        # open the file using GDAL
+        logger.debug("Opening file using GDAL.")
+        return transform_chain.open_gdal_dataset(source_name)
+
+    def process_image(self, files, tmpdir):
+        # match the signature expected by BasePreprocessor.preprocess() and
+        # open the dataset here rather than receiving it as a parameter
+        data_files, metadata_file = files[:-1], files[-1]
+        src_ds = self.open_source_dataset(files, tmpdir)
+
+        # perform transformation steps as necessary
+        logger.debug("Changing geotransform.")
+        src_ds = transform_chain.correct_geo_transform(src_ds)
+
+        # save file with given options - should use ENV
+        creation_options = [
+            "BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8",
+            "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC",
+        ]
+        logger.debug("Saving resulting file.")
+
+        output_filename = os.path.join(tmpdir, "%s.tmp" % data_files[0])
+        transform_chain.write_gdal_dataset(
+            src_ds, "COG", output_filename, creation_options
+        )
+        src_ds = None
+
+        # move the result over the original data file inside tmpdir and
+        # return absolute paths, so that the upload step can stat them
+        os.rename(output_filename, os.path.join(tmpdir, data_files[0]))
+        return (
+            os.path.join(tmpdir, data_files[0]),
+            os.path.join(tmpdir, metadata_file),
+        )
+
+
+class BrowseReportPreprocessor(BasePreprocessor):
+    def __init__(self, browse_report, replace=False):
+        super().__init__(replace)
+        self.browse_report = browse_report
+
+    def get_input_container_and_filenames(self):
+
+        def _get_browse_container_filename(filename):
+            # full URLs carry no container and are downloaded directly
+            parsed = urlparse(filename)
+            if parsed.scheme:
+                return (None, filename)
+            container = filename.split("/")[1]
+            filename = "/".join(filename.split("/")[2:])
+            return container, filename
+
+        return [
+            _get_browse_container_filename(browse['filename'])
+            for browse in self.browse_report['browses']
+        ]
+
+    def download_source(self, swift, container, object_name, tmpdir):
+        # construct local path
+        local_path = os.path.join(tmpdir, os.path.basename(object_name))
+
+        if not container:
+            # no container: the object name is a URL to fetch directly
+            urlretrieve(object_name, local_path)
+        else:
+            swift_download_file(swift, container, object_name, local_path)
+        return local_path
+
+    def unpack_source(self, filename, tmpdir):
+        # should already be a simple file
+        return filename
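+
+# A browse report (sketch; only the keys accessed above are known from this
+# patch) is expected to look like:
+#
+#     {'browses': [
+#         {'filename': '/container/path/to/browse.tif'},
+#         {'filename': 'https://example.com/other_browse.tif'},
+#     ]}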
+
+
 def preprocessor(
     collection, tar_object_path, replace=False,
     client=None, register_queue_key=None
@@ -101,11 +341,11 @@
                         "Aborting, package '%s' already exists at "
                         "target container '%s'." % (package, container)
                     )
-                    return(1)
+                    return 1
     except SwiftError as e:
         logger.debug(traceback.format_exc())
         logger.error("%s: %s\n" % (type(e).__name__, str(e)))
-        return(1)
+        return 1
 
     tmpfilename = os.path.join(tmpdirname, "tmp.tar")
@@ -129,7 +369,7 @@
                     logger.error(
                         "'%s' download failed" % down_res["object"]
                     )
-                    return(1)
+                    return 1
 
         tf = tarfile.open(tmpfilename, mode="r")
@@ -151,7 +391,7 @@
             logger.error(
                 "Aborting, not all needed files found in package."
             )
-            return(1)
+            return 1
 
         tf.extractall(path=tmpdirname, members=members)
@@ -222,7 +462,7 @@
             logger.error(
                 "'%s' upload failed" % upload["error"]
             )
-            return(1)
+            return 1
 
         if client is not None:
             logger.debug(
@@ -235,7 +475,7 @@
     except Exception as e:
         logger.debug(traceback.format_exc())
        logger.error("%s: %s\n" % (type(e).__name__, str(e)))
-        return(1)
+        return 1
 
     logger.info(
         "Successfully finished preprocessing of '%s'." % (tar_object_path)
@@ -245,6 +485,7 @@ def preprocessor(
 def preprocessor_redis_wrapper(
     collection, replace=False, host="localhost", port=6379,
     preprocess_queue_key="preprocess_queue",
+    preprocess_md_queue_key="preprocess-md_queue",
     register_queue_key="register_queue"
 ):
     client = redis.Redis(
@@ -252,7 +493,9 @@ def preprocessor_redis_wrapper(
     )
     while True:
         logger.debug("waiting for redis queue '%s'..." % preprocess_queue_key)
-        value = client.brpop(preprocess_queue_key)
+        # brpop on a list of keys returns a (queue, value) tuple, so the
+        # payload is now 'value' itself rather than 'value[1]'
+        queue, value = client.brpop([preprocess_queue_key, preprocess_md_queue_key])
         preprocessor(
             collection,
-            value[1],
+            value,
--
GitLab