From f6197ceb31d44bf11096568d6cfb93206fad10fb Mon Sep 17 00:00:00 2001
From: Fabian Schindler <fabian.schindler.strauss@gmail.com>
Date: Wed, 3 Jun 2020 16:25:26 +0200
Subject: [PATCH] Adding config validation

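Add a JSON Schema for the preprocessor configuration
(preprocessor/config-schema.yaml) and an opt-in `--validate/--no-validate`
flag (default: off) on the `preprocess` command that checks the loaded
configuration against the schema with jsonschema before preprocessing
starts. A failing configuration raises jsonschema.exceptions.ValidationError.

Minimal programmatic sketch (assumes the package is importable as
`preprocessor`; 'config.yaml' is a placeholder path):

    import yaml

    from preprocessor.cli import validate_config

    with open('config.yaml') as f:
        config = yaml.safe_load(f)

    # raises jsonschema.exceptions.ValidationError if the config is invalid
    validate_config(config)

Also re-export the step functions from the `steps` package and replace the
remaining print() calls in preprocess.py with logger calls.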
---
 preprocessor/Dockerfile                      |   2 +-
 preprocessor/preprocessor/cli.py             |  17 +-
 preprocessor/preprocessor/config-schema.yaml | 167 +++++++++++++++++++
 preprocessor/preprocessor/preprocess.py      |  54 +++---
 preprocessor/preprocessor/steps/__init__.py  |  12 ++
 preprocessor/preprocessor/steps/output.py    |   2 +-
 6 files changed, 222 insertions(+), 32 deletions(-)
 create mode 100644 preprocessor/preprocessor/config-schema.yaml

diff --git a/preprocessor/Dockerfile b/preprocessor/Dockerfile
index 7ba2fe18..4ac2310a 100644
--- a/preprocessor/Dockerfile
+++ b/preprocessor/Dockerfile
@@ -40,7 +40,7 @@ USER root
 
 RUN apt update && \
     apt install -y \
-        python3-redis python3-keystoneclient python3-swiftclient python3-click python3-setuptools wait-for-it && \
+        python3-redis python3-keystoneclient python3-swiftclient python3-click python3-setuptools python3-jsonschema wait-for-it && \
     apt autoremove -y && \
     apt clean && \
     rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
diff --git a/preprocessor/preprocessor/cli.py b/preprocessor/preprocessor/cli.py
index baf9933a..db8896fd 100644
--- a/preprocessor/preprocessor/cli.py
+++ b/preprocessor/preprocessor/cli.py
@@ -1,6 +1,9 @@
+from os.path import join, dirname
+import logging.config
+
 import click
 import yaml
-import logging.config
+import jsonschema
 
 from .preprocess import preprocess_file
 
@@ -28,6 +31,13 @@ def setup_logging(debug=False):
     })
 
 
+def validate_config(config):
+    with open(join(dirname(__file__), 'config-schema.yaml')) as f:
+        schema = yaml.safe_load(f)
+
+    jsonschema.validate(config, schema)
+
+
 @click.group()
 def cli():
     setup_logging(True)
@@ -42,8 +52,11 @@ def daemon(ctx):
 @click.argument('file_path', type=str)
 @click.option('--config-file', type=click.File('r'))
 @click.option('--use-dir', type=str) # TODO: check dir
-def preprocess(file_path, config_file=None, use_dir=None):
+@click.option('--validate/--no-validate', default=False)
+def preprocess(file_path, config_file=None, use_dir=None, validate=False):
     config = yaml.load(config_file)
+    if validate:
+        validate_config(config)
     preprocess_file(config, file_path, use_dir)
 
 
diff --git a/preprocessor/preprocessor/config-schema.yaml b/preprocessor/preprocessor/config-schema.yaml
new file mode 100644
index 00000000..8c26ab81
--- /dev/null
+++ b/preprocessor/preprocessor/config-schema.yaml
@@ -0,0 +1,167 @@
+$id: https://example.com/preprocessor-config.schema.json
+$schema: http://json-schema.org/draft-07/schema#
+type: object
+properties:
+  source:
+    description: File source description. Either a local file system or an object store.
+    type: object
+    properties:
+      type:
+        description: The type of the file source.
+        type: string
+        enum: [local, swift]
+      kwargs:
+        description: Extra arguments; their use depends on the actual implementation.
+        type: object
+      # required: ['type']
+  target:
+    description: File target description. Either a local file system or an object store.
+    type: object
+    properties:
+      type:
+        description: The type of the file target.
+        type: string
+        enum: [local, swift]
+      kwargs:
+        description: Extra arguments; their use depends on the actual implementation.
+        type: object
+      # required: [type]
+  workdir:
+    description: The local directory where intermediary files are stored.
+    type: string
+  keep_temp:
+    description: Whether to keep temporary files for each step. DEPRECATED.
+    type: boolean
+  metadata_glob:
+    description: A file glob to select metadata files from the downloaded archive.
+    type: string
+  type_extractor:
+    description: How the product type is to be extracted from the metadata file.
+    type: object
+    properties:
+      xpath:
+        description: Either a single XPath or multiple XPaths to the product type in the metadata file. Each is tried in order until the type can be extracted.
+        oneOf:
+          - type: string
+          - type: array
+            items:
+              type: string
+      map:
+        description: A simple mapping of the extracted type value to an identifier for later usage. This is useful when a preprocessing chain can be re-used for multiple product types.
+        type: object
+    required: [xpath]
+  level_extractor:
+    description: How the product level is extracted. Currently unused.
+    type: object
+    # TODO
+  preprocessing:
+    description: The actual preprocessing definition.
+    type: object
+    properties:
+      defaults:
+        description: The default step settings to be applied.
+        $ref: '#/$defs/steps'
+      types:
+        description: Product type specific step config.
+        type: object
+        additionalProperties:
+          description: A mapping of product type -> steps configuration
+          $ref: '#/$defs/steps'
+    required: [types]
+required:
+  - source
+  - target
+  - workdir
+  - keep_temp
+  - metadata_glob
+  - type_extractor
+  - level_extractor
+  - preprocessing
+$defs:
+  steps:
+    type: object
+    properties:
+      custom_preprocessor:
+        description: Definition of a custom preprocessor step.
+        type: object
+        properties:
+          path:
+            description: "The Python dotted path of the function to invoke, e.g. 'path.to.module.function'"
+            type: string
+          args:
+            description: The list of arguments to pass to that function.
+            type: array
+          kwargs:
+            description: The map of keyword arguments to pass to that function.
+            type: object
+      subdatasets:
+        description: The definition of the subdataset extraction step.
+        type: object
+        properties:
+          data_file_glob:
+            description: The data file selector.
+            type: string
+          subdataset_types:
+            description: List of subdatasets to be extracted for each data file.
+            type: array
+            items:
+              type: string
+      georeference:
+        description: The definition of a georeferencing step.
+        type: object
+        properties:
+          type:
+            description: The type of georeferencing to apply.
+            type: string
+            enum: [gcp, rpc, world] # TODO: more
+          options:
+            description: Additional options for the georeferencing. Depends on the type of georeferencing.
+            type: object
+            properties:
+              order:
+                description: The polynomial order to use for GCP reprojection.
+                type: number
+              projection:
+                description: The projection to use for ungeoreferenced images.
+                type: string
+              rpc_file_template:
+                description: The file glob template to use to find the RPC file. Template parameters are {filename}, {fileroot}, and {extension}.
+                type: string
+              warp_options:
+                description: "Warp options. See https://gdal.org/python/osgeo.gdal-module.html#WarpOptions for details"
+              corner_names:
+                description:
+        required: [type]
+      stack_bands:
+        description: The definition of the band stacking step.
+        type: object
+        properties:
+          group_by:
+            description: A regex to group the input datasets when a product consists of multiple files. The first regex group is used for the grouping.
+            type: string
+          sort_by:
+            description: A regex to select the portion of the filename to be used for sorting. The first regex group is used.
+            type: string
+          order:
+            description: The order of the items extracted via 'sort_by'.
+            type: array
+            items:
+              type: string
+      output:
+        description: The definition of the output step.
+        type: object
+        properties:
+          options:
+            description: "Options to be passed to `gdal.Warp`. See https://gdal.org/python/osgeo.gdal-module.html#WarpOptions for details"
+            type: object
+      custom_postprocessor:
+        description: Definition of a custom postprocessor step.
+        type: object
+        properties:
+          path:
+            description: "The Python dotted path of the function to invoke, e.g. 'path.to.module.function'"
+            type: string
+          args:
+            description: The list of arguments to pass to that function.
+            type: array
+          kwargs:
+            description: The map of keyword arguments to pass to that function.
+            type: object
diff --git a/preprocessor/preprocessor/preprocess.py b/preprocessor/preprocessor/preprocess.py
index bf46f27c..57d9f155 100644
--- a/preprocessor/preprocessor/preprocess.py
+++ b/preprocessor/preprocessor/preprocess.py
@@ -7,14 +7,14 @@ import logging
 import shutil
 from typing import List
 from contextlib import contextmanager
+from pprint import pformat
 
 from .transfer import get_downloader, get_uploader
 from .archive import unpack_files
 from .metadata import extract_product_type_and_level
-from .georeference import georeference_step
-from .subdataset import extract_subdataset_step
-from .stack import stack_bands_step
-
+from .steps import (
+    georeference_step, extract_subdataset_step, stack_bands_step, output_step
+)
 
 logging.basicConfig()
 
@@ -71,18 +71,6 @@ STEP_FUNCTIONS = {
     'custom_postprocessor': custom_postprocessor,
 }
 
-# def simple_copy(source_dir, target_dir, *args, **kwargs):
-#     copy_files(source_dir, target_dir)
-
-
-# STEP_FUNCTIONS = {
-#     'custom_preprocessor': simple_copy,
-#     'georeference': simple_copy,
-#     'stack_bands': simple_copy,
-#     'output': simple_copy,
-#     'custom_postprocessor': simple_copy,
-# }
-
 
 def flatten(l):
     return [item for sublist in l for item in sublist]
@@ -117,6 +105,7 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N
         # check if we can reuse a previous download
         if not os.path.isdir('download'):
             os.mkdir('download')
+            logger.info('Downloading %s from %s...' % (file_path, dirname))
             # get the Downloader for the configured source archive to download the given source file
             source_config = config['source']
             downloader = get_downloader(
@@ -125,24 +114,27 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N
             source_archive_path = downloader.download(file_path, 'download')
         else:
             source_archive_path = os.path.join('download', os.path.basename(file_path))
-            print('Download dir already exists, skipping...')
+            logger.info('Download dir already exists, skipping...')
 
         # fetch the metadata XML file from the downloaded archive
         metadata_files = unpack_files(source_archive_path, 'unpack', glob=config['metadata_glob'])
 
         # open the XML to retrieve the product type and level
         product_type, product_level = extract_product_type_and_level(metadata_files, config)
-        print('Detected %s/%s' % (product_type, product_level))
+        logger.info('Detected %s/%s' % (product_type, product_level))
 
         # get a concrete configuration for the type, filled with the defaults
         default_config = dict(config['preprocessing'].get('defaults', {}))
         preprocess_config = dict(config['preprocessing']['types'].get(product_type, {}))
-        default_config.update(preprocess_config)
+        preprocess_config = {**default_config, **preprocess_config}
+
+        logger.debug('Using preprocessing config %s' % pformat(preprocess_config))
 
         if not os.path.isdir('unpack'):
             os.mkdir('unpack')
+            logger.info('Unpacking original files...')
             # select and unpack files according to configuration
-            flatten([
+            data_files = flatten([
                 unpack_files(
                     source_archive_path,
                     'unpack',
@@ -151,7 +143,7 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N
                 )
                 for glob in preprocess_config['data_file_globs']
             ])
-            flatten([
+            metadata_files = flatten([
                 unpack_files(
                     source_archive_path,
                     'unpack',
@@ -160,30 +152,36 @@ def preprocess_file(config: dict, file_path: os.PathLike, use_dir: os.PathLike=N
                 )
                 for glob in preprocess_config['additional_file_globs']
             ])
+
+            logger.info('Unpacked files: %s' % ', '.join(metadata_files + data_files))
         else:
-            print('Unpack dir already exists, skipping...')
+            logger.info('Unpack dir already exists, skipping...')
 
         previous_step = 'unpack'
         # make processing steps
-        for step in ['custom_preprocessor', 'georeference', 'stack_bands', 'output', 'custom_postprocessor']:
+        for step in ['custom_preprocessor', 'subdatasets', 'georeference', 'stack_bands', 'output', 'custom_postprocessor']:
             step_config = preprocess_config.get(step)
             if not step_config:
+                logger.debug('Skipping step %s as it is not configured.' % step)
                 continue
 
             # run the step if it was not already run
             if not os.path.isdir(step):
+                logger.info('Running preprocessing step %s' % step)
                 os.mkdir(step)
                 preprpocessor = STEP_FUNCTIONS[step]
                 preprpocessor(previous_step, step, **step_config)
 
-                previous_step = step
             else:
-                print('%s dir already exists, skipping...' % step)
+                logger.info('%s dir already exists, skipping...' % step)
+
+            previous_step = step
 
-        os.mkdir('upload')
+        if not os.path.isdir('upload'):
+            os.mkdir('upload')
 
-        # copy files from previous step directory to upload directory
-        copy_files(previous_step, 'upload')
+            # copy files from previous step directory to upload directory
+            copy_files(previous_step, 'upload')
 
         # get an uploader for the finalized images
         target_config = config['target']
diff --git a/preprocessor/preprocessor/steps/__init__.py b/preprocessor/preprocessor/steps/__init__.py
index e69de29b..3f566c57 100644
--- a/preprocessor/preprocessor/steps/__init__.py
+++ b/preprocessor/preprocessor/steps/__init__.py
@@ -0,0 +1,12 @@
+from .georeference import georeference_step
+from .output import output_step
+from .stack import stack_bands_step
+from .subdataset import extract_subdataset_step
+
+
+__all__ = [
+    'georeference_step',
+    'output_step',
+    'stack_bands_step',
+    'extract_subdataset_step',
+]
diff --git a/preprocessor/preprocessor/steps/output.py b/preprocessor/preprocessor/steps/output.py
index c6a0743a..ac1bdd86 100644
--- a/preprocessor/preprocessor/steps/output.py
+++ b/preprocessor/preprocessor/steps/output.py
@@ -5,7 +5,7 @@ from uuid import uuid4
 
 from osgeo import gdal
 
-from .util import replace_ext
+from ..util import replace_ext
 
 
 def output_step(source_dir: os.PathLike, target_dir: os.PathLike, options: dict=None):
-- 
GitLab