EOX GitLab Instance

Skip to content
Snippets Groups Projects
Commit 2c10a9a3 authored by Lubomir Dolezal's avatar Lubomir Dolezal
Browse files

use bucket name from env for preprocessor and registrar

parent 2ee53fb7
No related branches found
No related tags found
1 merge request!3use bucket name from env for preprocessor and registrar
...@@ -50,6 +50,7 @@ ENV INSTANCE_ID="prism-view-server_core" \ ...@@ -50,6 +50,7 @@ ENV INSTANCE_ID="prism-view-server_core" \
INSTANCE_NAME="pvs_instance"\ INSTANCE_NAME="pvs_instance"\
INSTANCE_DIR="/var/www/pvs/dev/pvs_instance/" \ INSTANCE_DIR="/var/www/pvs/dev/pvs_instance/" \
COLLECTION= \ COLLECTION= \
UPLOAD_CONTAINER= \
DB_USER= \ DB_USER= \
DB_PW= \ DB_PW= \
DB_HOST= \ DB_HOST= \
......
...@@ -139,17 +139,15 @@ class RegistrationError(Exception): ...@@ -139,17 +139,15 @@ class RegistrationError(Exception):
@transaction.atomic @transaction.atomic
def registrar( def registrar(
collection, collection,
objects_prefix, replace=False, client=None, registered_set_key=None objects_prefix, upload_container, replace=False, client=None, registered_set_key=None
): ):
logger.info("Starting registration of product '%s'." % objects_prefix) logger.info("Starting registration of product '%s'." % objects_prefix)
container = objects_prefix.split("/")[1]
package = "/".join(objects_prefix.split("/")[2:])
metadata_package, data_package, has_vrt = None, None, None metadata_package, data_package, has_vrt = None, None, None
with SwiftService() as swift: with SwiftService() as swift:
list_parts_gen = swift.list( list_parts_gen = swift.list(
container=container, options={"prefix": package}, container=upload_container, options={"prefix": objects_prefix},
) )
for page in list_parts_gen: for page in list_parts_gen:
if page["success"]: if page["success"]:
...@@ -184,7 +182,7 @@ def registrar( ...@@ -184,7 +182,7 @@ def registrar(
product_type = data_package.split("/")[1] product_type = data_package.split("/")[1]
product, replaced = ProductRegistrator().register( product, replaced = ProductRegistrator().register(
metadata_locations=[[container, metadata_locations=[[upload_container,
metadata_package, ], ], metadata_package, ], ],
type_name="%s_Product_%s" % (collection, product_type), type_name="%s_Product_%s" % (collection, product_type),
replace=replace, replace=replace,
...@@ -211,8 +209,8 @@ def registrar( ...@@ -211,8 +209,8 @@ def registrar(
models.collection_insert_eo_object(collection_level_3, product) models.collection_insert_eo_object(collection_level_3, product)
report = GDALRegistrator().register( report = GDALRegistrator().register(
data_locations=[[container, data_package, ], ], data_locations=[[upload_container, data_package, ], ],
metadata_locations=[[container, metadata_locations=[[upload_container,
metadata_package, ], ], metadata_package, ], ],
coverage_type_name="RGBNir", coverage_type_name="RGBNir",
overrides={"identifier": "%s__coverage" % product.identifier}, overrides={"identifier": "%s__coverage" % product.identifier},
...@@ -246,6 +244,7 @@ def registrar( ...@@ -246,6 +244,7 @@ def registrar(
def registrar_redis_wrapper( def registrar_redis_wrapper(
collection, collection,
upload_container,
replace=False, host="localhost", port=6379, replace=False, host="localhost", port=6379,
register_queue_key="register_queue", register_queue_key="register_queue",
registered_set_key="registered_set", registered_set_key="registered_set",
...@@ -260,6 +259,7 @@ def registrar_redis_wrapper( ...@@ -260,6 +259,7 @@ def registrar_redis_wrapper(
registrar( registrar(
collection, collection,
value[1], value[1],
upload_container,
replace=replace, replace=replace,
client=client, client=client,
registered_set_key=registered_set_key registered_set_key=registered_set_key
...@@ -332,15 +332,22 @@ if __name__ == "__main__": ...@@ -332,15 +332,22 @@ if __name__ == "__main__":
logger.critical("Provided collection '%s' is not valid." % collection) logger.critical("Provided collection '%s' is not valid." % collection)
sys.exit(1) sys.exit(1)
upload_container = os.environ.get('UPLOAD_CONTAINER')
if upload_container is None:
logger.critical("UPLOAD_CONTAINER environment variable not set.")
sys.exit(1)
if arg_values.mode == "standard": if arg_values.mode == "standard":
registrar( registrar(
collection, collection,
arg_values.objects_prefix, arg_values.objects_prefix,
upload_container,
replace=arg_values.replace, replace=arg_values.replace,
) )
else: else:
registrar_redis_wrapper( registrar_redis_wrapper(
collection, collection,
upload_container,
replace=arg_values.replace, replace=arg_values.replace,
host=arg_values.redis_host, host=arg_values.redis_host,
port=arg_values.redis_port, port=arg_values.redis_port,
......
COLLECTION=Emergency COLLECTION=Emergency
UPLOAD_CONTAINER=emg-data
GDAL_DISABLE_READDIR_ON_OPEN=TRUE GDAL_DISABLE_READDIR_ON_OPEN=TRUE
CPL_VSIL_CURL_ALLOWED_EXTENSIONS=.TIF,.tif,.xml CPL_VSIL_CURL_ALLOWED_EXTENSIONS=.TIF,.tif,.xml
COLLECTION=VHR_IMAGE_2018 COLLECTION=VHR_IMAGE_2018
UPLOAD_CONTAINER=vhr18-data
GDAL_DISABLE_READDIR_ON_OPEN=TRUE GDAL_DISABLE_READDIR_ON_OPEN=TRUE
CPL_VSIL_CURL_ALLOWED_EXTENSIONS=.TIF,.tif,.xml CPL_VSIL_CURL_ALLOWED_EXTENSIONS=.TIF,.tif,.xml
...@@ -41,6 +41,7 @@ RUN apk add --no-cache gcc py3-pip python3-dev py3-setuptools musl-dev \ ...@@ -41,6 +41,7 @@ RUN apk add --no-cache gcc py3-pip python3-dev py3-setuptools musl-dev \
ENV INSTANCE_ID="prism-data-access-server_preprocessor" \ ENV INSTANCE_ID="prism-data-access-server_preprocessor" \
COLLECTION= \ COLLECTION= \
UPLOAD_CONTAINER= \
ST_AUTH_VERSION=3 \ ST_AUTH_VERSION=3 \
OS_AUTH_URL="https://auth.cloud.ovh.net/v3/" \ OS_AUTH_URL="https://auth.cloud.ovh.net/v3/" \
OS_USERNAME= \ OS_USERNAME= \
......
...@@ -86,7 +86,7 @@ def setup_logging(verbosity): ...@@ -86,7 +86,7 @@ def setup_logging(verbosity):
def preprocessor( def preprocessor(
collection, tar_object_path, replace=False, collection, tar_object_path, upload_container, replace=False,
client=None, register_queue_key=None client=None, register_queue_key=None
): ):
logger.info("Starting preprocessing of '%s'." % (tar_object_path)) logger.info("Starting preprocessing of '%s'." % (tar_object_path))
...@@ -100,7 +100,7 @@ def preprocessor( ...@@ -100,7 +100,7 @@ def preprocessor(
if not replace: if not replace:
try: try:
list_parts_gen = swift.list( list_parts_gen = swift.list(
container=container, options={"prefix": package}, container=upload_container, options={"prefix": tar_object_path},
) )
for page in list_parts_gen: for page in list_parts_gen:
if page["success"]: if page["success"]:
...@@ -245,7 +245,7 @@ def preprocessor( ...@@ -245,7 +245,7 @@ def preprocessor(
SwiftUploadObject( SwiftUploadObject(
vrt_name, vrt_name,
object_name=os.path.join( object_name=os.path.join(
package, os.path.basename(vrt_name)) container, package, os.path.basename(vrt_name))
) )
) )
...@@ -257,23 +257,26 @@ def preprocessor( ...@@ -257,23 +257,26 @@ def preprocessor(
swift_upload_options["segment_size"] = 2 * 1024 * 1024 * 1024 # 2gb segments swift_upload_options["segment_size"] = 2 * 1024 * 1024 * 1024 # 2gb segments
dest_object_name = os.path.join( dest_object_name = os.path.join(
package, os.path.basename(data_file) container, package, os.path.basename(data_file)
) )
objects.append( objects.append(
SwiftUploadObject(data_file, object_name=dest_object_name) SwiftUploadObject(
data_file,
object_name=dest_object_name
)
) )
# add metadata to files to be uploaded after data files # add metadata to files to be uploaded after data files
objects.append( objects.append(
SwiftUploadObject( SwiftUploadObject(
os.path.join(tmpdirname, metadata_file), os.path.join(tmpdirname, metadata_file),
object_name=os.path.join(package, metadata_file) object_name=os.path.join(container, package, metadata_file)
) )
) )
# upload files # upload files
for upload in swift.upload( for upload in swift.upload(
container=container, container=upload_container,
objects=objects, objects=objects,
options=swift_upload_options options=swift_upload_options
): ):
...@@ -312,7 +315,7 @@ def preprocessor( ...@@ -312,7 +315,7 @@ def preprocessor(
def preprocessor_redis_wrapper( def preprocessor_redis_wrapper(
collection, replace=False, host="localhost", port=6379, collection, upload_container, replace=False, host="localhost", port=6379,
preprocess_queue_key="preprocess_queue", preprocess_queue_key="preprocess_queue",
register_queue_key="register_queue" register_queue_key="register_queue"
): ):
...@@ -325,6 +328,7 @@ def preprocessor_redis_wrapper( ...@@ -325,6 +328,7 @@ def preprocessor_redis_wrapper(
preprocessor( preprocessor(
collection, collection,
value[1], value[1],
upload_container,
replace=replace, replace=replace,
client=client, client=client,
register_queue_key=register_queue_key register_queue_key=register_queue_key
...@@ -392,15 +396,22 @@ if __name__ == "__main__": ...@@ -392,15 +396,22 @@ if __name__ == "__main__":
logger.critical("Provided collection '%s' is not valid." % collection) logger.critical("Provided collection '%s' is not valid." % collection)
sys.exit(1) sys.exit(1)
upload_container = os.environ.get('UPLOAD_CONTAINER')
if upload_container is None:
logger.critical("UPLOAD_CONTAINER environment variable not set.")
sys.exit(1)
if arg_values.mode == "standard": if arg_values.mode == "standard":
preprocessor( preprocessor(
collection, collection,
arg_values.tar_object_path, arg_values.tar_object_path,
upload_container,
replace=arg_values.replace, replace=arg_values.replace,
) )
else: else:
preprocessor_redis_wrapper( preprocessor_redis_wrapper(
collection, collection,
upload_container,
replace=arg_values.replace, replace=arg_values.replace,
host=arg_values.redis_host, host=arg_values.redis_host,
port=arg_values.redis_port, port=arg_values.redis_port,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment