EOX GitLab Instance

Skip to content
Snippets Groups Projects
preprocessor.py 22.8 KiB
Newer Older
#!/usr/bin/env python
# -----------------------------------------------------------------------------
#
# Project: preprocessor.py
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
# -----------------------------------------------------------------------------
# Copyright (c) 2019 EOX IT Services GmbH
#
# Python script to preprocess product data.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# -----------------------------------------------------------------------------


import sys
import os
from os.path import join, basename, splitext
import argparse
import textwrap
import logging
import traceback
import redis
import tempfile
import tarfile
import re
import subprocess
from urllib.parse import urlparse
from urllib.request import urlretrieve

from swiftclient.multithreading import OutputManager
from swiftclient.service import SwiftError, SwiftService, SwiftUploadObject

import transform_chain

# collection: [name]
COLLECTION_MAP = {
    "VHR_IMAGE_2018": ["VHR IMAGE 2018", ],
    "Emergency": ["Emergency", ],
}

logger = logging.getLogger(__name__)


def setup_logging(verbosity):
    # start logging setup
    # get command line level
    verbosity = verbosity
    if verbosity == 0:
        level = logging.CRITICAL
    elif verbosity == 1:
        level = logging.ERROR
    elif verbosity == 2:
        level = logging.WARNING
    elif verbosity == 3:
        level = logging.INFO
    else:
        level = logging.DEBUG
    logger.setLevel(level)
    sh = logging.StreamHandler()
    sh.setLevel(level)
    formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    # finished logging setup


def swift_download_file(swift, container, object_name, local_path):
    """ Download a single file from a swift object storage
    """
    options = {"out_file": local_path}
    for download in swift.download(container, [object_name], options):
        if download["success"]:
            logger.debug(
                "'%s' downloaded" % download["object"]
            )
        else:
            raise Exception('Failed to download object %s' % object_name)


def swift_upload_files(local_dir, swift, container, filenames, base_path=None):
    """ Upload multiple files
    """

    # get a relative path to the local directory for each file
    # we use this as a base path on the upload swift
    relpaths = [
        os.path.relpath(filename, local_dir)
        for filename in filenames
    ]

    # create upload objects with info from where to where is uploaded,
    # potentially using segmented files when upload size is bigger than 5 GB
    objects = [
        SwiftUploadObject(
            filename,
            os.path.join(base_path, relpath) if base_path else relpath,
            # check if 5GB swift upload limit is exceeded, if yes, use segmentation
            {
                'segment_size': 1024 * 1024 * 1024, # 1GB segments
            } if os.stat(filename).st_size > 1024 * 1024 * 1024 * 5 else None
        )
        for filename, relpath in zip(filenames, relpaths)
    ]

    # perform the upload
    succeeded = []
    failed = []
    for upload in swift.upload(container, objects):
        if upload["success"]:
            succeeded.append(upload)
            if "object" in upload:
                logger.info(
                    "'%s' successfully uploaded." % upload["object"]
                )
            elif "for_object" in upload:
                logger.debug(
                    "Successfully uploaded '%s' segment '%s'."
                    % (upload["for_object"], upload["segment_index"])
                )
        else:
            failed.append(upload)

    # cleanup if failed
    if failed:
        for upload in succeeded:
            for _ in swift.delete(container, [upload['object']]):
                pass

        raise Exception('Failed to upload objects: %s' % (
            ', '.join(upload['object'] for upload in failed)
        ))


class BasePreprocessor:
    def __init__(self, replace):
        self.replace = replace

    def preprocess(self):
        options = {
            "os_username": "",  # TODO: Use from environment variable
            "os_password": "",  # TODO: Use from environment variable
            "os_tenant_name": "",  # TODO: Use from environment variable
            "os_tenant_id": "",  # TODO: Use from environment variable
        }
        with SwiftService(options=options) as input_swift, \
                SwiftService() as output_swift, \
                tempfile.TemporaryDirectory() as tmpdir, \
                OutputManager():

            for container, path, extra in self.get_input_container_and_filenames():
                if not self.replace and self.check_output_exists(output_swift, container, path):
                    logger.critical(
                        "Aborting, package '%s' already exists at "
                        "target container '%s'." % (path, container)
                    )
                    return

                source_file = self.download_source(input_swift, container, path, tmpdir)
                unpacked_files = self.unpack_source(source_file, tmpdir)
                processed_files = self.process_image(unpacked_files, extra, tmpdir)
                self.upload_files(output_swift, container, path, processed_files, tmpdir)

    def check_output_exists(self, swift, container, path):
        list_parts_gen = swift.list(
            container=container, options={"prefix": path},
        )
        for page in list_parts_gen:
            if page["success"]:
                return True
        return False

    def get_input_container_and_filenames(self):
        raise NotImplementedError

    def download_source(self, swift, container, object_name, tmpdir):
        # construct local path
        local_path = os.path.join(tmpdir, os.path.basename(object_name))
        swift_download_file(swift, container, object_name, local_path)
        return local_path

    def unpack_source(self, downloaded, tmpdir):
        raise NotImplementedError

    def cleanup_source(self, filename):
        os.unlink(filename)

    def process_image(self, files, extra, tmpdir):
        raise NotImplementedError

    def upload_files(self, swift, container, base_path, filenames, tmpdir):
        swift_upload_files(tmpdir, swift, container, filenames, base_path)


class PackagePreprocessor(BasePreprocessor):
    def __init__(self, tar_object_path, replace=False):
        super().__init__(replace)
        self.tar_object_path = tar_object_path

    def get_input_container_and_filenames(self):
        container = self.tar_object_path.split("/")[1]
        package = "/".join(self.tar_object_path.split("/")[2:])
        return [(container, package, None)]

    def unpack_source(self, filename, tmpdir):
        tf = tarfile.open(filename, mode="r")

        data_files_members = [
            m for m in tf.getmembers() if
            re.search(r"IMG.+\.(TIF|JP2)", m.name)
        ]
        metadata_file_member = next(
            m for m in tf.getmembers() if re.search(r"GSC.+\.xml", m.name)
        )
        data_files = [
            member.name
            for member in data_files_members
        ]
        metadata_file = metadata_file_member.name
        members = data_files_members + [metadata_file_member]

        if not data_files or not metadata_file:
            logger.error(
                "Aborting, not all needed files found in package."
            )
            raise Exception()

        tf.extractall(path=tmpdir, members=members)

        # cleanup after use to save space
        tf.close()
        return data_files + metadata_file

    def open_source_dataset(self, files, tmpdir):
        data_files = files[:-1]
        source_name = os.path.join(tmpdir, data_files[0])
        # if there is more than one file, make a VRT to mosaic them
        if len(data_files) > 1:
            logger.debug("More files found, creating a VRT")
            source_name = os.path.join(tmpdir, 'tmp.vrt')
            subprocess.run(
                ['gdalbuildvrt', source_name] + [
                    os.path.join(tmpdir, data_file)
                    for data_file in data_files
                ],
                timeout=600, check=True
            )

        # open file using gdal
        logger.debug("Opening file using GDAL.")
        return transform_chain.open_gdal_dataset(source_name)

    def process_image(self, files, extra, tmpdir):
        data_files, metadata_file = files[:-1], files[-1]

        # get initial source dataset
        src_ds = self.open_source_dataset(data_files, tmpdir)

        # perform transformation steps as necessary
        logger.debug("Changing geotransform.")
        src_ds = transform_chain.correct_geo_transform(src_ds)

        # save file with given options - should use ENV
        creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8", "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
        logger.debug("Saving resulting file.")

        output_filename = os.path.join(tmpdir, "%s.tmp" % data_files[0])
        transform_chain.write_gdal_dataset(src_ds, "COG", output_filename, creation_options)
        src_ds = None

        os.rename(output_filename, data_files[0])
        return (data_files[0], metadata_file)

class BrowseReportPreprocessor(BasePreprocessor):
    def __init__(self, browse_report, replace=False):
        super().__init__(replace)
        self.browse_report = browse_report

    def get_input_container_and_filenames(self):
        def _get_browse_container_filename(filename, browse):
            parsed = urlparse(filename)
            if parsed.scheme:
                return (None, filename)
            container = filename.split("/")[1]
            filename = "/".join(filename.split("/")[2:])
            return container, filename, browse
            _get_browse_container_filename(browse['filename'], browse)
            for browse in self.browse_report['browses']
        ]

    def download_source(self, swift, container, object_name, tmpdir):
        local_path = os.path.join(tmpdir, os.path.basename(object_name))

        # download source from either swift or HTTP
        if container:
            # construct local path
            swift_download_file(swift, container, object_name, local_path)
        else:
            urlretrieve(object_name, local_path)

        return local_path

    def unpack_source(self, filename, tmpdir):
        # should already be a simple file
        return [filename]

    def process_image(self, files, browse, tmpdir):
        data_file = files[0]

        src_ds = transform_chain.open_gdal_dataset(data_file)

        # TODO: preprocessing from ngeo

        # # perform transformation steps as necessary
        # logger.debug("Changing geotransform.")
        # src_ds = transform_chain.correct_geo_transform(src_ds)

        # save file with given options - should use ENV
        creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8", "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
        logger.debug("Saving resulting file.")

        output_filename = os.path.join(tmpdir, "%s.tmp" % data_file)
        transform_chain.write_gdal_dataset(src_ds, "COG", output_filename, creation_options)
        src_ds = None

        os.rename(output_filename, data_file)

        # generate GSC metadata
        metadata_file = self.generate_metadata_file(data_file, browse, tmpdir)
        return (data_file, metadata_file)

    def generate_metadata_file(self, data_file, browse, tmpdir):
        now_time = datetime.now().isoformat().rpartition('.')[0] + 'Z'
        metadata = {
            'identifier': browse['browse_identifier'],
            'now_time': now_time,
            'archiving_time': now_time,
            'delivery_time': now_time,
            'begin_time': browse['start_time'],
            'end_time': browse['end_time'],
            'product_type': browse['browse_type'],
            'footprint': gsc_generator.get_footprint_from_browse(data_file, browse)
        }

        out_filename = join(tmpdir, splitext(basename(browse['filename']))[0] + '.xml')
        with open(out_filename, 'w') as f:
            f.write(
                gsc_generator.generate_gsc_metadata(metadata)
            )
        return out_filename

def preprocessor(
    collection, tar_object_path, replace=False,
    client=None, register_queue_key=None
):
    logger.info("Starting preprocessing of '%s'." % (tar_object_path))

    try:
        container = tar_object_path.split("/")[1]
        package = "/".join(tar_object_path.split("/")[2:])

        with SwiftService() as swift, OutputManager(), \
                tempfile.TemporaryDirectory() as tmpdirname:
            if not replace:
                try:
                    list_parts_gen = swift.list(
                        container=container, options={"prefix": package},
                    )
                    for page in list_parts_gen:
                        if page["success"]:
                            logger.critical(
                                "Aborting, package '%s' already exists at "
                                "target container '%s'." % (package, container)
                            )
                except SwiftError as e:
                    logger.debug(traceback.format_exc())
                    logger.error("%s: %s\n" % (type(e).__name__, str(e)))

            tmpfilename = os.path.join(tmpdirname, "tmp.tar")

            options = {
                "os_username": "",  # TODO: Use from environment variable
                "os_password": "",  # TODO: Use from environment variable
                "os_tenant_name": "",  # TODO: Use from environment variable
                "os_tenant_id": "",  # TODO: Use from environment variable
            }
            with SwiftService(options=options) as swift_down:
                for down_res in swift_down.download(
                    container=container,
                    objects=[package, ],
                    options={"out_file": tmpfilename},
                ):
                    if down_res["success"]:
                        logger.debug(
                            "'%s' downloaded" % down_res["object"]
                        )
                    else:
                        logger.error(
                            "'%s' download failed" % down_res["object"]
                        )

            tf = tarfile.open(tmpfilename, mode="r")

            data_files_ti = [
                m for m in tf.getmembers() if
                re.search(r"IMG.+\.(TIF|JP2)", m.name)
            ]
            metadata_file_ti = next(
                m for m in tf.getmembers() if re.search(r"GSC.+\.xml", m.name)
            )
            data_files = [
                member.name
                for member in data_files_ti
            ]
            metadata_file = metadata_file_ti.name
            members = data_files_ti + [metadata_file_ti]

            if not data_files or not metadata_file:
                logger.error(
                    "Aborting, not all needed files found in package."
                )

            tf.extractall(path=tmpdirname, members=members)

            # cleanup after use to save space
            tf.close()
            os.remove(tmpfilename)

            source_name = os.path.join(tmpdirname, data_files[0])
            tmp_name = os.path.join(tmpdirname, "%s.tmp" % data_files[0])
            dest_name = os.path.join(
                package, "%s.tif" % os.path.splitext(data_files[0])[0]
            )
            # if there is more than one file, make a VRT to mosaic them
            if len(data_files) > 1:
                logger.debug("More files found, creating a VRT")
                source_name = os.path.join(tmpdirname, 'tmp.vrt')
                subprocess.run(
                    ['gdalbuildvrt', source_name] + [
                        os.path.join(tmpdirname, data_file)
                        for data_file in data_files
                    ],
                    timeout=600, check=True
                )

            # open file using gdal
            logger.debug("Opening file using GDAL.")
            dst = transform_chain.open_gdal_dataset(source_name)

            # perform transformation steps as necessary
            logger.debug("Changing geotransform.")
            dst = transform_chain.correct_geo_transform(dst)

            # save file with given options - should use ENV
            creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8", "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
            logger.debug("Saving resulting file.")
            transform_chain.write_gdal_dataset(dst, "COG", os.path.join(tmpdirname, "%s.tmp" % data_files[0]), creation_options)
            dst = None

            # check if 5GB swift upload limit is exceeded, if yes, use segmentation
            size = os.stat(os.path.join(tmpdirname, "%s.tmp" % data_files[0])).st_size
            uploadParams = {}
            if (size > 1024 * 1024 * 1024 * 5):
                uploadParams["segment_size"] = 1024 * 1024 * 1024  # 1gb segments

            objects = [
                SwiftUploadObject(tmp_name, object_name=dest_name),
                SwiftUploadObject(
                    os.path.join(tmpdirname, metadata_file),
                    object_name=os.path.join(package, metadata_file)
                )
            ]
            for upload in swift.upload(
                container=container,
                objects=objects,
                options=uploadParams,
            ):
                if upload["success"]:
                    if "object" in upload:
                        logger.info(
                            "'%s' successfully uploaded." % upload["object"]
                        )
                    elif "for_object" in upload:
                        logger.debug(
                            "Successfully uploaded '%s' segment '%s'."
                            % (upload["for_object"], upload["segment_index"])
                        )
                else:
                    logger.error(
                        "'%s' upload failed" % upload["error"]
                    )

            if client is not None:
                logger.debug(
                    "Storing paths in redis queue '%s" % register_queue_key
                )
                client.lpush(
                    register_queue_key, "%s" % tar_object_path
                )

    except Exception as e:
        logger.debug(traceback.format_exc())
        logger.error("%s: %s\n" % (type(e).__name__, str(e)))

    logger.info(
        "Successfully finished preprocessing of '%s'." % (tar_object_path)
    )


def preprocessor_redis_wrapper(
    collection, replace=False, host="localhost", port=6379,
    preprocess_queue_key="preprocess_queue",
    preprocess_md_queue_key="preprocess-md_queue",
    register_queue_key="register_queue"
):
    client = redis.Redis(
        host=host, port=port, charset="utf-8", decode_responses=True
    )
    while True:
        logger.debug("waiting for redis queue '%s'..." % preprocess_queue_key)
        queue, value = client.brpop([preprocess_queue_key, preprocess_md_queue_key])

        if queue == preprocess_md_queue_key:
            preprocessor = BrowseReportPreprocessor(json.loads(value))
        else:
            preprocessor = PackagePreprocessor(value)

        # preprocessor(
        #     collection,
        #     value[1],
        #     replace=replace,
        #     client=client,
        #     register_queue_key=register_queue_key
        # )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.description = textwrap.dedent("""\
    Preprocess product data.
    """)

    parser.add_argument(
        "collection", default=None,
        help=(
            "Collection the preprocessor is run for."
        )
    )
    parser.add_argument(
        "--mode", default="standard", choices=["standard", "redis"],
        help=(
            "The mode to run the preprocessor. Either one-off (standard) or "
            "reading from a redis queue."
        )
    )
    parser.add_argument(
        "--tar-object-path", default=None,
        help=(
            "Path to object holding tar archive file of product."
        )
    )
    parser.add_argument(
        "--replace", action="store_true",
        help=(
            "Replace existing products instead of skipping the preprocessing."
        )
    )
    parser.add_argument(
        "--redis-preprocess-queue-key", default="preprocess_queue"
    )
    parser.add_argument(
        "--redis-register-queue-key", default="register_queue"
    )
    parser.add_argument(
        "--redis-host", default="localhost"
    )
    parser.add_argument(
        "--redis-port", type=int, default=6379
    )

    parser.add_argument(
        "-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4],
        help=(
            "Set verbosity of log output "
            "(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
        )
    )

    arg_values = parser.parse_args()

    setup_logging(arg_values.verbosity)

    collection = arg_values.collection
    if collection not in COLLECTION_MAP:
        logger.critical("Provided collection '%s' is not valid." % collection)
        sys.exit(1)

    if arg_values.mode == "standard":
        preprocessor(
            collection,
            arg_values.tar_object_path,
            replace=arg_values.replace,
        )
    else:
        preprocessor_redis_wrapper(
            collection,
            replace=arg_values.replace,
            host=arg_values.redis_host,
            port=arg_values.redis_port,
            preprocess_queue_key=arg_values.redis_preprocess_queue_key,
            register_queue_key=arg_values.redis_register_queue_key,
        )