EOX GitLab Instance

Skip to content
Snippets Groups Projects
Commit 20f816cc authored by Fabian Schindler's avatar Fabian Schindler
Browse files

Improving structure of preprocessor

Fixing recursive unpacking. Fixing dockerfile.
parent 3905e162
No related branches found
No related tags found
No related merge requests found
......@@ -33,11 +33,14 @@ LABEL name="prism view server preprocessor" \
type="prism view server preprocessor" \
version="0.0.1-dev"
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
USER root
RUN apt update && \
apt install -y \
python3-redis python3-keystoneclient python3-swiftclient python3-click wait-for-it && \
python3-redis python3-keystoneclient python3-swiftclient python3-click python3-setuptools wait-for-it && \
apt autoremove -y && \
apt clean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
......@@ -64,11 +67,13 @@ ENV INSTANCE_ID="prism-data-access-server_preprocessor" \
REDIS_REGISTER_QUEUE_KEY=
ADD run-preprocessor.sh \
preprocessor \
entrypoint.sh \
setup.py \
/
RUN cd preprocessor && \
COPY preprocessor /preprocessor
RUN cd / && \
python3 setup.py install
RUN chmod -v +x \
......
......@@ -6,7 +6,7 @@ import zipfile
from fnmatch import fnmatch, fnmatchcase
# Both upper- and lower-case variants are listed because the glob matching
# used on archive member names is case-sensitive unless explicitly requested.
ARCHIVE_EXTENSIONS = [
    transform(extension)
    for extension in ('ZIP', 'TAR', 'TAR.BZ2', 'TAR.GZ')
    for transform in (str.upper, str.lower)
]
def filter_filenames(filenames: List[PathLike], glob: str, case=False) -> List[PathLike]:
cmp = fnmatchcase if case else fnmatch
......@@ -47,15 +47,19 @@ def unpack_files(archive_path: Union[PathLike, BinaryIO] , target_dir: PathLike,
When a glob is passed, all filenames (either given or from the archive)
will be filtered and only the matching files will be extracted.
"""
iszip = False
istar = False
# open the archive and extract a list of filenames
if zipfile.is_zipfile(archive_path):
archive = zipfile.ZipFile(archive_path)
all_filenames = archive.namelist()
filenames = filenames or all_filenames
elif is_tarfile(archive_path):
if is_tarfile(archive_path):
archive = open_tarfile(archive_path)
all_filenames = archive.getnames()
filenames = filenames or all_filenames
istar = True
elif zipfile.is_zipfile(archive_path):
archive = zipfile.ZipFile(archive_path)
all_filenames = archive.namelist()
filenames = filenames or all_filenames
iszip = True
else:
raise Exception('Cannot open archive %s' % archive_path)
......@@ -63,19 +67,48 @@ def unpack_files(archive_path: Union[PathLike, BinaryIO] , target_dir: PathLike,
if glob:
filenames = filter_filenames(filenames, glob)
extracted_filenames = []
# extract the files to the target directory
archive.extractall(target_dir, filenames)
if istar:
members = [
member
for member in archive.getmembers()
if member.name in filenames
]
archive.extractall(target_dir, members)
extracted_filenames.extend([
os.path.join(target_dir, member.name)
for member in members
])
elif iszip:
archive.extractall(target_dir, filenames)
extracted_filenames.extend([
os.path.join(target_dir, filename)
for filename in filenames
])
# go into the sub-archives to extract files
if recursive:
for extension in ARCHIVE_EXTENSIONS:
sub_archives = filter_filenames(all_filenames, '*.%s' % extension)
for sub_archive in sub_archives:
sub_archive_file = archive.open(sub_archive)
if istar:
sub_archive_file = archive.extractfile(
archive.getmember(sub_archive)
)
if iszip:
sub_archive_file = archive.open(sub_archive)
sub_filenames = unpack_files(
sub_archive_file,
os.path.join(target_dir, sub_archive),
filenames, glob, recursive
glob,
filenames,
recursive
)
extracted_filenames.extend(sub_filenames)
# TODO: return a list of files extracted
return filenames
\ No newline at end of file
# return a list of files extracted
return extracted_filenames
\ No newline at end of file
......@@ -5,10 +5,9 @@ from .preprocess import preprocess_file
@click.group()
def cli():
    # The group itself carries no shared state; each sub-command reads its
    # own configuration (e.g. the --config-file option of `preprocess`).
    pass
@cli.command(help='Run the preprocess daemon, attaching to a Redis queue')
......@@ -18,9 +17,12 @@ def daemon(ctx):
@cli.command(help='Run a single, one-off preprocessing')
@click.argument('file_path', type=str)
@click.option('--config-file', type=click.File('r'))
def preprocess(file_path, config_file=None):
    """ Preprocess a single file, reading settings from a YAML config file. """
    # safe_load restricts YAML to plain data types; yaml.load without an
    # explicit Loader is deprecated and can construct arbitrary objects.
    config = yaml.safe_load(config_file)
    preprocess_file(config, file_path)
if __name__ == '__main__':
    # click parses sys.argv itself; no context object is passed in.
    cli()
......@@ -6,6 +6,7 @@ def evaluate_xpath(root, xpath):
"""
"""
result = root.xpath(xpath, namespaces=root.nsmap)
print(xpath, result)
if result:
if isinstance(result, list):
return result[0]
......
......@@ -4,8 +4,7 @@ import itertools
import importlib
import logging
from .downloader import get_downloader
from .uploader import get_uploader
from .transfer import get_downloader, get_uploader
from .archive import unpack_files
from .metadata import extract_product_type_and_level
......@@ -17,10 +16,11 @@ logger = logging.getLogger(__name__)
def custom_preprocessor(source_dir, target_dir, path, args=None, kwargs=None):
    """ Preprocessing step invoking a user supplied function.

    :param source_dir: directory containing the input files
    :param target_dir: directory to write the results into
    :param path: dotted path to the callable, e.g. ``package.module.func``
    :param args: extra positional arguments passed to the callable
    :param kwargs: extra keyword arguments passed to the callable
    :returns: whatever the resolved callable returns
    """
    # str.rpartition returns a 3-tuple (head, separator, tail); the separator
    # must be discarded when unpacking module and function names.
    module_name, _, func_name = path.rpartition('.')
    func = getattr(importlib.import_module(module_name), func_name)
    return func(source_dir, target_dir, *(args or []), **(kwargs or {}))
def georeference(source_dir: os.PathLike, target_dir: os.PathLike, type: str):
    """ Georeference the image files found in ``source_dir``, writing the
    georeferenced results into ``target_dir``.

    :param type: name of the georeferencing method to apply
    """
    # TODO: implement
    pass
def stack_bands(source_dir, target_dir, group_by, file_order):
""" Stack bands of the individual images
"""
......@@ -39,10 +40,11 @@ def stack_bands(source_dir, target_dir, group_by, file_order):
def output(source_dir, target_dir, crs, driver, format_options):
    """ Produce the final output files in ``target_dir`` from ``source_dir``.

    NOTE(review): stub — CRS, driver and format option handling is not
    implemented yet.
    """
def custom_postprocessor(source_dir, target_dir, path, args=None, kwargs=None):
    """ Postprocessing step invoking a user supplied function.

    :param source_dir: directory containing the input files
    :param target_dir: directory to write the results into
    :param path: dotted path to the callable, e.g. ``package.module.func``
    :param args: extra positional arguments passed to the callable
    :param kwargs: extra keyword arguments passed to the callable
    :returns: whatever the resolved callable returns
    """
    # rpartition yields (head, separator, tail); discard the separator when
    # splitting the dotted path into module and function names.
    module_name, _, func_name = path.rpartition('.')
    func = getattr(importlib.import_module(module_name), func_name)
    return func(source_dir, target_dir, *(args or []), **(kwargs or {}))
......@@ -55,6 +57,9 @@ STEP_FUNCTIONS = {
'custom_postprocessor': custom_postprocessor,
}
def flatten(l):
    """ Flatten one level of nesting: a sequence of sequences becomes a
    single flat list.
    """
    return list(itertools.chain.from_iterable(l))
# -----------------------------------------------------------------------------
......@@ -71,32 +76,39 @@ def preprocess_file(config: dict, file_path: os.PathLike):
downloader = get_downloader(
source_config['type'], source_config.get('args'), source_config.get('kwargs')
)
source_archive_path = downloader.retrieve(file_path, 'download')
source_archive_path = downloader.download(file_path, 'download')
# fetch the metadata XML file from the downloaded archive
# open the XML to retrieve the product type and level
metadata_files = unpack_files(source_archive_path, 'unpack', glob=config['metadata_glob'])
# open the XML to retrieve the product type and level
product_type, product_level = extract_product_type_and_level(metadata_files, config)
# get a concrete configuration for the type, filled with the defaults
default_config = config['preprocessor'].get('defaults', {})
typed_config = dict(config['preprocessor'].get(product_type, {}))
preprocess_config = typed_config.update(default_config)
default_config = dict(config['preprocessing'].get('defaults', {}))
preprocess_config = dict(config['preprocessing']['types'].get(product_type, {}))
default_config.update(preprocess_config)
# select and unpack files according to configuration
data_files = unpack_files(
source_archive_path,
'unpack',
glob=preprocess_config['data_file_globs'],
recursive=preprocess_config.get('nested', False),
)
additional_files = unpack_files(
source_archive_path,
'unpack',
glob=preprocess_config['additional_file_globs'],
recursive=preprocess_config.get('nested', False),
)
data_files = flatten([
unpack_files(
source_archive_path,
'unpack',
glob=glob,
recursive=preprocess_config.get('nested', False),
)
for glob in preprocess_config['data_file_globs']
])
additional_files = flatten([
unpack_files(
source_archive_path,
'unpack',
glob=glob,
recursive=preprocess_config.get('nested', False),
)
for glob in preprocess_config['additional_file_globs']
])
previous_step = 'unpack'
# make processing steps
......
from . import swift
from . import local
def get_downloader(type_name, args, kwargs):
    """ Construct a downloader instance for the given backend.

    :param type_name: backend identifier, either 'swift' or 'local'
    :param args: positional arguments for the backend constructor, or None
    :param kwargs: keyword arguments for the backend constructor, or None
    :raises Exception: when ``type_name`` names no known backend
    """
    if type_name == 'swift':
        return swift.Downloader(*args or [], **kwargs or {})
    elif type_name == 'local':
        return local.Downloader(*args or [], **kwargs or {})
    raise Exception('Downloader type %s is not supported' % type_name)
def get_uploader(uploader_config):
    """ Construct an uploader instance from a configuration mapping holding
    'type' and 'kwargs' entries.
    """
    type_name = uploader_config['type']
    if type_name == 'swift':
        return swift.Uploader(**uploader_config['kwargs'])
    if type_name == 'local':
        return local.Uploader(**uploader_config['kwargs'])
    raise Exception('Uploader type %s is not supported' % type_name)
from setuptools import setup, find_packages
with open("README.md", "r") as fh:
long_description = fh.read()
# with open("README.md", "r") as fh:
# long_description = fh.read()
long_description = ""
setup(
name="preprocessor", # Replace with your own username
......@@ -21,7 +22,7 @@ setup(
python_requires='>=3.6',
entry_points={
"console_scripts": [
"preprocessor = pvs_preprocessor.cli:cli",
"preprocessor = preprocessor.cli:cli",
],
}
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment