#!/usr/bin/env python
# -----------------------------------------------------------------------------
#
# Project: preprocessor.py
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
# -----------------------------------------------------------------------------
# Copyright (c) 2019 EOX IT Services GmbH
#
# Python script to preprocess product data.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# -----------------------------------------------------------------------------
import sys
import os
from os.path import join, basename, splitext
import argparse
import textwrap
import logging
import traceback
import redis
import tempfile
import tarfile
import re
import json
import subprocess
from urllib.parse import urlparse
from urllib.request import urlretrieve
from datetime import datetime
from swiftclient.multithreading import OutputManager
from swiftclient.service import SwiftError, SwiftService, SwiftUploadObject
import transform_chain
import gsc_generator
# collection: [name]
COLLECTION_MAP = {
"VHR_IMAGE_2018": ["VHR IMAGE 2018", ],
"Emergency": ["Emergency", ],
}
logger = logging.getLogger(__name__)
def setup_logging(verbosity):
# map the command line verbosity count to a logging level
if verbosity == 0:
level = logging.CRITICAL
elif verbosity == 1:
level = logging.ERROR
elif verbosity == 2:
level = logging.WARNING
elif verbosity == 3:
level = logging.INFO
else:
level = logging.DEBUG
logger.setLevel(level)
sh = logging.StreamHandler()
sh.setLevel(level)
formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
sh.setFormatter(formatter)
logger.addHandler(sh)
# finished logging setup
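# Usage sketch (not part of the pipeline): with the "-v 3" default,
# setup_logging configures INFO level output on stderr, e.g.:
#
#   setup_logging(3)
#   logger.info("visible at INFO level")
#   logger.debug("suppressed at INFO level")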
def swift_download_file(swift, container, object_name, local_path):
""" Download a single file from a swift object storage
"""
options = {"out_file": local_path}
for download in swift.download(container, [object_name], options):
if download["success"]:
logger.debug(
"'%s' downloaded" % download["object"]
)
else:
raise Exception('Failed to download object %s' % object_name)
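# Usage sketch, assuming OpenStack credentials are available to SwiftService;
# the container and object names are hypothetical:
#
#   with SwiftService() as swift:
#       swift_download_file(
#           swift, "my-container", "path/to/package.tar", "/tmp/package.tar"
#       )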
def swift_upload_files(local_dir, swift, container, filenames, base_path=None):
""" Upload multiple files
"""
# get a relative path to the local directory for each file
# we use this as a base path on the upload swift
relpaths = [
os.path.relpath(filename, local_dir)
for filename in filenames
]
# create upload objects with info from where to where is uploaded,
# potentially using segmented files when upload size is bigger than 5 GB
objects = [
SwiftUploadObject(
filename,
os.path.join(base_path, relpath) if base_path else relpath,
# check if 5GB swift upload limit is exceeded, if yes, use segmentation
{
'segment_size': 1024 * 1024 * 1024, # 1GB segments
} if os.stat(filename).st_size > 1024 * 1024 * 1024 * 5 else None
)
for filename, relpath in zip(filenames, relpaths)
]
# perform the upload
succeeded = []
failed = []
for upload in swift.upload(container, objects):
if upload["success"]:
succeeded.append(upload)
if "object" in upload:
logger.info(
"'%s' successfully uploaded." % upload["object"]
)
elif "for_object" in upload:
logger.debug(
"Successfully uploaded '%s' segment '%s'."
% (upload["for_object"], upload["segment_index"])
)
else:
failed.append(upload)
# cleanup if failed
if failed:
for upload in succeeded:
for _ in swift.delete(container, [upload['object']]):
pass
raise Exception('Failed to upload objects: %s' % (
', '.join(upload['object'] for upload in failed)
))
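# Usage sketch mirroring how BasePreprocessor.upload_files() calls this helper;
# the directory, container and file names are hypothetical:
#
#   with SwiftService() as swift:
#       swift_upload_files(
#           "/tmp/work", swift, "my-container",
#           ["/tmp/work/scene/IMG_01.tif"], base_path="packages/scene.tar"
#       )
#
# Objects larger than 5 GB are uploaded in 1 GB segments; if any upload fails,
# the objects uploaded so far are deleted again and an exception is raised.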
class BasePreprocessor:
def __init__(self, replace):
self.replace = replace
def preprocess(self):
options = {
"os_username": "", # TODO: Use from environment variable
"os_password": "", # TODO: Use from environment variable
"os_tenant_name": "", # TODO: Use from environment variable
"os_tenant_id": "", # TODO: Use from environment variable
}
with SwiftService(options=options) as input_swift, \
SwiftService() as output_swift, \
tempfile.TemporaryDirectory() as tmpdir, \
OutputManager():
for container, path, extra in self.get_input_container_and_filenames():
if not self.replace and self.check_output_exists(output_swift, container, path):
logger.critical(
"Aborting, package '%s' already exists at "
"target container '%s'." % (path, container)
)
return
source_file = self.download_source(input_swift, container, path, tmpdir)
unpacked_files = self.unpack_source(source_file, tmpdir)
processed_files = self.process_image(unpacked_files, extra, tmpdir)
self.upload_files(output_swift, container, path, processed_files, tmpdir)
def check_output_exists(self, swift, container, path):
list_parts_gen = swift.list(
container=container, options={"prefix": path},
)
for page in list_parts_gen:
if page["success"]:
return True
return False
def get_input_container_and_filenames(self):
raise NotImplementedError
def download_source(self, swift, container, object_name, tmpdir):
# construct local path
local_path = os.path.join(tmpdir, os.path.basename(object_name))
swift_download_file(swift, container, object_name, local_path)
return local_path
def unpack_source(self, downloaded, tmpdir):
raise NotImplementedError
def cleanup_source(self, filename):
os.unlink(filename)
def process_image(self, files, extra, tmpdir):
raise NotImplementedError
def upload_files(self, swift, container, base_path, filenames, tmpdir):
swift_upload_files(tmpdir, swift, container, filenames, base_path)
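# Sketch of the subclass contract (hypothetical class, for illustration only):
# preprocess() drives download -> unpack -> process -> upload, and concrete
# subclasses implement the three hooks that raise NotImplementedError above.
#
#   class MyPreprocessor(BasePreprocessor):
#       def get_input_container_and_filenames(self):
#           return [("my-container", "path/to/object.tar", None)]
#
#       def unpack_source(self, downloaded, tmpdir):
#           return [downloaded]
#
#       def process_image(self, files, extra, tmpdir):
#           return files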
class PackagePreprocessor(BasePreprocessor):
def __init__(self, tar_object_path, replace=False):
super().__init__(replace)
self.tar_object_path = tar_object_path
def get_input_container_and_filenames(self):
container = self.tar_object_path.split("/")[1]
package = "/".join(self.tar_object_path.split("/")[2:])
return [(container, package, None)]
def unpack_source(self, filename, tmpdir):
tf = tarfile.open(filename, mode="r")
data_files_members = [
m for m in tf.getmembers() if
re.search(r"IMG.+\.(TIF|JP2)", m.name)
]
metadata_file_member = next(
m for m in tf.getmembers() if re.search(r"GSC.+\.xml", m.name)
)
data_files = [
member.name
for member in data_files_members
]
metadata_file = metadata_file_member.name
members = data_files_members + [metadata_file_member]
if not data_files or not metadata_file:
logger.error(
"Aborting, not all needed files found in package."
)
raise Exception("Not all needed files found in package.")
tf.extractall(path=tmpdir, members=members)
# cleanup after use to save space
tf.close()
return data_files + [metadata_file]
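# For illustration, the regular expressions above select tar members with names
# like the following (hypothetical examples):
#
#   IMG_SCENE_R1C1.TIF   -> data file (TIF or JP2)
#   GSC_metadata.xml     -> GSC metadata file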
def open_source_dataset(self, files, tmpdir):
data_files = files[:-1]
source_name = os.path.join(tmpdir, data_files[0])
# if there is more than one file, make a VRT to mosaic them
if len(data_files) > 1:
logger.debug("More files found, creating a VRT")
source_name = os.path.join(tmpdir, 'tmp.vrt')
subprocess.run(
['gdalbuildvrt', source_name] + [
os.path.join(tmpdir, data_file)
for data_file in data_files
],
timeout=600, check=True
)
# open file using gdal
logger.debug("Opening file using GDAL.")
return transform_chain.open_gdal_dataset(source_name)
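# The subprocess call above is roughly equivalent to running gdalbuildvrt
# directly on the command line (paths hypothetical):
#
#   gdalbuildvrt /tmp/work/tmp.vrt /tmp/work/IMG_R1C1.TIF /tmp/work/IMG_R1C2.TIF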
def process_image(self, files, extra, tmpdir):
data_files, metadata_file = files[:-1], files[-1]
# get initial source dataset
src_ds = self.open_source_dataset(files, tmpdir)
# perform transformation steps as necessary
logger.debug("Changing geotransform.")
src_ds = transform_chain.correct_geo_transform(src_ds)
# save file with given options - should use ENV
creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8", "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
logger.debug("Saving resulting file.")
output_filename = os.path.join(tmpdir, "%s.tmp" % data_files[0])
transform_chain.write_gdal_dataset(src_ds, "COG", output_filename, creation_options)
src_ds = None
os.rename(output_filename, os.path.join(tmpdir, data_files[0]))
return (os.path.join(tmpdir, data_files[0]), os.path.join(tmpdir, metadata_file))
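# The write step above corresponds roughly to the following gdal_translate call
# (a sketch; assumes GDAL with the COG driver, file names hypothetical):
#
#   gdal_translate -of COG \
#       -co BLOCKSIZE=512 -co COMPRESS=DEFLATE -co LEVEL=6 -co NUM_THREADS=8 \
#       -co BIGTIFF=IF_SAFER -co OVERVIEWS=AUTO -co RESAMPLING=CUBIC \
#       input.tif output.tif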
class BrowseReportPreprocessor(BasePreprocessor):
def __init__(self, browse_report, replace=False):
super().__init__(replace)
self.browse_report = browse_report
def get_input_container_and_filenames(self):
def _get_browse_container_filename(filename, browse):
parsed = urlparse(filename)
if parsed.scheme:
return (None, filename, browse)
container = filename.split("/")[1]
filename = "/".join(filename.split("/")[2:])
return container, filename, browse
return [
_get_browse_container_filename(browse['filename'], browse)
for browse in self.browse_report['browses']
]
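# For illustration (hypothetical values): a browse filename of
# "/my-container/browses/browse_1.tif" yields
# ("my-container", "browses/browse_1.tif", browse), while a URL such as
# "https://example.com/browse_1.tif" has a scheme and yields
# (None, "https://example.com/browse_1.tif", browse), so download_source()
# fetches it via urlretrieve instead of swift.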
def download_source(self, swift, container, object_name, tmpdir):
local_path = os.path.join(tmpdir, os.path.basename(object_name))
# download source from either swift or HTTP
if container:
# construct local path
swift_download_file(swift, container, object_name, local_path)
else:
urlretrieve(object_name, local_path)
return local_path
def unpack_source(self, filename, tmpdir):
# should already be a simple file
return [filename]
def process_image(self, files, browse, tmpdir):
data_file = files[0]
src_ds = transform_chain.open_gdal_dataset(data_file)
# TODO: preprocessing from ngeo
# # perform transformation steps as necessary
# logger.debug("Changing geotransform.")
# src_ds = transform_chain.correct_geo_transform(src_ds)
# save file with given options - should use ENV
creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8", "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
logger.debug("Saving resulting file.")
output_filename = os.path.join(tmpdir, "%s.tmp" % data_file)
transform_chain.write_gdal_dataset(src_ds, "COG", output_filename, creation_options)
src_ds = None
os.rename(output_filename, data_file)
# generate GSC metadata
metadata_file = self.generate_metadata_file(data_file, browse, tmpdir)
return (data_file, metadata_file)
def generate_metadata_file(self, data_file, browse, tmpdir):
now_time = datetime.utcnow().isoformat().rpartition('.')[0] + 'Z'
metadata = {
'identifier': browse['browse_identifier'],
'now_time': now_time,
'archiving_time': now_time,
'delivery_time': now_time,
'begin_time': browse['start_time'],
'end_time': browse['end_time'],
'product_type': browse['browse_type'],
'footprint': gsc_generator.get_footprint_from_browse(data_file, browse)
}
out_filename = join(tmpdir, splitext(basename(browse['filename']))[0] + '.xml')
with open(out_filename, 'w') as f:
f.write(
gsc_generator.generate_gsc_metadata(metadata)
)
return out_filename
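# Sketch of the browse dictionary this class expects (keys taken from the
# accesses above, values hypothetical):
#
#   browse = {
#       "browse_identifier": "browse_1",
#       "browse_type": "VHR_IMAGE_2018",
#       "filename": "/my-container/browses/browse_1.tif",
#       "start_time": "2019-01-01T00:00:00Z",
#       "end_time": "2019-01-01T00:05:00Z",
#   }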
def preprocessor(
collection, tar_object_path, replace=False,
client=None, register_queue_key=None
):
logger.info("Starting preprocessing of '%s'." % (tar_object_path))
try:
container = tar_object_path.split("/")[1]
package = "/".join(tar_object_path.split("/")[2:])
with SwiftService() as swift, OutputManager(), \
tempfile.TemporaryDirectory() as tmpdirname:
if not replace:
try:
list_parts_gen = swift.list(
container=container, options={"prefix": package},
)
for page in list_parts_gen:
if page["success"]:
logger.critical(
"Aborting, package '%s' already exists at "
"target container '%s'." % (package, container)
)
except SwiftError as e:
logger.debug(traceback.format_exc())
logger.error("%s: %s\n" % (type(e).__name__, str(e)))
tmpfilename = os.path.join(tmpdirname, "tmp.tar")
options = {
"os_username": "", # TODO: Use from environment variable
"os_password": "", # TODO: Use from environment variable
"os_tenant_name": "", # TODO: Use from environment variable
"os_tenant_id": "", # TODO: Use from environment variable
}
with SwiftService(options=options) as swift_down:
for down_res in swift_down.download(
container=container,
objects=[package, ],
options={"out_file": tmpfilename},
):
if down_res["success"]:
logger.debug(
"'%s' downloaded" % down_res["object"]
)
else:
logger.error(
"'%s' download failed" % down_res["object"]
)
tf = tarfile.open(tmpfilename, mode="r")
data_files_ti = [
m for m in tf.getmembers() if
re.search(r"IMG.+\.(TIF|JP2)", m.name)
]
metadata_file_ti = next(
m for m in tf.getmembers() if re.search(r"GSC.+\.xml", m.name)
)
data_files = [
member.name
for member in data_files_ti
]
metadata_file = metadata_file_ti.name
members = data_files_ti + [metadata_file_ti]
if not data_files or not metadata_file:
logger.error(
"Aborting, not all needed files found in package."
)
raise Exception("Not all needed files found in package.")
tf.extractall(path=tmpdirname, members=members)
# cleanup after use to save space
tf.close()
os.remove(tmpfilename)
source_name = os.path.join(tmpdirname, data_files[0])
tmp_name = os.path.join(tmpdirname, "%s.tmp" % data_files[0])
dest_name = os.path.join(
package, "%s.tif" % os.path.splitext(data_files[0])[0]
)
# if there is more than one file, make a VRT to mosaic them
if len(data_files) > 1:
logger.debug("More files found, creating a VRT")
source_name = os.path.join(tmpdirname, 'tmp.vrt')
subprocess.run(
['gdalbuildvrt', source_name] + [
os.path.join(tmpdirname, data_file)
for data_file in data_files
],
timeout=600, check=True
)
# open file using gdal
logger.debug("Opening file using GDAL.")
dst = transform_chain.open_gdal_dataset(source_name)
# perform transformation steps as necessary
logger.debug("Changing geotransform.")
dst = transform_chain.correct_geo_transform(dst)
# save file with given options - should use ENV
creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8", "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
logger.debug("Saving resulting file.")
transform_chain.write_gdal_dataset(dst, "COG", os.path.join(tmpdirname, "%s.tmp" % data_files[0]), creation_options)
dst = None
# check if 5GB swift upload limit is exceeded, if yes, use segmentation
size = os.stat(os.path.join(tmpdirname, "%s.tmp" % data_files[0])).st_size
uploadParams = {}
if (size > 1024 * 1024 * 1024 * 5):
uploadParams["segment_size"] = 1024 * 1024 * 1024 # 1gb segments
objects = [
SwiftUploadObject(tmp_name, object_name=dest_name),
SwiftUploadObject(
os.path.join(tmpdirname, metadata_file),
object_name=os.path.join(package, metadata_file)
)
]
for upload in swift.upload(
container=container,
objects=objects,
options=uploadParams,
):
if upload["success"]:
if "object" in upload:
logger.info(
"'%s' successfully uploaded." % upload["object"]
)
elif "for_object" in upload:
logger.debug(
"Successfully uploaded '%s' segment '%s'."
% (upload["for_object"], upload["segment_index"])
)
else:
logger.error(
"'%s' upload failed" % upload["error"]
)
if client is not None:
logger.debug(
"Storing paths in redis queue '%s" % register_queue_key
)
client.lpush(
register_queue_key, "%s" % tar_object_path
)
except Exception as e:
logger.debug(traceback.format_exc())
logger.error("%s: %s\n" % (type(e).__name__, str(e)))
logger.info(
"Successfully finished preprocessing of '%s'." % (tar_object_path)
)
def preprocessor_redis_wrapper(
collection, replace=False, host="localhost", port=6379,
preprocess_queue_key="preprocess_queue",
preprocess_md_queue_key="preprocess-md_queue",
register_queue_key="register_queue"
):
client = redis.Redis(
host=host, port=port, charset="utf-8", decode_responses=True
)
while True:
logger.debug("waiting for redis queue '%s'..." % preprocess_queue_key)
queue, value = client.brpop([preprocess_queue_key, preprocess_md_queue_key])
if queue == preprocess_md_queue_key:
preprocessor = BrowseReportPreprocessor(json.loads(value), replace=replace)
else:
preprocessor = PackagePreprocessor(value, replace=replace)
preprocessor.preprocess()
# preprocessor(
# collection,
# value[1],
# replace=replace,
# client=client,
# register_queue_key=register_queue_key
# )
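# A work item can be enqueued from another process, e.g. (queue name is the
# default, the object path is hypothetical):
#
#   redis-cli lpush preprocess_queue "/my-container/packages/product.tar"
#
# brpop above then returns that value and hands it to a PackagePreprocessor.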
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.description = textwrap.dedent("""\
Preprocess product data.
""")
parser.add_argument(
"collection", default=None,
help=(
"Collection the preprocessor is run for."
)
)
parser.add_argument(
"--mode", default="standard", choices=["standard", "redis"],
help=(
"The mode to run the preprocessor. Either one-off (standard) or "
"reading from a redis queue."
)
)
parser.add_argument(
"--tar-object-path", default=None,
help=(
"Path to object holding tar archive file of product."
)
)
parser.add_argument(
"--replace", action="store_true",
help=(
"Replace existing products instead of skipping the preprocessing."
)
)
parser.add_argument(
"--redis-preprocess-queue-key", default="preprocess_queue"
)
parser.add_argument(
"--redis-register-queue-key", default="register_queue"
)
parser.add_argument(
"--redis-host", default="localhost"
)
parser.add_argument(
"--redis-port", type=int, default=6379
)
parser.add_argument(
"-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4],
help=(
"Set verbosity of log output "
"(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
)
)
arg_values = parser.parse_args()
setup_logging(arg_values.verbosity)
collection = arg_values.collection
if collection not in COLLECTION_MAP:
logger.critical("Provided collection '%s' is not valid." % collection)
sys.exit(1)
if arg_values.mode == "standard":
preprocessor(
collection,
arg_values.tar_object_path,
replace=arg_values.replace,
)
else:
preprocessor_redis_wrapper(
collection,
replace=arg_values.replace,
host=arg_values.redis_host,
port=arg_values.redis_port,
preprocess_queue_key=arg_values.redis_preprocess_queue_key,
register_queue_key=arg_values.redis_register_queue_key,
)
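# Example invocations (sketches; the container and object paths are hypothetical):
#
#   python preprocessor.py VHR_IMAGE_2018 \
#       --tar-object-path /my-container/packages/product.tar -v 4
#
#   python preprocessor.py Emergency --mode redis --redis-host localhost -v 3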