Commit f6197ceb authored by Fabian Schindler

Adding config validation

parent 2347c1c5
@@ -40,7 +40,7 @@ USER root
 RUN apt update && \
     apt install -y \
-        python3-redis python3-keystoneclient python3-swiftclient python3-click python3-setuptools wait-for-it && \
+        python3-redis python3-keystoneclient python3-swiftclient python3-click python3-setuptools python3-jsonschema wait-for-it && \
     apt autoremove -y && \
     apt clean && \
     rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
......
+from os.path import join, dirname
+import logging.config
 import click
 import yaml
-import logging.config
+import jsonschema

 from .preprocess import preprocess_file
@@ -28,6 +31,13 @@ def setup_logging(debug=False):
     })


+def validate_config(config):
+    with open(join(dirname(__file__), 'config-schema.yaml')) as f:
+        schema = yaml.load(f)
+    jsonschema.validate(config, schema)


 @click.group()
 def cli():
     setup_logging(True)
@@ -42,8 +52,11 @@ def daemon(ctx):
 @click.argument('file_path', type=str)
 @click.option('--config-file', type=click.File('r'))
 @click.option('--use-dir', type=str) # TODO: check dir
-def preprocess(file_path, config_file=None, use_dir=None):
+@click.option('--validate/--no-validate', default=False)
+def preprocess(file_path, config_file=None, use_dir=None, validate=False):
     config = yaml.load(config_file)
+    if validate:
+        validate_config(config)
     preprocess_file(config, file_path, use_dir)
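
For reference, this is roughly what the new --validate flag triggers at runtime, written as a standalone sketch (the YAML file names below are placeholders, not paths from the repository):

    import yaml
    import jsonschema

    # Load the bundled schema and a user-provided configuration (placeholder file names).
    with open('config-schema.yaml') as f:
        schema = yaml.safe_load(f)
    with open('preprocessing-config.yaml') as f:
        config = yaml.safe_load(f)

    try:
        # Same call as validate_config(); draft-07 semantics per the schema's $schema key.
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as exc:
        print('Invalid config at %s: %s' % (list(exc.absolute_path), exc.message))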
......
$id: https://example.com/address.schema.json
$schema: http://json-schema.org/draft-07/schema#
type: object
properties:
  source:
    description: File source description. Either a local file system or an object storage.
    type: object
    properties:
      type:
        description: The type of the file source.
        type: string
        enum: [local, swift]
      kwargs:
        description: Extra arguments. Usage depends on the actual implementation.
        type: object
    # required: ['type']
  target:
    description: File target description. Either a local file system or an object storage.
    type: object
    properties:
      type:
        description: The type of the file target.
        type: string
        enum: [local, swift]
      kwargs:
        description: Extra arguments. Usage depends on the actual implementation.
        type: object
    # required: [type]
  workdir:
    description: The local directory where intermediary files are to be stored.
    type: string
  keep_temp:
    description: Whether to keep temporary files for each step. DEPRECATED.
    type: boolean
  metadata_glob:
    description: A file glob to select metadata files from the downloaded archive.
    type: string
  type_extractor:
    description: How the product type is to be extracted from the metadata file.
    type: object
    properties:
      xpath:
        description: Either a single XPath or multiple XPaths to the product type in the metadata file. Each is tried consecutively until the type can be extracted.
        oneOf:
          - type: string
          - type: array
            items:
              type: string
      map:
        description: A simple mapping of the extracted type value to an identifier for later usage. This is useful when a preprocessing chain can be re-used for multiple product types.
        type: object
    required: [xpath]
  level_extractor:
    description: How the product level is extracted. Currently unused.
    type: object
    # TODO
  preprocessing:
    description: The actual preprocessing definition.
    type: object
    properties:
      defaults:
        description: The default step settings to be applied.
        $ref: '#/$defs/steps'
      types:
        description: Product type specific step config.
        type: object
        additionalProperties:
          description: A mapping of product type -> steps configuration
          $ref: '#/$defs/steps'
    required: [types]
required:
  - source
  - target
  - workdir
  - keep_temp
  - metadata_glob
  - type_extractor
  - level_extractor
  - preprocessing
$defs:
  steps:
    custom_preprocessor:
      description: Definition of a custom preprocessor step
      type: object
      properties:
        path:
          description: "The Python dotted path to the function to invoke. e.g: 'path.to.module.function'"
          type: string
        args:
          description: The list of arguments to pass to that function
          type: array
        kwargs:
          description: The map of keyword arguments to pass to that function.
          type: object
    subdatasets:
      description: The definition of the subdataset extraction step.
      type: object
      properties:
        data_file_glob:
          description: The data file selector.
          type: string
        subdataset_types:
          description: List of subdatasets to be extracted for each data file.
          type: array
          items:
            type: string
    georeference:
      description: The definition of a georeferencing step.
      type: object
      properties:
        type:
          description: The type of georeferencing to apply.
          type: string
          enum: [gcp, rpc, world] # TODO: more
        options:
          description: Additional options for the georeferencing. Depends on the type of georeferencing.
          type: object
          properties:
            order:
              description: The polynomial order to use for GCP reprojection.
              type: number
            projection:
              description: The projection to use for ungeoreferenced images.
              type: string
            rpc_file_template:
              description: The file glob template to use to find the RPC file. Template parameters are {filename}, {fileroot}, and {extension}.
              type: string
            warp_options:
              description: "Warp options. See https://gdal.org/python/osgeo.gdal-module.html#WarpOptions for details"
            corner_names:
              description:
      required: [type]
    stack_bands:
      description: Definition of the band stacking step.
      type: object
      properties:
        group_by:
          description: A regex to group the input datasets, if consisting of multiple files. The first regex group is used for the grouping.
          type: string
        sort_by:
          description: A regex to select a portion of the filename to be used for sorting. The first regex group is used.
          type: string
        order:
          description: The order of the extracted item used in 'sort_by'.
          type: array
          items:
            type: string
    output:
      description: Definition of the output step.
      type: object
      properties:
        options:
          description: "Options to be passed to `gdal.Warp`. See https://gdal.org/python/osgeo.gdal-module.html#WarpOptions for details"
          type: object
    custom_postprocessor:
      description: Definition of a custom postprocessor step
      type: object
      properties:
        path:
          description: "The Python dotted path to the function to invoke. e.g: 'path.to.module.function'"
          type: string
        args:
          description: The list of arguments to pass to that function
          type: array
        kwargs:
          description: The map of keyword arguments to pass to that function.
          type: object
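
To make the required structure concrete, below is an illustrative configuration that satisfies the schema above, expressed as a Python dict and checked the same way validate_config does. All values are placeholders, not taken from a real deployment:

    import yaml
    import jsonschema

    example_config = {
        'source': {'type': 'swift', 'kwargs': {'container': 'incoming'}},   # placeholder kwargs
        'target': {'type': 'local', 'kwargs': {'root_dir': '/data/out'}},   # placeholder kwargs
        'workdir': '/tmp/preprocess',
        'keep_temp': False,
        'metadata_glob': '*GSC*.xml',                                       # placeholder glob
        'type_extractor': {
            'xpath': ['//metadata/productType/text()'],                     # placeholder XPath list
            'map': {'OPT_IMG': 'optical'},                                  # placeholder mapping
        },
        'level_extractor': {},
        'preprocessing': {
            'defaults': {},
            'types': {
                'optical': {                                                # placeholder product type
                    'georeference': {'type': 'gcp', 'options': {'order': 1}},
                    'output': {'options': {}},
                },
            },
        },
    }

    with open('config-schema.yaml') as f:
        schema = yaml.safe_load(f)

    # Raises jsonschema.ValidationError if a required key is missing or an enum value is wrong.
    jsonschema.validate(example_config, schema)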
......
@@ -7,14 +7,14 @@ import logging
 import shutil
 from typing import List
 from contextlib import contextmanager
+from pprint import pformat

 from .transfer import get_downloader, get_uploader
 from .archive import unpack_files
 from .metadata import extract_product_type_and_level
-from .georeference import georeference_step
-from .subdataset import extract_subdataset_step
-from .stack import stack_bands_step
+from .steps import (
+    georeference_step, extract_subdataset_step, stack_bands_step, output_step
+)

 logging.basicConfig()
@@ -71,18 +71,6 @@ STEP_FUNCTIONS = {
     'custom_postprocessor': custom_postprocessor,
 }

-# def simple_copy(source_dir, target_dir, *args, **kwargs):
-#     copy_files(source_dir, target_dir)
-
-# STEP_FUNCTIONS = {
-#     'custom_preprocessor': simple_copy,
-#     'georeference': simple_copy,
-#     'stack_bands': simple_copy,
-#     'output': simple_copy,
-#     'custom_postprocessor': simple_copy,
-# }

 def flatten(l):
     return [item for sublist in l for item in sublist]
@@ -117,6 +105,7 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N
     # check if we can reuse a previous download
     if not os.path.isdir('download'):
         os.mkdir('download')
+        logger.info('Downloading %s from %s...' % (file_path, dirname))
         # get the Downloader for the configured source archive to download the given source file
         source_config = config['source']
         downloader = get_downloader(
@@ -125,24 +114,27 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N
         source_archive_path = downloader.download(file_path, 'download')
     else:
         source_archive_path = os.path.join('download', os.path.basename(file_path))
-        print('Download dir already exists, skipping...')
+        logger.info('Download dir already exists, skipping...')

     # fetch the metadata XML file from the downloaded archive
     metadata_files = unpack_files(source_archive_path, 'unpack', glob=config['metadata_glob'])

     # open the XML to retrieve the product type and level
     product_type, product_level = extract_product_type_and_level(metadata_files, config)
-    print('Detected %s/%s' % (product_type, product_level))
+    logger.info('Detected %s/%s' % (product_type, product_level))

     # get a concrete configuration for the type, filled with the defaults
     default_config = dict(config['preprocessing'].get('defaults', {}))
     preprocess_config = dict(config['preprocessing']['types'].get(product_type, {}))
-    default_config.update(preprocess_config)
+    preprocess_config.update(default_config)
+    logger.debug('Using preprocessing config %s' % pformat(preprocess_config))

     if not os.path.isdir('unpack'):
         os.mkdir('unpack')
+        logger.info('Unpacking original files...')
         # select and unpack files according to configuration
-        flatten([
+        data_files = flatten([
             unpack_files(
                 source_archive_path,
                 'unpack',
@@ -151,7 +143,7 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N
             )
             for glob in preprocess_config['data_file_globs']
         ])
-        flatten([
+        metadata_files = flatten([
             unpack_files(
                 source_archive_path,
                 'unpack',
@@ -160,30 +152,36 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N
             )
             for glob in preprocess_config['additional_file_globs']
         ])
+        logger.info('Unpacked files: %s' % ', '.join(metadata_files + data_files))
     else:
-        print('Unpack dir already exists, skipping...')
+        logger.info('Unpack dir already exists, skipping...')

     previous_step = 'unpack'
     # make processing steps
-    for step in ['custom_preprocessor', 'georeference', 'stack_bands', 'output', 'custom_postprocessor']:
+    for step in ['custom_preprocessor', 'subdatasets', 'georeference', 'stack_bands', 'output', 'custom_postprocessor']:
         step_config = preprocess_config.get(step)
         if not step_config:
+            logger.debug('Skipping step %s as it is not configured.' % step)
             continue

         # run the step if it was not already run
         if not os.path.isdir(step):
+            logger.info('Running preprocessing step %s' % step)
             os.mkdir(step)
             preprocessor = STEP_FUNCTIONS[step]
             preprocessor(previous_step, step, **step_config)
             previous_step = step
         else:
-            print('%s dir already exists, skipping...' % step)
+            logger.info('%s dir already exists, skipping...' % step)
             previous_step = step

-    os.mkdir('upload')
+    if not os.path.isdir('upload'):
+        os.mkdir('upload')

-    # copy files from previous step directory to upload directory
-    copy_files(previous_step, 'upload')
+        # copy files from previous step directory to upload directory
+        copy_files(previous_step, 'upload')

     # get an uploader for the finalized images
     target_config = config['target']
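
The step loop above relies on a simple convention: each step reads from the previous step's output directory and writes into a directory named after itself, so an existing directory means the step already ran and is skipped on a re-run. A stripped-down sketch of that pattern, with a copy-only stand-in for the real step functions (all names here are illustrative, not the project's real implementations):

    import os
    import shutil

    def copy_only_step(source_dir, target_dir, **options):
        # Stand-in for a real step such as georeference_step or output_step.
        # Python 3.8+ for dirs_exist_ok.
        shutil.copytree(source_dir, target_dir, dirs_exist_ok=True)

    STEP_FUNCTIONS = {'georeference': copy_only_step, 'output': copy_only_step}

    previous_step = 'unpack'
    for step in ['georeference', 'output']:
        if not os.path.isdir(step):
            os.mkdir(step)
            STEP_FUNCTIONS[step](previous_step, step)
        previous_step = step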
......
from .georeference import georeference_step
from .output import output_step
from .stack import stack_bands_step
from .subdataset import extract_subdataset_step

__all__ = [
    'georeference_step',
    'output_step',
    'stack_bands_step',
    'extract_subdataset_step',
]
@@ -5,7 +5,7 @@ from uuid import uuid4
 from osgeo import gdal

-from .util import replace_ext
+from ..util import replace_ext


 def output_step(source_dir: os.PathLike, target_dir: os.PathLike, options: dict=None):