From cd17f4117c3b16668d66532d72e86351a4ebb6ce Mon Sep 17 00:00:00 2001
From: Fabian Schindler <fabian.schindler.strauss@gmail.com>
Date: Tue, 25 Aug 2020 11:00:06 +0200
Subject: [PATCH] Adding timer class, cleanup

---
 preprocessor/preprocessor/preprocess.py | 126 +++++++++++++++++-------
 1 file changed, 89 insertions(+), 37 deletions(-)

diff --git a/preprocessor/preprocessor/preprocess.py b/preprocessor/preprocessor/preprocess.py
index 29ecdb56..2c167570 100644
--- a/preprocessor/preprocessor/preprocess.py
+++ b/preprocessor/preprocessor/preprocess.py
@@ -8,6 +8,7 @@ import shutil
 from typing import List
 from contextlib import contextmanager
 from pprint import pformat
+from time import time
 
 from .transfer import get_downloader, get_uploader
 from .archive import unpack_files
@@ -45,15 +46,6 @@ def custom_preprocessor(source_dir, target_dir, path, args=None, kwargs=None):
     func(source_dir, target_dir, *(args or []), **(kwargs or {}))
 
 
-def georeference(source_dir: os.PathLike, target_dir: os.PathLike, type: str, **options: dict):
-    """ Apply georeferencing to the files in source_dir to create georeferenced
-        image files in target_dir.
-        Georeferencing type specifies what geoereferencing shall be applied. Options
-        are 'GCP', 'RPC', 'world', ...
-    """
-    # TODO: implement
-    pass
-
 def custom_postprocessor(source_dir, target_dir, path, args=None, kwargs=None):
     """ Preprocessing step for a custom preprocessing.
     """
@@ -77,6 +69,25 @@ def flatten(l):
     return [item for sublist in l for item in sublist]
 
 
+class Timer:
+    """ Helper timer class to allow logging of timing values
+    """
+    def __init__(self):
+        self.start = None
+        self.end = None
+
+    def __enter__(self):
+        self.start = time()
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.end = time()
+
+    @property
+    def elapsed(self):
+        return (self.end if self.end is not None else time()) - self.start
+
+
 # -----------------------------------------------------------------------------
 
 @contextmanager
@@ -86,8 +97,7 @@ def workdir(config, use_dir: os.PathLike=None):
     if use_dir:
         os.chdir(use_dir)
         yield use_dir
-
-    if config.get('keep_temp'):
+    elif config.get('keep_temp'):
         dirname = mkdtemp(prefix=prefix, dir=workdir)
         os.chdir(dirname)
         yield dirname
@@ -100,7 +110,7 @@
 def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=None):
     """ Runs the preprocessing of a single file.
""" - with workdir(config, use_dir) as dirname: + with workdir(config, use_dir) as dirname, Timer() as preprocess_timer: logger.info('Preprocessing %s in %s' % (file_path, dirname)) # check if we can reuse a previous download @@ -112,13 +122,21 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N downloader = get_downloader( source_config['type'], source_config.get('args'), source_config.get('kwargs') ) - source_archive_path = downloader.download(file_path, 'download') + + with Timer() as download_timer: + source_archive_path = downloader.download(file_path, 'download') + + logger.info( + 'Downloaded file %s in %f seconds' + % (file_path, download_timer.elapsed) + ) + else: source_archive_path = os.path.join('download', os.path.basename(file_path)) logger.info('Download dir already exists, skipping...') # fetch the metadata XML file from the downloaded archive - metadata_files = unpack_files(source_archive_path, 'unpack', glob=config['metadata_glob']) + metadata_files = unpack_files(source_archive_path, 'extra', glob=config['metadata_glob']) # open the XML to retrieve the product type and level product_type, product_level = extract_product_type_and_level(metadata_files, config) @@ -135,30 +153,37 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N os.mkdir('unpack') logger.info('Unpacking original files...') # select and unpack files according to configuration - data_files = flatten([ - unpack_files( - source_archive_path, - 'unpack', - glob=glob, - recursive=preprocess_config.get('nested', False), - ) - for glob in preprocess_config['data_file_globs'] - ]) - metadata_files = flatten([ - unpack_files( - source_archive_path, - 'unpack', - glob=glob, - recursive=preprocess_config.get('nested', False), - ) - for glob in preprocess_config['additional_file_globs'] - ]) - logger.info('Unpacked files: %s' % ', '.join(metadata_files, data_files)) + with Timer() as unpack_timer: + data_files = flatten([ + unpack_files( + source_archive_path, + 'unpack', + glob=glob, + recursive=preprocess_config.get('nested', False), + ) + for glob in preprocess_config['data_file_globs'] + ]) + metadata_files = flatten([ + unpack_files( + source_archive_path, + 'unpack', + glob=glob, + recursive=preprocess_config.get('nested', False), + ) + for glob in preprocess_config.get('additional_file_globs', []) + ]) + + logger.info( + 'Unpacked files: %s in %f seconds' + % (', '.join(metadata_files + data_files), unpack_timer.elapsed) + ) else: logger.info('Unpack dir already exists, skipping...') previous_step = 'unpack' + + force_refresh = False # make processing steps for step in ['custom_preprocessor', 'subdatasets', 'georeference', 'calc', 'stack_bands', 'output', 'custom_postprocessor']: step_config = preprocess_config.get(step) @@ -167,14 +192,26 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N continue # run the step if it was not already run - if not os.path.isdir(step): + if not os.path.isdir(step) or force_refresh: + if os.path.isdir(step): + logger.info('Forcing refresh of existing directory %s' % step) + shutil.rmtree(step) + logger.info('Running preprocessing step %s' % step) os.mkdir(step) preprpocessor = STEP_FUNCTIONS[step] - preprpocessor(previous_step, step, **step_config) + + with Timer() as step_timer: + preprpocessor(previous_step, step, **step_config) + + logger.info( + 'Finished preprocessing step %s after %f seconds.' 
+                    % (step, step_timer.elapsed)
+                )
             else:
-                logger.info('%s dir already exists, skipping...' % step)
+                logger.info('%s dir already exists, skipping step...' % step)
+                force_refresh = True
 
             previous_step = step
 
@@ -197,4 +234,19 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N
         ]
 
         # send all files in the upload directory to the target storage
-        uploader.upload(upload_filenames, file_path)
+        logger.info(
+            'Starting uploading of %d files to %s'
+            % (len(upload_filenames), file_path)
+        )
+        with Timer() as upload_timer:
+            uploader.upload(upload_filenames, file_path)
+
+        logger.info(
+            'Finished uploading after %f seconds.'
+            % (upload_timer.elapsed)
+        )
+
+        logger.info(
+            'Finished preprocessing of %s after %f seconds.'
+            % (file_path, preprocess_timer.elapsed)
+        )
-- 
GitLab
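
Reviewer note (not part of the patch): the Timer added above is a plain context
manager, and because elapsed falls back to time() while end is still None, it
can be read both inside and after the timed block. A minimal standalone sketch
of the pattern the patch wraps around the download/unpack/step/upload calls;
the do_work function is a hypothetical stand-in, not code from the repository:

    import logging
    from time import time, sleep

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    class Timer:
        """ Helper timer class to allow logging of timing values
            (verbatim from the patch)
        """
        def __init__(self):
            self.start = None
            self.end = None

        def __enter__(self):
            self.start = time()
            return self

        def __exit__(self, *args, **kwargs):
            self.end = time()

        @property
        def elapsed(self):
            # before __exit__ has run, report the time elapsed so far
            return (self.end if self.end is not None else time()) - self.start

    def do_work():
        # hypothetical stand-in for downloader.download(), unpack_files(), etc.
        sleep(0.5)

    with Timer() as work_timer:
        do_work()

    logger.info('Finished work after %f seconds.' % work_timer.elapsed)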