EOX GitLab Instance

Commit cd17f411 authored by Fabian Schindler

Adding timer class, cleanup

parent 3303a85f
@@ -8,6 +8,7 @@ import shutil
 from typing import List
 from contextlib import contextmanager
 from pprint import pformat
+from time import time

 from .transfer import get_downloader, get_uploader
 from .archive import unpack_files
@@ -45,15 +46,6 @@ def custom_preprocessor(source_dir, target_dir, path, args=None, kwargs=None):
     func(source_dir, target_dir, *(args or []), **(kwargs or {}))

-def georeference(source_dir: os.PathLike, target_dir: os.PathLike, type: str, **options: dict):
-    """ Apply georeferencing to the files in source_dir to create georeferenced
-        image files in target_dir.
-        Georeferencing type specifies what georeferencing shall be applied.
-        Options are 'GCP', 'RPC', 'world', ...
-    """
-    # TODO: implement
-    pass
-
 def custom_postprocessor(source_dir, target_dir, path, args=None, kwargs=None):
     """ Preprocessing step for a custom preprocessing.
     """
@@ -77,6 +69,25 @@ def flatten(l):
     return [item for sublist in l for item in sublist]

+class Timer:
+    """ Helper timer class to allow logging of timing values
+    """
+    def __init__(self):
+        self.start = None
+        self.end = None
+
+    def __enter__(self):
+        self.start = time()
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.end = time()
+
+    @property
+    def elapsed(self):
+        return (self.end if self.end is not None else time()) - self.start
+
 # -----------------------------------------------------------------------------

 @contextmanager
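Taken on its own, the new Timer is a thin context manager around time.time(). A minimal standalone sketch (restating the class from the hunk above, with sleep standing in for real work) shows that elapsed can be read both while the block is still running and after it has finished:

    from time import time, sleep

    class Timer:
        """ Helper timer class to allow logging of timing values """
        def __init__(self):
            self.start = None
            self.end = None

        def __enter__(self):
            self.start = time()
            return self

        def __exit__(self, *args, **kwargs):
            self.end = time()

        @property
        def elapsed(self):
            # before __exit__ has run, measure against the current time;
            # afterwards, use the recorded end timestamp
            return (self.end if self.end is not None else time()) - self.start

    with Timer() as timer:
        sleep(0.25)                                         # stand-in for real work
        print('running for %f seconds' % timer.elapsed)     # intermediate reading
    print('finished after %f seconds' % timer.elapsed)      # final reading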
@@ -86,8 +97,7 @@ def workdir(config, use_dir: os.PathLike=None):
     if use_dir:
         os.chdir(use_dir)
         yield use_dir
-
-    if config.get('keep_temp'):
+    elif config.get('keep_temp'):
         dirname = mkdtemp(prefix=prefix, dir=workdir)
         os.chdir(dirname)
         yield dirname
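This cleanup folds the keep_temp check into the same if/elif chain as use_dir, so an explicitly passed directory always takes precedence. A hedged sketch of the resulting control flow; the PREFIX and WORKDIR values and the final cleanup branch are not shown in this hunk and are assumptions:

    import os
    import shutil
    from contextlib import contextmanager
    from tempfile import mkdtemp

    PREFIX = 'preprocess_'   # assumed; defined outside the hunk shown
    WORKDIR = '/tmp'         # assumed parent directory for temp dirs

    @contextmanager
    def workdir(config, use_dir: os.PathLike = None):
        if use_dir:
            # caller-provided directory: reuse it and never delete it
            os.chdir(use_dir)
            yield use_dir
        elif config.get('keep_temp'):
            # temporary directory that is intentionally left behind
            dirname = mkdtemp(prefix=PREFIX, dir=WORKDIR)
            os.chdir(dirname)
            yield dirname
        else:
            # assumed default branch: temporary directory, removed afterwards
            dirname = mkdtemp(prefix=PREFIX, dir=WORKDIR)
            os.chdir(dirname)
            try:
                yield dirname
            finally:
                os.chdir(WORKDIR)   # leave the directory before deleting it
                shutil.rmtree(dirname)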
@@ -100,7 +110,7 @@ def workdir(config, use_dir: os.PathLike=None):
 def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=None):
     """ Runs the preprocessing of a single file.
     """
-    with workdir(config, use_dir) as dirname:
+    with workdir(config, use_dir) as dirname, Timer() as preprocess_timer:
         logger.info('Preprocessing %s in %s' % (file_path, dirname))

         # check if we can reuse a previous download
@@ -112,13 +122,21 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=None):
             downloader = get_downloader(
                 source_config['type'], source_config.get('args'), source_config.get('kwargs')
             )
-            source_archive_path = downloader.download(file_path, 'download')
+            with Timer() as download_timer:
+                source_archive_path = downloader.download(file_path, 'download')
+
+            logger.info(
+                'Downloaded file %s in %f seconds'
+                % (file_path, download_timer.elapsed)
+            )
         else:
             source_archive_path = os.path.join('download', os.path.basename(file_path))
             logger.info('Download dir already exists, skipping...')

         # fetch the metadata XML file from the downloaded archive
-        metadata_files = unpack_files(source_archive_path, 'unpack', glob=config['metadata_glob'])
+        metadata_files = unpack_files(source_archive_path, 'extra', glob=config['metadata_glob'])

         # open the XML to retrieve the product type and level
         product_type, product_level = extract_product_type_and_level(metadata_files, config)
@@ -135,30 +153,37 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=None):
             os.mkdir('unpack')
             logger.info('Unpacking original files...')

             # select and unpack files according to configuration
-            data_files = flatten([
-                unpack_files(
-                    source_archive_path,
-                    'unpack',
-                    glob=glob,
-                    recursive=preprocess_config.get('nested', False),
-                )
-                for glob in preprocess_config['data_file_globs']
-            ])
-            metadata_files = flatten([
-                unpack_files(
-                    source_archive_path,
-                    'unpack',
-                    glob=glob,
-                    recursive=preprocess_config.get('nested', False),
-                )
-                for glob in preprocess_config['additional_file_globs']
-            ])
-
-            logger.info('Unpacked files: %s' % ', '.join(metadata_files, data_files))
+            with Timer() as unpack_timer:
+                data_files = flatten([
+                    unpack_files(
+                        source_archive_path,
+                        'unpack',
+                        glob=glob,
+                        recursive=preprocess_config.get('nested', False),
+                    )
+                    for glob in preprocess_config['data_file_globs']
+                ])
+                metadata_files = flatten([
+                    unpack_files(
+                        source_archive_path,
+                        'unpack',
+                        glob=glob,
+                        recursive=preprocess_config.get('nested', False),
+                    )
+                    for glob in preprocess_config.get('additional_file_globs', [])
+                ])
+
+            logger.info(
+                'Unpacked files: %s in %f seconds'
+                % (', '.join(metadata_files + data_files), unpack_timer.elapsed)
+            )
         else:
             logger.info('Unpack dir already exists, skipping...')

         previous_step = 'unpack'
+        force_refresh = False

         # make processing steps
         for step in ['custom_preprocessor', 'subdatasets', 'georeference', 'calc', 'stack_bands', 'output', 'custom_postprocessor']:
             step_config = preprocess_config.get(step)
@@ -167,14 +192,26 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=None):
                 continue

             # run the step if it was not already run
-            if not os.path.isdir(step):
+            if not os.path.isdir(step) or force_refresh:
+                if os.path.isdir(step):
+                    logger.info('Forcing refresh of existing directory %s' % step)
+                    shutil.rmtree(step)
+
                 logger.info('Running preprocessing step %s' % step)
                 os.mkdir(step)

                 preprpocessor = STEP_FUNCTIONS[step]
-                preprpocessor(previous_step, step, **step_config)
+                with Timer() as step_timer:
+                    preprpocessor(previous_step, step, **step_config)
+
+                logger.info(
+                    'Finished preprocessing step %s after %f seconds.'
+                    % (step, step_timer.elapsed)
+                )
+                force_refresh = True
             else:
-                logger.info('%s dir already exists, skipping...' % step)
+                logger.info('%s dir already exists, skipping step...' % step)

             previous_step = step
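The new force_refresh flag gives the step loop cascade semantics: once one step is (re)executed, every later step directory is discarded and re-run as well, so no stage consumes stale predecessor output. A toy model of just that loop; the steps list and the run_step callable are placeholders:

    import os
    import shutil

    def run_pipeline(steps, run_step, previous_step='unpack'):
        """ Toy model of the cascading-refresh loop above. """
        force_refresh = False
        for step in steps:
            if not os.path.isdir(step) or force_refresh:
                if os.path.isdir(step):
                    shutil.rmtree(step)      # drop now-stale results
                os.mkdir(step)
                run_step(previous_step, step)
                force_refresh = True         # everything downstream is stale too
            previous_step = step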
@@ -197,4 +234,19 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=None):
         ]

         # send all files in the upload directory to the target storage
-        uploader.upload(upload_filenames, file_path)
+        logger.info(
+            'Starting uploading of %d files to %s'
+            % (len(upload_filenames), file_path)
+        )
+        with Timer() as upload_timer:
+            uploader.upload(upload_filenames, file_path)
+
+        logger.info(
+            'Finished uploading after %f seconds.'
+            % (upload_timer.elapsed)
+        )
+
+    logger.info(
+        'Finished preprocessing of %s after %f seconds.'
+        % (file_path, preprocess_timer.elapsed)
+    )
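To actually see these timing messages, the process needs a logging configuration. The module's own logger setup lies outside the hunks shown here, so the basicConfig call below is only a minimal assumption for trying the code out:

    import logging

    # route INFO-level messages (all timing logs above use logger.info) to stderr
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(name)s: %(message)s',
    )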