diff --git a/config/dem_preprocessor-config.yml b/config/dem_preprocessor-config.yml index 8f34983aef67713bc9fe66b23aca425c88aea775..3ae30fe3a7b0cf59ba735aabe6e05db529ff4fb1 100644 --- a/config/dem_preprocessor-config.yml +++ b/config/dem_preprocessor-config.yml @@ -11,6 +11,7 @@ source: user_domain_name: !env '${OS_USER_DOMAIN_NAME_DOWNLOAD}' target: type: swift + replace: false kwargs: username: !env '${OS_USERNAME}' password: !env '${OS_PASSWORD}' diff --git a/config/emg_preprocessor-config.yml b/config/emg_preprocessor-config.yml index a21950f75d71ac115a8e259c903e4f4f84de0a09..dc6fc7b08763e5069c0839ea9409f36c6336c271 100644 --- a/config/emg_preprocessor-config.yml +++ b/config/emg_preprocessor-config.yml @@ -22,6 +22,7 @@ source: # user_domain_name: !env{{OS_USER_DOMAIN_NAME}} target: type: local + replace: true kwargs: storage_path: /mnt/data/target diff --git a/config/vhr18_preprocessor-config.yml b/config/vhr18_preprocessor-config.yml index a63ffb1d98003d3d4a9abec234ece0e6b3ef33a6..359c52da54a21f67d2b38ab45d88671dbe404d23 100644 --- a/config/vhr18_preprocessor-config.yml +++ b/config/vhr18_preprocessor-config.yml @@ -11,6 +11,7 @@ source: user_domain_name: !env '${OS_USER_DOMAIN_NAME_DOWNLOAD}' target: type: swift + replace: false kwargs: username: !env '${OS_USERNAME}' password: !env '${OS_PASSWORD}' diff --git a/documentation/operator-guide/configuration.rst b/documentation/operator-guide/configuration.rst index fedf0a9569123792707496ee8ebb8b820a293744..9ecfbd6d147349e4533435ea6cdab0093d9a760d 100644 --- a/documentation/operator-guide/configuration.rst +++ b/documentation/operator-guide/configuration.rst @@ -117,8 +117,6 @@ The following ``.env`` files are typically used: django admin user to be used with the admin GUI. * ``<stack-name>_obs.env``: This contains access parameters for the object storage(s). -* ``<stack-name>_preprocessor.env``: Preprocessor related environment - variables * ``<stack-name>_redis.env``: Redis access credentials and queue names @@ -173,6 +171,7 @@ retrieve the original product files: * ``OS_REGION_NAME_DOWNLOAD`` * ``OS_AUTH_URL_DOWNLOAD`` * ``ST_AUTH_VERSION_DOWNLOAD`` +* ``OS_USER_DOMAIN_NAME_DOWNLOAD`` VS Environment Variables ^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/preprocessor/preprocessor/config-schema.yaml b/preprocessor/preprocessor/config-schema.yaml index dd67fed643576da60143c38d16c930555d01444f..93f3f9b4f972556cf362bdc5eb24a163772d6fe9 100644 --- a/preprocessor/preprocessor/config-schema.yaml +++ b/preprocessor/preprocessor/config-schema.yaml @@ -26,6 +26,10 @@ properties: description: Extra arguments. Use depends on actual implementation. type: object # required: [type] + replace: + description: If set to true, output replaces already existing files on target. If no existing are present, preprocessing does not start. + type: boolean + default: false workdir: description: The local directory, where intermediary files are to be stored. type: string diff --git a/preprocessor/preprocessor/preprocess.py b/preprocessor/preprocessor/preprocess.py index 355505e7bcb87946e9570fb2faacac9026ccefd1..626099a5ab25ec001f907644fb1e856276bde68a 100644 --- a/preprocessor/preprocessor/preprocess.py +++ b/preprocessor/preprocessor/preprocess.py @@ -124,6 +124,16 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N """ with workdir(config, use_dir) as dirname, Timer() as preprocess_timer: logger.info('Preprocessing %s in %s' % (file_path, dirname)) + target_config = config['target'] + # check if target.replace is configured and if not, check storage if files there + if not target_config['replace']: + uploader = get_uploader( + target_config['type'], target_config.get('args'), target_config.get('kwargs') + ) + if uploader.product_exists(file_path): + raise Exception('Target.replace configuration is not set to true and objects already exist in target %s.' % file_path) + else: + logger.debug('Product does not yet exist on target') # check if we can reuse a previous download if not os.path.isdir('download'): os.mkdir('download') @@ -198,7 +208,6 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N preprocess_internal(preprocess_config, 'unpack') # get an uploader for the finalized images - target_config = config['target'] uploader = get_uploader( target_config['type'], target_config.get('args'), target_config.get('kwargs') ) diff --git a/preprocessor/preprocessor/transfer/abc.py b/preprocessor/preprocessor/transfer/abc.py index 409a2ea39cb145f14fc4b036c03d6d9859ff2496..9cc9818dcd1393b73f2c60a1fe78ec0cce98aad5 100644 --- a/preprocessor/preprocessor/transfer/abc.py +++ b/preprocessor/preprocessor/transfer/abc.py @@ -30,3 +30,7 @@ class Uploader(ABC): @abstractmethod def upload(self, local_path: Union[PathLike, List[PathLike]], remote_dir: PathLike) -> List[PathLike]: pass + + @abstractmethod + def product_exists(self, remote_dir: PathLike) -> bool: + pass diff --git a/preprocessor/preprocessor/transfer/local.py b/preprocessor/preprocessor/transfer/local.py index 2ee3da83f9a9f588bb03281d7b5844d2596f8172..f1450f7d6b18d2e0a98232023d5bda5cb6db667a 100644 --- a/preprocessor/preprocessor/transfer/local.py +++ b/preprocessor/preprocessor/transfer/local.py @@ -39,3 +39,10 @@ class Uploader(Base): shutil.copy2(local_path, remote_path) return remote_paths + + def product_exists(self, remote_dir: os.PathLike) -> bool: + remote_path = os.path.join(self.storage_path, remote_dir) + for r, d, f in os.walk(remote_path): + if len(f) >= 2: + return True + return False diff --git a/preprocessor/preprocessor/transfer/swift.py b/preprocessor/preprocessor/transfer/swift.py index 7ecd853c33fadf17047d2fd1540bce1c9d51225b..ad57dabcfbfb73b99ab9af169644d05fadea8b86 100644 --- a/preprocessor/preprocessor/transfer/swift.py +++ b/preprocessor/preprocessor/transfer/swift.py @@ -98,8 +98,8 @@ class Uploader(Base): options['use_slo'] = True with self.get_service() as swift: - # use container first part of path of container as upload container - container = self.container or paths[0].partition('/')[0] + # use container or first part of path + container = self.container or remote_dir.partition('/')[0] results = swift.upload(container=container, objects=objects, options=options) for result in results: @@ -120,3 +120,16 @@ class Uploader(Base): raise Exception('Failed to upload %s' % result["error"]) return remote_paths + + def product_exists(self, remote_dir: os.PathLike) -> bool: + with self.get_service() as swift: + # use container or first part of path + container = self.container or remote_dir.partition('/')[0] + list_parts_gen = swift.list( + container=container, options={"prefix": remote_dir}, + ) + for page in list_parts_gen: + if page["success"] and len(page["listing"]) >= 2: + # at least two files present -> pass validation + return True + return False