EOX GitLab Instance

Commit c0422814 authored by Fabian Schindler
Splitting processor in Package and Browse

parent 49d6ef41
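
The diff below splits the former single preprocessor() flow into a PackagePreprocessor for tar packages on the input Swift storage and a BrowseReportPreprocessor for individual browse files referenced by a browse report. A minimal sketch of how the two classes could be driven (the call site is not part of this commit, so the module name and the argument shapes here are assumptions inferred from the new code):

    # hypothetical call site -- module name and inputs are assumptions
    from preprocessor import PackagePreprocessor, BrowseReportPreprocessor

    # a tar package addressed as /<container>/<path> on the input storage
    PackagePreprocessor("/my-container/products/package.tar", replace=False).preprocess()

    # a browse report whose entries are Swift paths or plain URLs
    browse_report = {
        "browses": [
            {"filename": "/my-container/browses/browse_1.tif"},
            {"filename": "https://example.com/browses/browse_2.jp2"},
        ],
    }
    BrowseReportPreprocessor(browse_report, replace=True).preprocess()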
@@ -31,6 +31,7 @@
import sys
import os
import os.path
import argparse
import textwrap
import logging
@@ -40,6 +41,8 @@ import tempfile
import tarfile
import re
import subprocess
from urllib.parse import urlparse
from urllib.request import urlretrieve
from swiftclient.multithreading import OutputManager
from swiftclient.service import SwiftError, SwiftService, SwiftUploadObject
@@ -78,6 +81,243 @@ def setup_logging(verbosity):
# finished logging setup


def swift_download_file(swift, container, object_name, local_path):
    """ Download a single file from a swift object storage
    """
    options = {"out_file": local_path}
    for download in swift.download(container, [object_name], options):
        if download["success"]:
            logger.debug(
                "'%s' downloaded" % download["object"]
            )
        else:
            raise Exception('Failed to download object %s' % object_name)


def swift_upload_files(local_dir, swift, container, filenames, base_path=None):
    """ Upload multiple files
    """
    # get a relative path to the local directory for each file
    # we use this as a base path on the upload swift
    relpaths = [
        os.path.relpath(filename, local_dir)
        for filename in filenames
    ]

    # create upload objects with info from where to where is uploaded,
    # potentially using segmented files when upload size is bigger than 5 GB
    objects = [
        SwiftUploadObject(
            filename,
            os.path.join(base_path, relpath) if base_path else relpath,
            # check if 5GB swift upload limit is exceeded, if yes, use segmentation
            {
                'segment_size': 1024 * 1024 * 1024,  # 1GB segments
            } if os.stat(filename).st_size > 1024 * 1024 * 1024 * 5 else None
        )
        for filename, relpath in zip(filenames, relpaths)
    ]

    # perform the upload
    succeeded = []
    failed = []
    for upload in swift.upload(container, objects):
        if upload["success"]:
            succeeded.append(upload)
            if "object" in upload:
                logger.info(
                    "'%s' successfully uploaded." % upload["object"]
                )
            elif "for_object" in upload:
                logger.debug(
                    "Successfully uploaded '%s' segment '%s'."
                    % (upload["for_object"], upload["segment_index"])
                )
        else:
            failed.append(upload)

    # cleanup if failed
    if failed:
        for upload in succeeded:
            for _ in swift.delete(container, [upload['object']]):
                pass
        raise Exception('Failed to upload objects: %s' % (
            ', '.join(upload['object'] for upload in failed)
        ))
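

# Template-method style: BasePreprocessor.preprocess() drives the common
# download -> unpack -> process -> upload sequence, while the concrete
# subclasses below supply the input locations, the unpacking step and the
# actual image processing.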
class BasePreprocessor:
    def __init__(self, replace):
        self.replace = replace

    def preprocess(self):
        options = {
            "os_username": "",  # TODO: Use from environment variable
            "os_password": "",  # TODO: Use from environment variable
            "os_tenant_name": "",  # TODO: Use from environment variable
            "os_tenant_id": "",  # TODO: Use from environment variable
        }
        with SwiftService(options=options) as input_swift, \
                SwiftService() as output_swift, \
                tempfile.TemporaryDirectory() as tmpdir, \
                OutputManager():
            for container, path in self.get_input_container_and_filenames():
                if not self.replace and self.check_output_exists(output_swift, container, path):
                    logger.critical(
                        "Aborting, package '%s' already exists at "
                        "target container '%s'." % (path, container)
                    )
                    return
                source_file = self.download_source(input_swift, container, path, tmpdir)
                unpacked_files = self.unpack_source(source_file, tmpdir)
                processed_files = self.process_image(unpacked_files, tmpdir)
                self.upload_files(output_swift, container, path, processed_files, tmpdir)

    def check_output_exists(self, swift, container, path):
        list_parts_gen = swift.list(
            container=container, options={"prefix": path},
        )
        for page in list_parts_gen:
            if page["success"]:
                return True
        return False

    def get_input_container_and_filenames(self):
        raise NotImplementedError

    def download_source(self, swift, container, object_name, tmpdir):
        # construct local path
        local_path = os.path.join(tmpdir, os.path.basename(object_name))
        swift_download_file(swift, container, object_name, local_path)
        return local_path

    def unpack_source(self, downloaded, tmpdir):
        raise NotImplementedError

    def cleanup_source(self, filename):
        os.unlink(filename)

    def process_image(self, files, tmpdir):
        raise NotImplementedError

    def upload_files(self, swift, container, base_path, filenames, tmpdir):
        swift_upload_files(tmpdir, swift, container, filenames, base_path)


class PackagePreprocessor(BasePreprocessor):
    def __init__(self, tar_object_path, replace=False):
        super().__init__(replace)
        self.tar_object_path = tar_object_path

    def get_input_container_and_filenames(self):
        container = self.tar_object_path.split("/")[1]
        package = "/".join(self.tar_object_path.split("/")[2:])
        return [(container, package)]

    def unpack_source(self, filename, tmpdir):
        tf = tarfile.open(filename, mode="r")
        data_files_members = [
            m for m in tf.getmembers() if
            re.search(r"IMG.+\.(TIF|JP2)", m.name)
        ]
        metadata_file_member = next(
            m for m in tf.getmembers() if re.search(r"GSC.+\.xml", m.name)
        )
        data_files = [
            member.name
            for member in data_files_members
        ]
        metadata_file = metadata_file_member.name
        members = data_files_members + [metadata_file_member]

        if not data_files or not metadata_file:
            logger.error(
                "Aborting, not all needed files found in package."
            )
            raise Exception()

        tf.extractall(path=tmpdir, members=members)

        # cleanup after use to save space
        tf.close()

        # metadata_file is a single name, so wrap it in a list before concatenating
        return data_files + [metadata_file]
    def open_source_dataset(self, files, tmpdir):
        data_files = files[:-1]
        source_name = os.path.join(tmpdir, data_files[0])

        # if there is more than one file, make a VRT to mosaic them
        if len(data_files) > 1:
            logger.debug("More files found, creating a VRT")
            source_name = os.path.join(tmpdir, 'tmp.vrt')
            subprocess.run(
                ['gdalbuildvrt', source_name] + [
                    os.path.join(tmpdir, data_file)
                    for data_file in data_files
                ],
                timeout=600, check=True
            )

        # open file using gdal
        logger.debug("Opening file using GDAL.")
        return transform_chain.open_gdal_dataset(source_name)
    def process_image(self, files, tmpdir, src_ds=None):
        data_files, metadata_file = files[:-1], files[-1]
        # the base class calls process_image(files, tmpdir), so open the source
        # dataset here when no already-opened one is passed in
        if src_ds is None:
            src_ds = self.open_source_dataset(files, tmpdir)

        # perform transformation steps as necessary
        logger.debug("Changing geotransform.")
        src_ds = transform_chain.correct_geo_transform(src_ds)

        # save file with given options - should use ENV
        creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8", "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
        logger.debug("Saving resulting file.")
        output_filename = os.path.join(tmpdir, "%s.tmp" % data_files[0])
        transform_chain.write_gdal_dataset(src_ds, "COG", output_filename, creation_options)
        src_ds = None

        # move the processed file over the extracted original and return paths
        # inside tmpdir so the upload step can stat and relativize them
        processed_path = os.path.join(tmpdir, data_files[0])
        os.rename(output_filename, processed_path)

        return (processed_path, os.path.join(tmpdir, metadata_file))


class BrowseReportPreprocessor(BasePreprocessor):
    def __init__(self, browse_report, replace=False):
        super().__init__(replace)
        self.browse_report = browse_report
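    # Browse filenames may either be Swift paths ("/<container>/<object>") or
    # plain URLs; URLs are signalled by returning None as the container.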
    def get_input_container_and_filenames(self):
        def _get_browse_container_filename(filename):
            parsed = urlparse(filename)
            if parsed.scheme:
                return (None, filename)
            container = filename.split("/")[1]
            filename = "/".join(filename.split("/")[2:])
            return container, filename

        return [
            _get_browse_container_filename(browse['filename'])
            for browse in self.browse_report['browses']
        ]
    def download_source(self, swift, container, object_name, tmpdir):
        # construct local path
        local_path = os.path.join(tmpdir, os.path.basename(object_name))
        if not container:
            # no container means a plain URL: fetch over HTTP instead of Swift
            urlretrieve(object_name, local_path)
        else:
            swift_download_file(swift, container, object_name, local_path)
        return local_path

    def unpack_source(self, filename, tmpdir):
        # should already be a simple file
        return filename


def preprocessor(
    collection, tar_object_path, replace=False,
    client=None, register_queue_key=None
@@ -101,11 +341,11 @@ def preprocessor(
                            logger.critical(
                                "Aborting, package '%s' already exists at "
                                "target container '%s'." % (package, container)
                            )
                            return(1)
                            return 1
                except SwiftError as e:
                    logger.debug(traceback.format_exc())
                    logger.error("%s: %s\n" % (type(e).__name__, str(e)))
                    return(1)
                    return 1

            tmpfilename = os.path.join(tmpdirname, "tmp.tar")
@@ -129,7 +369,7 @@ def preprocessor(
                    logger.error(
                        "'%s' download failed" % down_res["object"]
                    )
                    return(1)
                    return 1

            tf = tarfile.open(tmpfilename, mode="r")
@@ -151,7 +391,7 @@ def preprocessor(
                logger.error(
                    "Aborting, not all needed files found in package."
                )
                return(1)
                return 1

            tf.extractall(path=tmpdirname, members=members)
@@ -222,7 +462,7 @@ def preprocessor(
                    logger.error(
                        "'%s' upload failed" % upload["error"]
                    )
                    return(1)
                    return 1

            if client is not None:
                logger.debug(
@@ -235,7 +475,7 @@ def preprocessor(
    except Exception as e:
        logger.debug(traceback.format_exc())
        logger.error("%s: %s\n" % (type(e).__name__, str(e)))
        return(1)
        return 1

    logger.info(
        "Successfully finished preprocessing of '%s'." % (tar_object_path)
@@ -245,6 +485,7 @@ def preprocessor(
def preprocessor_redis_wrapper(
    collection, replace=False, host="localhost", port=6379,
    preprocess_queue_key="preprocess_queue",
    preprocess_md_queue_key="preprocess-md_queue",
    register_queue_key="register_queue"
):
    client = redis.Redis(
@@ -252,7 +493,7 @@ def preprocessor_redis_wrapper(
    )
    while True:
        logger.debug("waiting for redis queue '%s'..." % preprocess_queue_key)
        value = client.brpop(preprocess_queue_key)
        queue, value = client.brpop([preprocess_queue_key, preprocess_md_queue_key])
        preprocessor(
            collection,
            value[1],
......