EOX GitLab Instance

Skip to content
Snippets Groups Projects
Commit cf65942b authored by Stephan's avatar Stephan
Browse files

Adding initial preprocessor files.

parent 7ffe69c0
No related branches found
No related tags found
No related merge requests found
#------------------------------------------------------------------------------
#
# Project: prism data access server
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
#------------------------------------------------------------------------------
# Copyright (C) 2019 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#-----------------------------------------------------------------------------
FROM osgeo/gdal:alpine-normal-latest

# MAINTAINER is deprecated; use a label instead.
LABEL name="prism data access server preprocessor" \
      maintainer="EOX" \
      vendor="EOX IT Services GmbH <https://eox.at>" \
      license="MIT Copyright (C) 2019 EOX IT Services GmbH <https://eox.at>" \
      type="prism data access server preprocessor" \
      version="0.0.1-dev"

USER root

# Build/runtime deps for the swift + redis Python clients used below.
RUN apk add --no-cache gcc py3-pip python3-dev py3-setuptools musl-dev \
        linux-headers py3-redis && \
    pip3 install python-keystoneclient python-swiftclient argparse

ENV INSTANCE_ID="prism-data-access-server_preprocessor" \
    COLLECTION= \
    ST_AUTH_VERSION=3 \
    OS_AUTH_URL="https://auth.cloud.ovh.net/v3/" \
    OS_USERNAME= \
    OS_PASSWORD= \
    OS_TENANT_NAME= \
    OS_TENANT_ID= \
    OS_REGION_NAME= \
    REDIS_HOST= \
    REDIS_PORT= \
    REDIS_PREPROCESS_QUEUE_KEY= \
    REDIS_REGISTER_QUEUE_KEY=

# COPY (not ADD): plain files, no archive extraction or URL fetch needed.
COPY run-preprocessor.sh \
     preprocessor.py \
     get_min_max.py \
     transform_chain.py \
     /

# BUGFIX: the original omitted the space before the trailing backslash after
# /preprocessor.py and /get_min_max.py, so line-continuation glued the paths
# into one bogus argument "/preprocessor.py/get_min_max.py/transform_chain.py".
RUN chmod -v +x \
    /run-preprocessor.sh \
    /preprocessor.py \
    /get_min_max.py \
    /transform_chain.py

CMD ["/run-preprocessor.sh"]
#!/usr/bin/env python
# -----------------------------------------------------------------------------
#
# Project: get_min_max.py
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
# -----------------------------------------------------------------------------
# Copyright (c) 2019 EOX IT Services GmbH
#
# Python script to retrieve min and max values of items.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# -----------------------------------------------------------------------------
import sys
import os
import argparse
import textwrap
import logging
import traceback
import subprocess
import re
from swiftclient.service import SwiftError, SwiftService
# Map of valid collection identifiers (CLI argument) to their display
# names: collection id -> [name, ...].
COLLECTION_MAP = {
    "VHR_IMAGE_2018": ["VHR IMAGE 2018", ],
}
# Module-level logger; handlers/levels are attached in setup_logging().
logger = logging.getLogger(__name__)
def setup_logging(verbosity):
    """Configure the module logger from a CLI verbosity count.

    Maps verbosity 0-3 to CRITICAL/ERROR/WARNING/INFO; any other value
    (i.e. 4) selects DEBUG. Attaches a stream handler with a timestamped
    format to the module-level ``logger``.
    """
    # Mapping replaces the original if/elif ladder (which also contained a
    # pointless self-assignment "verbosity = verbosity").
    levels = {
        0: logging.CRITICAL,
        1: logging.ERROR,
        2: logging.WARNING,
        3: logging.INFO,
    }
    level = levels.get(verbosity, logging.DEBUG)
    logger.setLevel(level)
    sh = logging.StreamHandler()
    sh.setLevel(level)
    sh.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    )
    logger.addHandler(sh)
def get_min_max(collection):
    """Compute min/max pixel values of all TIFs in all swift containers.

    Authenticates via the ``swift auth`` CLI to obtain storage URL/token for
    GDAL's /vsiswift/ driver, then walks every container and runs
    ``gdalinfo -mm`` on each *.tif/*.TIF object, appending
    ``container,product_type,min/max`` rows to the local file "min_max_out".

    ``collection`` is validated by the caller; it is not used here yet.
    Returns 1 on failure, None on success.
    """
    logger.info("Starting")
    try:
        with SwiftService() as swift, open("min_max_out", "a+") as outfile:
            auth_proc = subprocess.run(
                ["swift", "auth"], capture_output=True,
                timeout=600, check=True
            )
            # renamed from "vars" which shadowed the builtin
            auth_output = auth_proc.stdout.decode("utf-8")
            os.environ["SWIFT_STORAGE_URL"] = re.findall(
                r"OS_STORAGE_URL=(.*)\n", auth_output
            )[0]
            os.environ["SWIFT_AUTH_TOKEN"] = re.findall(
                r"OS_AUTH_TOKEN=(.*)\n", auth_output
            )[0]
            outfile.write("container,product_type,min/max\n")
            try:
                for page in swift.list():
                    if not page["success"]:
                        logger.error("No container found.")
                        return 1
                    for item in page["listing"]:
                        for page2 in swift.list(container=item["name"]):
                            if not page2["success"]:
                                logger.error(
                                    "No product found in container '%s'."
                                    % item["name"]
                                )
                                return 1
                            for item2 in page2["listing"]:
                                if not item2["name"].endswith((".TIF", ".tif")):
                                    continue
                                gdalout = subprocess.run([
                                    "gdalinfo", "-mm",
                                    "/vsiswift/%s/%s" %
                                    (item["name"], item2["name"])],
                                    capture_output=True,
                                    timeout=600, check=True
                                ).stdout.decode("utf-8")
                                minmax = re.findall(
                                    r"Computed Min/Max=(.*)\n",
                                    gdalout
                                )
                                # BUGFIX: the original wrote the whole
                                # re.findall list (e.g. "['1.0,2.0']") into
                                # the CSV; write the matched value itself.
                                outfile.write(
                                    "%s,%s,%s\n" %
                                    (item["name"],
                                     item2["name"].split("/")[1],
                                     minmax[0] if minmax else "")
                                )
            except SwiftError as e:
                logger.debug(traceback.format_exc())
                logger.error("%s: %s\n" % (type(e).__name__, str(e)))
                return 1
    except Exception as e:
        logger.debug(traceback.format_exc())
        logger.error("%s: %s\n" % (type(e).__name__, str(e)))
        return 1
    logger.info("Successfully finished")
if __name__ == "__main__":
    # CLI: one positional collection argument plus a verbosity switch.
    cli = argparse.ArgumentParser(
        description=textwrap.dedent("""\
            Get min/max of products.
        """)
    )
    cli.add_argument(
        "collection", default=None,
        help=(
            "Collection the registrar is run for."
        )
    )
    cli.add_argument(
        "-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4],
        help=(
            "Set verbosity of log output "
            "(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
        )
    )
    args = cli.parse_args()
    setup_logging(args.verbosity)
    # Refuse unknown collections before doing any work.
    if args.collection not in COLLECTION_MAP:
        logger.critical(
            "Provided collection '%s' is not valid." % args.collection
        )
        sys.exit(1)
    get_min_max(args.collection)
#!/usr/bin/env python
# -----------------------------------------------------------------------------
#
# Project: preprocessor.py
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
# -----------------------------------------------------------------------------
# Copyright (c) 2019 EOX IT Services GmbH
#
# Python script to preprocess product data.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# -----------------------------------------------------------------------------
import sys
import os
import argparse
import textwrap
import logging
import traceback
import redis
import tempfile
import tarfile
import re
import subprocess
from swiftclient.multithreading import OutputManager
from swiftclient.service import SwiftError, SwiftService, SwiftUploadObject
import transform_chain
# Map of valid collection identifiers (CLI argument) to their display
# names: collection id -> [name, ...].
COLLECTION_MAP = {
    "VHR_IMAGE_2018": ["VHR IMAGE 2018", ],
    "Emergency": ["Emergency", ],
}
# Module-level logger; handlers/levels are attached in setup_logging().
logger = logging.getLogger(__name__)
def setup_logging(verbosity):
    """Configure the module logger from a CLI verbosity count.

    Maps verbosity 0-3 to CRITICAL/ERROR/WARNING/INFO; any other value
    (i.e. 4) selects DEBUG. Attaches a stream handler with a timestamped
    format to the module-level ``logger``.
    """
    # Mapping replaces the original if/elif ladder (which also contained a
    # pointless self-assignment "verbosity = verbosity").
    levels = {
        0: logging.CRITICAL,
        1: logging.ERROR,
        2: logging.WARNING,
        3: logging.INFO,
    }
    level = levels.get(verbosity, logging.DEBUG)
    logger.setLevel(level)
    sh = logging.StreamHandler()
    sh.setLevel(level)
    sh.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    )
    logger.addHandler(sh)
def preprocessor(
    collection, tar_object_path, replace=False,
    client=None, register_queue_key=None
):
    """Preprocess one product package stored in swift.

    Downloads the tar at ``tar_object_path`` ("/<container>/<package>"),
    extracts its IMG*.TIF/JP2 imagery and GSC*.xml metadata, mosaics multiple
    imagery files via a VRT, fixes the geotransform, converts to a COG, and
    uploads result plus metadata back to the same container under the package
    prefix. Optionally pushes the path onto a redis register queue.

    Parameters:
        collection: collection id (only used for context here, not consumed).
        tar_object_path: swift object path of the package tar.
        replace: if False, abort when the package already exists at target.
        client: optional redis client for registration hand-off.
        register_queue_key: redis list key used together with ``client``.

    Returns 1 on any failure, None on success.
    """
    logger.info("Starting preprocessing of '%s'." % (tar_object_path))
    try:
        # path layout: "/<container>/<rest of path>"
        container = tar_object_path.split("/")[1]
        package = "/".join(tar_object_path.split("/")[2:])
        with SwiftService() as swift, OutputManager(), \
                tempfile.TemporaryDirectory() as tmpdirname:
            if not replace:
                # Any successful listing page under the package prefix means
                # the target already exists -> abort instead of overwriting.
                try:
                    list_parts_gen = swift.list(
                        container=container, options={"prefix": package},
                    )
                    for page in list_parts_gen:
                        if page["success"]:
                            logger.critical(
                                "Aborting, package '%s' already exists at "
                                "target container '%s'." % (package, container)
                            )
                            return(1)
                except SwiftError as e:
                    logger.debug(traceback.format_exc())
                    logger.error("%s: %s\n" % (type(e).__name__, str(e)))
                    return(1)
            tmpfilename = os.path.join(tmpdirname, "tmp.tar")
            options = {
                "os_username": "",  # TODO: Use from environment variable
                "os_password": "",  # TODO: Use from environment variable
                "os_tenant_name": "",  # TODO: Use from environment variable
                "os_tenant_id": "",  # TODO: Use from environment variable
            }
            # Separate SwiftService (with download-side credentials) used
            # only for fetching the package tar.
            with SwiftService(options=options) as swift_down:
                for down_res in swift_down.download(
                    container=container,
                    objects=[package, ],
                    options={"out_file": tmpfilename},
                ):
                    if down_res["success"]:
                        logger.debug(
                            "'%s' downloaded" % down_res["object"]
                        )
                    else:
                        logger.error(
                            "'%s' download failed" % down_res["object"]
                        )
                        return(1)
            tf = tarfile.open(tmpfilename, mode="r")
            # imagery members match IMG*.TIF / IMG*.JP2
            data_files_ti = [
                m for m in tf.getmembers() if
                re.search(r"IMG.+\.(TIF|JP2)", m.name)
            ]
            # NOTE(review): next() raises StopIteration when no GSC*.xml is
            # present; that is only caught by the broad except below, so the
            # "not all needed files" error path never fires for metadata.
            metadata_file_ti = next(
                m for m in tf.getmembers() if re.search(r"GSC.+\.xml", m.name)
            )
            data_files = [
                member.name
                for member in data_files_ti
            ]
            metadata_file = metadata_file_ti.name
            members = data_files_ti + [metadata_file_ti]
            if not data_files or not metadata_file:
                logger.error(
                    "Aborting, not all needed files found in package."
                )
                return(1)
            # NOTE(review): extractall on untrusted archives is vulnerable to
            # path traversal ("../") member names — consider sanitizing.
            tf.extractall(path=tmpdirname, members=members)
            # cleanup after use to save space
            tf.close()
            os.remove(tmpfilename)
            source_name = os.path.join(tmpdirname, data_files[0])
            # tmp_name: local COG output; dest_name: object name in swift
            tmp_name = os.path.join(tmpdirname, "%s.tmp" % data_files[0])
            dest_name = os.path.join(
                package, "%s.tif" % os.path.splitext(data_files[0])[0]
            )
            # if there is more than one file, make a VRT to mosaic them
            if len(data_files) > 1:
                logger.debug("More files found, creating a VRT")
                source_name = os.path.join(tmpdirname, 'tmp.vrt')
                subprocess.run(
                    ['gdalbuildvrt', source_name] + [
                        os.path.join(tmpdirname, data_file)
                        for data_file in data_files
                    ],
                    timeout=600, check=True
                )
            # open file using gdal
            logger.debug("Opening file using GDAL.")
            dst = transform_chain.open_gdal_dataset(source_name)
            # perform transformation steps as necessary
            logger.debug("Changing geotransform.")
            dst = transform_chain.correct_geo_transform(dst)
            # save file with given options - should use ENV
            creation_options = ["BLOCKSIZE=512", "COMPRESS=DEFLATE", "LEVEL=6", "NUM_THREADS=8", "BIGTIFF=IF_SAFER", "OVERVIEWS=AUTO", "RESAMPLING=CUBIC"]
            logger.debug("Saving resulting file.")
            # NOTE(review): the path expression below equals tmp_name
            transform_chain.write_gdal_dataset(dst, "COG", os.path.join(tmpdirname, "%s.tmp" % data_files[0]), creation_options)
            # release the dataset so GDAL flushes it
            dst = None
            # check if 5GB swift upload limit is exceeded, if yes, use segmentation
            size = os.stat(os.path.join(tmpdirname, "%s.tmp" % data_files[0])).st_size
            uploadParams = {}
            if (size > 1024 * 1024 * 1024 * 5):
                uploadParams["segment_size"] = 1024 * 1024 * 1024  # 1gb segments
            # upload the converted image and the original metadata file
            objects = [
                SwiftUploadObject(tmp_name, object_name=dest_name),
                SwiftUploadObject(
                    os.path.join(tmpdirname, metadata_file),
                    object_name=os.path.join(package, metadata_file)
                )
            ]
            for upload in swift.upload(
                container=container,
                objects=objects,
                options=uploadParams,
            ):
                if upload["success"]:
                    if "object" in upload:
                        logger.info(
                            "'%s' successfully uploaded." % upload["object"]
                        )
                    elif "for_object" in upload:
                        logger.debug(
                            "Successfully uploaded '%s' segment '%s'."
                            % (upload["for_object"], upload["segment_index"])
                        )
                else:
                    logger.error(
                        "'%s' upload failed" % upload["error"]
                    )
                    return(1)
            # hand the package over to the registrar via redis, if configured
            if client is not None:
                logger.debug(
                    "Storing paths in redis queue '%s" % register_queue_key
                )
                client.lpush(
                    register_queue_key, "%s" % tar_object_path
                )
    except Exception as e:
        logger.debug(traceback.format_exc())
        logger.error("%s: %s\n" % (type(e).__name__, str(e)))
        return(1)
    logger.info(
        "Successfully finished preprocessing of '%s'." % (tar_object_path)
    )
def preprocessor_redis_wrapper(
    collection, replace=False, host="localhost", port=6379,
    preprocess_queue_key="preprocess_queue",
    register_queue_key="register_queue"
):
    """Consume tar object paths from a redis list forever, preprocessing each.

    Blocks on ``preprocess_queue_key`` and runs :func:`preprocessor` for
    every popped path, passing the redis client on so successfully processed
    packages get pushed to ``register_queue_key``.
    """
    redis_client = redis.Redis(
        host=host, port=port, charset="utf-8", decode_responses=True
    )
    while True:
        logger.debug("waiting for redis queue '%s'..." % preprocess_queue_key)
        # brpop blocks until an item is available and returns (key, value)
        _, tar_object_path = redis_client.brpop(preprocess_queue_key)
        preprocessor(
            collection,
            tar_object_path,
            replace=replace,
            client=redis_client,
            register_queue_key=register_queue_key,
        )
if __name__ == "__main__":
    # CLI: positional collection, a mode switch, and redis wiring options.
    cli = argparse.ArgumentParser(
        description=textwrap.dedent("""\
            Preprocess product data.
        """)
    )
    cli.add_argument(
        "collection", default=None,
        help=(
            "Collection the preprocessor is run for."
        )
    )
    cli.add_argument(
        "--mode", default="standard", choices=["standard", "redis"],
        help=(
            "The mode to run the preprocessor. Either one-off (standard) or "
            "reading from a redis queue."
        )
    )
    cli.add_argument(
        "--tar-object-path", default=None,
        help=(
            "Path to object holding tar archive file of product."
        )
    )
    cli.add_argument(
        "--replace", action="store_true",
        help=(
            "Replace existing products instead of skipping the preprocessing."
        )
    )
    cli.add_argument("--redis-preprocess-queue-key", default="preprocess_queue")
    cli.add_argument("--redis-register-queue-key", default="register_queue")
    cli.add_argument("--redis-host", default="localhost")
    cli.add_argument("--redis-port", type=int, default=6379)
    cli.add_argument(
        "-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4],
        help=(
            "Set verbosity of log output "
            "(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
        )
    )
    args = cli.parse_args()
    setup_logging(args.verbosity)
    # Refuse unknown collections before doing any work.
    if args.collection not in COLLECTION_MAP:
        logger.critical(
            "Provided collection '%s' is not valid." % args.collection
        )
        sys.exit(1)
    if args.mode == "standard":
        # one-off run against a single package
        preprocessor(
            args.collection,
            args.tar_object_path,
            replace=args.replace,
        )
    else:
        # long-running worker fed from redis
        preprocessor_redis_wrapper(
            args.collection,
            replace=args.replace,
            host=args.redis_host,
            port=args.redis_port,
            preprocess_queue_key=args.redis_preprocess_queue_key,
            register_queue_key=args.redis_register_queue_key,
        )
#!/bin/sh
# Entry point of the preprocessor container: run the preprocessor in redis
# queue mode, configured entirely from environment variables (set in ENV /
# at deploy time).
echo "Running preprocessor"
# Quote expansions so values containing whitespace stay single arguments;
# exec replaces the shell so python receives signals (clean shutdown).
exec python3 /preprocessor.py "${COLLECTION}" --mode redis --redis-host "${REDIS_HOST}" --redis-port "${REDIS_PORT}" --redis-preprocess-queue-key "${REDIS_PREPROCESS_QUEUE_KEY}" --redis-register-queue-key "${REDIS_REGISTER_QUEUE_KEY}"
from osgeo import gdal
def open_gdal_dataset(input_file):
    """Open *input_file* with GDAL and return the dataset handle.

    Thin wrapper around gdal.Open; NOTE(review): gdal.Open returns None for
    unreadable inputs, callers should be prepared for that — confirm.
    """
    return gdal.Open(input_file)
def write_gdal_dataset(src_dst, driver_name, output_file_name, creation_options):
    """Write dataset *src_dst* to *output_file_name*.

    Uses the GDAL driver named *driver_name* and the given list of creation
    options (-CO strings, e.g. "COMPRESS=DEFLATE").
    """
    driver = gdal.GetDriverByName(driver_name)
    out_ds = driver.CreateCopy(
        output_file_name, src_dst, strict=0, options=creation_options
    )
    # drop the reference so GDAL flushes the copy to disk
    out_ds = None
def correct_geo_transform(src_dst):
    """Fix a degenerate geotransform on a GDAL dataset, in place.

    input - gdal dataset; returns - the same gdal dataset.
    When both pixel resolutions are zero (malformed image whose resolutions
    ended up in the skew slots), rebuilds the geotransform from four corner
    ground control points derived from the skew-based bounds — by Petr.
    """
    ulx, xres, xskew, uly, yskew, yres = src_dst.GetGeoTransform()
    # test geotransform if necessary to shift
    if xres == 0.0 and yres == 0.0:
        # malformed image, compute xres and yres switched in geotransform
        lrx = ulx + (src_dst.RasterXSize * xskew)
        lry = uly + (src_dst.RasterYSize * yskew)
        # [ulx, lrx, lry, uly] - bounds = lon_min, lon_max, lat_min, lat_max
        # pixel-space corners (UL, UR, LR, LL order on x; matching y list)
        fp = [[0, src_dst.RasterXSize, src_dst.RasterXSize, 0], [0, 0, src_dst.RasterYSize, src_dst.RasterYSize]]
        # corresponding ground-space corner coordinates
        tp = [[ulx, lrx, lrx, ulx], [lry, lry, uly, uly]]
        pix = list(zip(fp[0], fp[1]))
        coor = list(zip(tp[0], tp[1]))
        # compute the gdal.GCP parameters
        gcps = []
        for index, txt in enumerate(pix):
            gcps.append(gdal.GCP())
            gcps[index].GCPPixel = pix[index][0]
            # image line axis runs top-down, so flip the y pixel coordinate
            gcps[index].GCPLine = src_dst.RasterYSize - int(pix[index][1])
            gcps[index].GCPX = coor[index][0]
            gcps[index].GCPY = coor[index][1]
        # get correct geotransform from gcps
        # NOTE(review): GCPsToGeoTransform can fail for inconsistent GCPs;
        # its result is not checked before being applied — confirm upstream.
        geotransform_new = gdal.GCPsToGeoTransform(gcps)
        # overwrite geotransform with new
        src_dst.SetGeoTransform(geotransform_new)
    return src_dst
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment