Commit a71cd059 authored by Fabian Schindler

First implementation of browse report preprocessing

parent bde0409e
from os.path import join, dirname
import logging.config
import json

import click
import yaml
import jsonschema

from .preprocess import preprocess_file, preprocess_browse
from .daemon import run_daemon
@@ -64,11 +65,21 @@ def daemon(config_file=None, use_dir=None, validate=False, host=None, port=None,
@click.option('--config-file', type=click.File('r'))
@click.option('--use-dir', type=str)  # TODO: check dir
@click.option('--validate/--no-validate', default=False)
@click.option('--browse-report/--no-browse-report', default=False)
def preprocess(file_path, config_file=None, use_dir=None, validate=False, browse_report=False):
    config = yaml.safe_load(config_file)
    if validate:
        validate_config(config)

    if browse_report:
        # the input file is a browse report: preprocess each listed browse
        with open(file_path) as f:
            browse_report_data = json.load(f)

        browse_type = browse_report_data['browse_type']
        for browse in browse_report_data['browses']:
            preprocess_browse(config, browse_type, browse, use_dir)
    else:
        preprocess_file(config, file_path, use_dir)
if __name__ == '__main__':
...
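For reference, the `--browse-report` mode reads only two keys from the JSON file: `browse_type` and the list of `browses`, each of which must carry at least a `filename`. A minimal sketch of such a report, written from Python (all concrete values are hypothetical placeholders, not taken from a real browse report schema):

import json

# minimal browse report consumed by the command above; only 'browse_type'
# and 'browses' (each entry with a 'filename') are actually read there
example_report = {
    'browse_type': 'SAR',
    'browses': [
        {'filename': 'browses/browse_1.tif'},
        {'filename': 'browses/browse_2.tif'},
    ],
}

with open('report.json', 'w') as f:
    json.dump(example_report, f)

Invoked with `--browse-report`, each entry in `browses` is then handed to `preprocess_browse` individually.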
import os
import os.path
import itertools
import importlib
import logging
import shutil
from typing import List
from pprint import pformat
from time import time
from urllib.parse import urlparse

from .transfer import get_downloader, get_uploader
from .archive import unpack_files
@@ -16,6 +15,8 @@
from .metadata import extract_product_type_and_level
from .steps import (
    georeference_step, extract_subdataset_step, calc_step, stack_bands_step, output_step
)
from .steps.browse_report import browse_georeference
from .util import workdir

logging.basicConfig()
@@ -90,21 +91,47 @@ class Timer:
# -----------------------------------------------------------------------------
def preprocess_internal(preprocess_config, previous_step='unpack'):
    force_refresh = False

    # make processing steps
    for step in ['custom_preprocessor', 'subdatasets', 'georeference', 'calc', 'stack_bands', 'output', 'custom_postprocessor']:
        step_config = preprocess_config.get(step)
        if not step_config:
            logger.debug('Skipping step %s as it is not configured.' % step)
            continue

        # run the step if it was not already run
        if not os.path.isdir(step) or force_refresh:
            if os.path.isdir(step):
                logger.info('Forcing refresh of existing directory %s' % step)
                shutil.rmtree(step)

            logger.info('Running preprocessing step %s' % step)
            os.mkdir(step)
            preprocessor = STEP_FUNCTIONS[step]

            with Timer() as step_timer:
                preprocessor(previous_step, step, **step_config)

            logger.info(
                'Finished preprocessing step %s after %f seconds.'
                % (step, step_timer.elapsed)
            )
            force_refresh = True
        else:
            logger.info('%s dir already exists, skipping step...' % step)

        previous_step = step

    if not os.path.isdir('upload'):
        os.mkdir('upload')

    # copy files from previous step directory to upload directory
    copy_files(previous_step, 'upload')
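To make the control flow concrete: the steps run in the fixed order of the list, each reading from the previous step's directory and writing into its own, and a truthy config mapping both enables a step and is splatted into the step function as keyword arguments. A sketch with hypothetical step arguments (the real keyword names are defined by the step function signatures in `.steps`):

# hypothetical configuration enabling two of the seven steps; the kwargs
# shown are placeholders, not the real signatures of the step functions
example_config = {
    'georeference': {'type': 'gcp'},
    'output': {'format': 'GTiff'},
}

# preprocess_internal(example_config, 'unpack') would then call, in order:
#   georeference_step('unpack', 'georeference', type='gcp')
#   output_step('georeference', 'output', format='GTiff')
# and finally copy the 'output' results into the 'upload' directory.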
def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=None):
@@ -181,45 +208,8 @@
        else:
            logger.info('Unpack dir already exists, skipping...')

        # actually perform the preprocessing from the downloaded file
        preprocess_internal(preprocess_config, 'unpack')
        # get an uploader for the finalized images
        target_config = config['target']
@@ -252,3 +242,66 @@
        )

        return upload_filenames
def preprocess_browse(config: dict, browse_type: str, browse: dict, use_dir: os.PathLike=None):
    with workdir(config, use_dir) as dirname, Timer() as preprocess_timer:
        filename = browse['filename']
        logger.info('Preprocessing browse "%s" in %s' % (filename, dirname))
        parsed = urlparse(filename)

        if not parsed.scheme:
            # check if we can reuse a previous download
            if not os.path.isdir('download'):
                os.mkdir('download')
                logger.info('Downloading %s...' % filename)
                # get the downloader for the configured source archive to download the given source file
                source_config = config['source']
                downloader = get_downloader(
                    source_config['type'], source_config.get('args'), source_config.get('kwargs')
                )

                with Timer() as download_timer:
                    source_filename_path = downloader.download(filename, 'download')

                logger.info(
                    'Downloaded file %s in %f seconds'
                    % (filename, download_timer.elapsed)
                )
            else:
                source_filename_path = os.path.join('download', os.path.basename(filename))
                logger.info('Download dir already exists, skipping...')
        elif parsed.scheme in ('http', 'https'):
            # TODO: check if allowed and download from there
            raise NotImplementedError

        os.mkdir('unpack')
        logger.info('Applying browse georeference to browse %s' % filename)
        browse_georeference('download', 'unpack', browse)

        # fetch the product type from the browse_type
        product_type = config.get('browse_type_mapping', {}).get(browse_type, browse_type)
        logger.info('Detected product type %s' % product_type)

        # get a concrete configuration for the type, filled with the defaults
        default_config = dict(config['preprocessing'].get('defaults', {}))
        preprocess_config = dict(config['preprocessing']['types'].get(product_type, {}))
        default_config.update(preprocess_config)
        logger.debug('Using preprocessing config %s' % pformat(default_config))

        # pass the merged config so that the defaults are not lost
        preprocess_internal(default_config)

        logger.info(
            'Finished preprocessing of browse "%s" after %f seconds.'
            % (filename, preprocess_timer.elapsed)
        )
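A direct-call sketch, assuming a loaded config as in the CLI above; the config file name and browse entry are hypothetical, and any browse fields beyond `filename` are whatever `browse_georeference` expects and are not shown here:

import yaml

with open('config.yaml') as f:                   # hypothetical config file
    config = yaml.safe_load(f)

browse = {'filename': 'browses/browse_1.tif'}    # hypothetical report entry
# 'SAR' is resolved through config['browse_type_mapping'] (falling back to
# the browse type itself) to select the per-product-type preprocessing config
preprocess_browse(config, 'SAR', browse)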
import os
from os.path import splitext
from contextlib import contextmanager
from tempfile import TemporaryDirectory, mkdtemp


def replace_ext(filename: os.PathLike, new_ext: str, force_dot: bool=True) -> os.PathLike:
    return splitext(filename)[0] + ('' if new_ext.startswith('.') or not force_dot else '.') + new_ext
@contextmanager
def workdir(config: dict, use_dir: os.PathLike=None):
    prefix = config.get('prefix', 'preprocess_')
    workdir = config.get('workdir')
    if use_dir:
        os.chdir(use_dir)
        yield use_dir
    elif config.get('keep_temp'):
        dirname = mkdtemp(prefix=prefix, dir=workdir)
        os.chdir(dirname)
        yield dirname
    else:
        with TemporaryDirectory(prefix=prefix, dir=workdir) as dirname:
            os.chdir(dirname)
            yield dirname
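A usage sketch for the context manager; note that it changes the process working directory (and does not restore it on exit), which is what lets the preprocessing steps use bare relative directory names like 'download' and 'unpack':

# hypothetical config: create a temporary directory under /tmp and
# remove it again on exit (keep_temp would retain it for debugging)
config = {'workdir': '/tmp', 'prefix': 'preprocess_', 'keep_temp': False}

with workdir(config) as dirname:
    os.mkdir('download')   # resolves inside the temporary directory
# the directory is deleted here unless keep_temp or use_dir was given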
def pairwise(col):
    iterator = iter(col)
    while True:
        try:
            yield (next(iterator), next(iterator))
        except StopIteration:
            break
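`pairwise` consumes its input two items at a time; a trailing odd element is silently dropped:

assert list(pairwise([1, 2, 3, 4])) == [(1, 2), (3, 4)]
assert list(pairwise('abcde')) == [('a', 'b'), ('c', 'd')]  # 'e' is dropped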