Commit 7ce19954 authored by Nikola Jankovic

Merge branch 'processing-updates' into 'main'

processing updates

See merge request !3
parents e70b4c6d e34bf802
Pipeline #25162 passed with stages in 13 minutes and 44 seconds
@@ -3,3 +3,5 @@ tests/
.git/
.gitlab-ci.yml
chart/
+.venv/
+.devcontainer/
@@ -19,7 +19,7 @@ test:
    - pip3 install -r requirements-dev.txt
    - flake8
    - mypy .
-    - pytest --cov preprocessor --cov-report term-missing
+    - pytest
  coverage: "/TOTAL.+ ([0-9]{1,3}%)/"

publish_latest:
...
@@ -42,7 +42,8 @@ COPY requirements.txt .
RUN apt-get update && \
    apt-get install --no-install-recommends -y \
-       python3-pip python3-setuptools wait-for-it && \
+       python3 python3-pip python3-setuptools build-essential wait-for-it python3-dev && \
+   pip install --no-binary :all: rasterio==1.3a4 fiona shapely && \
    pip install --no-cache-dir -r requirements.txt && \
    apt-get autoremove -y && \
    apt-get clean && \
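
The --no-binary :all: pin builds rasterio 1.3a4, fiona and shapely from source against the GDAL/GEOS already present in the image instead of using bundled manylinux wheels. A hypothetical sanity check, run inside the built image (not part of this repository):

import rasterio

print(rasterio.__version__)        # expected: 1.3a4, per the pin above
print(rasterio.__gdal_version__)   # the GDAL rasterio was compiled against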
@@ -57,9 +58,12 @@ ENV INSTANCE_ID="prism-data-access-server_preprocessor" \
    REDIS_PREPROCESS_QUEUE_KEY="preprocess_queue" \
    GDAL_DISABLE_READDIR_ON_OPEN="TRUE" \
    GDAL_HTTP_TIMEOUT="30" \
-    GDAL_HTTP_MAX_RETRY="3" \
+    GDAL_HTTP_MAX_RETRY="16" \
    GDAL_HTTP_MERGE_CONSECUTIVE_RANGES="TRUE" \
+    GDAL_HTTP_RETRY_DELAY="5" \
+    MAPCHETE_IO_RETRY_BACKOFF="2" \
+    MAPCHETE_IO_RETRY_DELAY="2" \
+    MAPCHETE_IO_RETRY_TRIES="5" \
    GDAL_PAM_ENABLED="NO" \
    CPL_VSIL_GZIP_WRITE_PROPERTIES="NO" \
    DEVELOPMENT="false" \
...
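
Taken together, the new values describe retry-with-back-off for flaky remote reads: GDAL retries HTTP requests up to 16 times with a 5 second delay, and mapchete retries I/O up to 5 times, starting at 2 seconds and doubling each attempt. A minimal sketch of the pattern the MAPCHETE_IO_RETRY_* values encode (illustrative only, not code from this repository):

import os
import time

def read_with_retries(read, *args, **kwargs):
    # Back-off policy mirroring the MAPCHETE_IO_RETRY_* settings above.
    tries = int(os.environ.get("MAPCHETE_IO_RETRY_TRIES", "5"))
    delay = float(os.environ.get("MAPCHETE_IO_RETRY_DELAY", "2"))
    backoff = float(os.environ.get("MAPCHETE_IO_RETRY_BACKOFF", "2"))
    for attempt in range(1, tries + 1):
        try:
            return read(*args, **kwargs)
        except OSError:
            if attempt == tries:
                raise
            time.sleep(delay)   # 2 s, then 4 s, 8 s, ... between attempts
            delay *= backoff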
@@ -10,57 +10,52 @@ from .globals import StorageType
def get_swift_auth_env(config: StorageAuthConfig) -> Dict[str, str]:
    swift_config = config.swift
-    if swift_config:
-        conn = Connection(
-            authurl=swift_config.authurl,
-            user=swift_config.user,
-            key=swift_config.key,
-            tenant_name=swift_config.tenant_name,
-            auth_version=swift_config.auth_version,
-            os_options={"region_name": swift_config.region_name},
-        )
-        url, token = conn.get_auth()
-        return {
-            "OS_STORAGE_URL": url,
-            "OS_AUTH_TOKEN": token,
-            "SWIFT_STORAGE_URL": url,
-            "SWIFT_AUTH_TOKEN": token,
-        }
-    raise TypeError("StorageAuthConfig.swift cannot be None")
+    if not swift_config:
+        raise TypeError("StorageAuthConfig.swift cannot be None")
+    conn = Connection(
+        authurl=swift_config.authurl,
+        user=swift_config.user,
+        key=swift_config.key,
+        tenant_name=swift_config.tenant_name,
+        auth_version=swift_config.auth_version,
+        os_options={"region_name": swift_config.region_name},
+    )
+    url, token = conn.get_auth()
+    return {
+        "OS_STORAGE_URL": url,
+        "OS_AUTH_TOKEN": token,
+        "SWIFT_STORAGE_URL": url,
+        "SWIFT_AUTH_TOKEN": token,
+    }


def get_s3_auth_env(config: StorageAuthConfig) -> Dict[str, str]:
    s3_config = config.s3
-    if s3_config:
-        return {
-            "AWS_ACCESS_KEY_ID": s3_config.access_key_id,
-            "AWS_SECRET_ACCESS_KEY": s3_config.secret_access_key,
-            "AWS_NO_SIGN_REQUEST": "YES" if s3_config.public else "NO",
-        }
-    raise TypeError("StorageAuthConfig.s3 cannot be None")
+    if not s3_config:
+        raise TypeError("StorageAuthConfig.s3 cannot be None")
+    return {
+        "AWS_ACCESS_KEY_ID": s3_config.access_key_id,
+        "AWS_SECRET_ACCESS_KEY": s3_config.secret_access_key,
+        "AWS_NO_SIGN_REQUEST": "YES" if s3_config.public else "NO",
+    }


def get_null_auth_env(config: StorageAuthConfig) -> Dict[str, str]:
    return {}


-GetAuthenticationVarsFunction = Callable[[StorageAuthConfig], Dict[str, str]]
+GetAuthVarFunction = Callable[[StorageAuthConfig], Dict[str, str]]

-AUTH_MAP: Dict[StorageType, GetAuthenticationVarsFunction] = {
+AUTH_MAP: Dict[StorageType, GetAuthVarFunction] = {
    StorageType.swift: get_swift_auth_env,
    StorageType.s3: get_s3_auth_env,
    StorageType.file: get_null_auth_env,
}


-def get_auth_env(auth_config: StorageAuthConfig) -> Dict[str, str]:
-    return AUTH_MAP[auth_config.type](auth_config)


def set_environment_variables(env) -> Dict[str, str]:
    old_values: Dict[str, str] = {}
    for key, value in env.items():
@@ -83,7 +78,16 @@ def clean_environment_variables(env: Dict[str, str]) -> None:
def set_auth(
    auth_config: StorageAuthConfig,
) -> Iterator:
-    env = get_auth_env(auth_config)
+    """Set appropriate authentication environment variables from the given config
+
+    Args:
+        auth_config (StorageAuthConfig): Storage authentication configuration
+
+    Yields:
+        Iterator
+    """
+    env = AUTH_MAP[auth_config.type](auth_config)
    old_env = set_environment_variables(env)
    yield
...
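
Hypothetical usage of set_auth, assuming (as the bare yield suggests) that it is wrapped with contextlib.contextmanager outside this hunk; the config value is a placeholder:

auth_config: StorageAuthConfig = ...  # e.g. an AuthConfig().source instance
with set_auth(auth_config):
    ...  # OS_*/SWIFT_*/AWS_* variables are visible to readers here
# previous values are presumably restored after the yield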
@@ -350,6 +350,43 @@
      "enum": ["xml", "json"],
      "type": "string"
    },
+    "PropertyConfig": {
+      "title": "PropertyConfig",
+      "description": "Mapping of properties",
+      "properties": {
+        "from_property": {
+          "title": "From Property",
+          "type": "string"
+        },
+        "to_property": {
+          "title": "To Property",
+          "type": "string"
+        },
+        "format_function": {
+          "title": "Format Function",
+          "type": ["string", "null"],
+          "default": null
+        }
+      },
+      "required": ["from_property", "to_property"],
+      "additionalProperties": false
+    },
+    "StaticConfig": {
+      "title": "StaticConfig",
+      "description": "Mapping of additional metadata",
+      "properties": {
+        "name": {
+          "title": "Name",
+          "type": "string"
+        },
+        "value": {
+          "title": "Value",
+          "type": "string"
+        }
+      },
+      "required": ["name", "value"],
+      "additionalProperties": false
+    },
    "ReaderConfig": {
      "title": "ReaderConfig",
      "type": "object",

@@ -366,16 +403,15 @@
        "title": "File Regex",
        "type": "string"
      },
-      "field_from": {
-        "title": "Field From",
-        "type": "string"
-      },
-      "field_to": {
-        "title": "Field To",
-        "type": "string"
+      "property_mapping": {
+        "title": "Property Mapping",
+        "type": "array",
+        "items": {
+          "$ref": "#/definitions/PropertyConfig"
+        }
      }
    },
-    "required": ["type", "asset", "file_regex", "field_from", "field_to"],
+    "required": ["type", "asset", "file_regex", "property_mapping"],
    "additionalProperties": false
  },
  "MetadataConfig": {

@@ -398,6 +434,14 @@
        "$ref": "#/definitions/ReaderConfig"
      },
      "additionalItems": false
    },
+    "static": {
+      "title": "Static",
+      "type": "array",
+      "items": {
+        "$ref": "#/definitions/StaticConfig"
+      },
+      "additionalItems": false
+    }
  },
  "additionalProperties": false
...
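
For illustration, a metadata configuration fragment that would validate against the updated schema: a reader now carries a list of property mappings instead of a single field_from/field_to pair, and static entries inject fixed values (all concrete values below are invented):

metadata = {
    "readers": [
        {
            "type": "xml",
            "asset": "metadata",
            "file_regex": r".*MTD.*\.xml",
            "property_mapping": [
                {
                    "from_property": "CLOUD_COVERAGE_ASSESSMENT",
                    "to_property": "eo:cloud_cover",
                    "format_function": "float",
                }
            ],
        }
    ],
    "static": [
        {"name": "constellation", "value": "sentinel-2"},
    ],
}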
@@ -17,6 +17,7 @@ from .globals import (
    StorageType,
    ReaderType,
    RasterFormat,
+    SCHEMA_FILE_NAME,
)

@@ -143,13 +144,19 @@ class AuthConfig:
    source: StorageAuthConfig = field(default_factory=default_storage_auth_config)


+@dataclass
+class PropertyConfig:
+    from_property: str
+    to_property: str
+    format_function: Optional[str] = None
+
+
@dataclass
class ReaderConfig:
    type: ReaderType
    asset: str
    file_regex: str
-    field_from: str
-    field_to: str
+    property_mapping: List[PropertyConfig]


@dataclass

@@ -159,10 +166,17 @@ class MetadataDataAssetConfig:
    output_asset: str


+@dataclass
+class StaticConfig:
+    name: str
+    value: str
+
+
@dataclass
class MetadataConfig:
    assets: List[MetadataDataAssetConfig] = field(default_factory=list)
    readers: List[ReaderConfig] = field(default_factory=list)
+    static: List[StaticConfig] = field(default_factory=list)


@dataclass

@@ -182,6 +196,15 @@ class PreprocessorConfig:
def load_config(cfg_path: str) -> PreprocessorConfig:
    """Load the configuration as structured config
+
+    Args:
+        cfg_path (str): File path to configuration
+
+    Returns:
+        PreprocessorConfig: Structured configuration
+    """
    return cast(
        PreprocessorConfig,
        OmegaConf.merge(

@@ -191,7 +214,13 @@ def load_config(cfg_path: str) -> PreprocessorConfig:
def validate_config(config_file: str) -> None:
-    schema_path = join(dirname(__file__), "config-schema.json")
+    """Validate config file against schema
+
+    Args:
+        config_file (str): path to the yaml configuration file
+    """
+    schema_path = join(dirname(__file__), SCHEMA_FILE_NAME)
    with open(schema_path) as f1, open(config_file) as f2:
        schema = json.load(f1)

@@ -201,4 +230,13 @@ def validate_config(config_file: str) -> None:
def to_dict(config):
    """Returns a dictionary from an OmegaConf structured config
+
+    Args:
+        config: config to convert
+
+    Returns:
+        dict: Dictionary of config
+    """
    return OmegaConf.to_container(OmegaConf.structured(config), enum_to_str=True)
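
A hypothetical round trip through the three helpers above (the file name is made up):

validate_config("preprocessor.yaml")        # raises if the YAML violates config-schema.json
config = load_config("preprocessor.yaml")   # merged OmegaConf structured config
as_dict = to_dict(config)                   # plain dict, enums rendered as strings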
@@ -11,6 +11,7 @@ LOGGER = structlog.get_logger(__name__)
TEMP_PREFIX = "preprocessor_"
+SCHEMA_FILE_NAME = "config-schema.json"


class PreprocessorException(Exception):

@@ -87,6 +88,15 @@ FILE_SCHEME = "file"
def timing(f: Callable) -> Callable:
    """Function wrapper to measure runtime of functions
+
+    Args:
+        f (Callable): function to measure
+
+    Returns:
+        Callable: wrapped function
+    """
+
    @wraps(f)
    def wrap(*args, **kwargs):
        start = perf_counter()
...
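
Hypothetical use of the timing decorator defined above (the function name is invented):

@timing
def preprocess_scene() -> None:
    ...

preprocess_scene()  # runtime is measured by the wrap() closure above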
from .archive import archive_extract
from .handler import get_input_handler, InputData

__all__ = ["archive_extract", "get_input_handler", "InputData"]

...

-from dataclasses import dataclass
import pathlib
import re
import tarfile
import zipfile
-from os.path import join
-from urllib.parse import urlparse
from abc import ABC, abstractmethod
+from os.path import join
-from typing import Callable, Dict, Optional, List, Tuple, Type, Union
+from typing import Optional, List, Type, Union

import fsspec
-from pystac import Item

-from .globals import (
-    FILE_SCHEME,
-    FORMAT_MAP,
-    VSI_NET_MAPPING,
-    InputType,
-    StorageType,
-    timing,
-)
-from .config import (
-    ArchiveDataAssetConfig,
-    DataInputConfig,
-    FileDataAssetConfig,
-)
+from .utils import get_vsi_network_prefix
+from ..globals import StorageType


class ArchiveError(Exception):
@@ -42,19 +29,30 @@ class Archive(ABC):
    @abstractmethod
    def __enter__(self):
-        ...
+        """Context manager for archives"""

    @abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
-        ...
+        """Exits the context"""

    @abstractmethod
    def get_members(self) -> List[InfoFile]:
-        ...
+        """Return a list of archive info files
+
+        Returns:
+            List[InfoFile]: List of InfoFiles
+        """

    @abstractmethod
    def extract_file(self, f: InfoFile) -> ExFile:
-        ...
+        """Extract a file in memory
+
+        Args:
+            f (InfoFile): Valid info file
+
+        Returns:
+            ExFile: Extracted file
+        """


class TarArchive(Archive):
@@ -81,10 +79,8 @@ class ZipArchive(Archive):
    vsi_prefix = "/vsizip/"

    def __enter__(self):
-        try:
-            self.archive = zipfile.ZipFile(self.path)
-        except AttributeError:
-            self.archive = zipfile.ZipFile(self.path.open())
+        self.archive = zipfile.ZipFile(self.path)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
@@ -118,7 +114,7 @@ def recursive_archive_search(
        if re.match(regex, file.name):
            matches.append(join(path, str(file)))
        elif file.suffix.lower() in ARCHIVE_MAPPING:
-            suffix = get_suffix(file)
+            suffix = get_archive_suffix(file)
            ArchiveClass = get_archive(suffix)
            new_archive = archive.extract_file(member)
            new_path = join(path, f_name)
@@ -133,6 +129,7 @@ def recursive_archive_extract(
    regex: str,
    to_path: str,
) -> str:
+    archives = []
    for member in archive.get_members():
        f_name = get_member_name(member)

@@ -144,11 +141,14 @@
                f.write(data.read().decode("utf-8"))
            return extract_path
        elif file.suffix.lower() in ARCHIVE_MAPPING:
-            suffix = get_suffix(file)
-            ArchiveClass = get_archive(suffix)
-            new_archive = archive.extract_file(member)
-            with ArchiveClass(new_archive) as t:
-                return recursive_archive_extract(t, regex, to_path)
+            archives.append((file, member))
+
+    for file, member in archives:
+        suffix = get_archive_suffix(file)
+        ArchiveClass = get_archive(suffix)
+        new_archive = archive.extract_file(member)
+        with ArchiveClass(new_archive) as t:
+            return recursive_archive_extract(t, regex, to_path)
+
    raise ArchiveError(f"Cannot find {regex} in {archive}")
@@ -156,7 +156,7 @@ def recursive_archive_extract(
def archive_search(
    path: str, regex: str, matches: List[str], prefixes: Optional[List[str]] = None
) -> None:
-    suffix = get_suffix(path)
+    suffix = get_archive_suffix(path)
    ArchiveClass = get_archive(suffix)

    with fsspec.open(path) as p, ArchiveClass(p) as archive:

@@ -168,7 +168,7 @@
def archive_extract(path: str, regex: str, to_path: str) -> str:
-    suffix = get_suffix(path)
+    suffix = get_archive_suffix(path)
    ArchiveClass = get_archive(suffix)

    with fsspec.open(path) as p, ArchiveClass(p) as archive:
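
A hedged usage sketch of archive_extract; the URL and regex are invented, and in principle any fsspec-readable path should work:

local_file = archive_extract(
    "s3://bucket/product.zip",   # opened via fsspec.open above
    regex=r".*metadata\.xml",    # first matching member is extracted
    to_path="/tmp/extract",
)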
@@ -189,11 +189,6 @@ def prepare_path(archive: Archive) -> str:
    return join(str(_path))


-def create_vsi_path(path: str, prefixes: List[str]) -> str:
-    prepared_prefixes = "".join(reversed(prefixes)).replace("//", "/")
-    return f"{prepared_prefixes}{path}"


ARCHIVE_MAPPING = {
    ".tar": TarArchive,
    ".tgz": TarArchive,

@@ -203,23 +198,17 @@ ARCHIVE_MAPPING = {
}
-def get_suffix(path: Union[str, pathlib.Path]) -> str:
+def get_archive_suffix(path: Union[str, pathlib.Path]) -> str:
    if isinstance(path, str):
-        suffixes = pathlib.Path(path).suffixes
-    else:
-        suffixes = path.suffixes
+        path = pathlib.Path(path)
+    suffixes = path.suffixes
    return "".join(
        suffix.lower() for suffix in suffixes if suffix.lower() in ARCHIVE_MAPPING
    )
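
Expected behaviour of get_archive_suffix, assuming ".zip" and ".gz" are among the ARCHIVE_MAPPING keys elided above:

get_archive_suffix("scene.tar.gz")              # ".tar.gz"
get_archive_suffix(pathlib.Path("upload.zip"))  # ".zip"
get_archive_suffix("readme.txt")                # "" (not an archive)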

-def get_vsi_network_prefix(path: str) -> str:
-    scheme = urlparse(path).scheme or FILE_SCHEME
-    storage_type = StorageType(scheme)
-    return VSI_NET_MAPPING[storage_type]


def get_archive(suffix: str) -> Type[Archive]:
    return ARCHIVE_MAPPING[suffix]
@@ -233,121 +222,3 @@ def get_member_name(member: InfoFile) -> str:
        raise ArchiveError(f"Member {member} not recognized as valid InfoFile")
    return f_name
-
-
-@dataclass
-class InputData:
-    key: str
-    paths: List[str]
-
-    def get_key_path_pairs(self) -> List[Tuple[str, str]]:
-        pairs = []
-        if len(self.paths) > 1:
-            for i, path in enumerate(self.paths, start=1):
-                new_key = f"{self.key}_{i}"
-                pairs.append((new_key, path))
-        else:
-            pairs.append((self.key, self.paths[0]))
-        return pairs
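
InputData is re-exported from .handler by the new __init__ above, so this block is moved rather than deleted. Its pairing behaviour, for illustration:

InputData(key="band", paths=["a.tif", "b.tif"]).get_key_path_pairs()
# -> [("band_1", "a.tif"), ("band_2", "b.tif")]
InputData(key="band", paths=["a.tif"]).get_key_path_pairs()
# -> [("band", "a.tif")]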
-
-
-class IOHandler(ABC):
-    @abstractmethod
-    def get_input(self, item: Item) -> List[InputData]:
-        ...
-
-    @abstractmethod
-    def get_temp_file_path(
-        self, input: List[InputData], temp_dir: str, format: str, item: Item
-    ) -> str:
-        ...
-
-
-class ArchiveIOHandler(IOHandler):
-    def __init__(
-        self, asset_map: List[ArchiveDataAssetConfig], file_name_regex: str
-    ) -> None:
-        self.asset_map = asset_map
-        self.file_name_regex = file_name_regex
-
-    @timing
-    def get_input(self, item: Item) -> List[InputData]:
-        input: List[InputData] = []
-        for asset_cfg in self.asset_map:
-            asset = item.assets[asset_cfg.asset]
-            data_matches: List[str] = []
-            prefixes: List[str] = []
-            archive_search(asset.href, asset_cfg.regex, data_matches, prefixes)
-            input.append(
-                InputData(
-                    key=asset_cfg.key,
-                    paths=[create_vsi_path(p, prefixes) for p in data_matches],
-                )
-            )
-        return input
-
-    def get_temp_file_path(
-        self, input: List[InputData], temp_dir: str, format: str, item: Item
-    ) -> str:
-        extension = FORMAT_MAP[format]
-        for i in input:
-            if match := re.search(self.file_name_regex, i.paths[0]):
-                return join(temp_dir, f"{match.group()}{extension}")
-        raise ArchiveError(f"{self.file_name_regex} not found in {input}")
-
-
-def create_archive_handler(config: DataInputConfig) -> IOHandler:
-    archive_config = config.archive
-    if archive_config:
-        return ArchiveIOHandler(
-            asset_map=archive_config.asset_map,
-            file_name_regex=archive_config.file_name_regex,
-        )
-    raise TypeError("DataInputConfig.archive cannot be None")
-
-
-class FileIOHandler(IOHandler):
-    def __init__(self, asset_map: List[FileDataAssetConfig]) -> None:
-        self.asset_map = asset_map
-
-    def get_input(self, item: Item) -> List[InputData]:
-        input: List[InputData] = []
-        for asset_cfg in self.asset_map:
-            asset = item.assets[asset_cfg.asset]
-            input.append(InputData(asset_cfg.key, [asset.href]))
-        return input
-
-    def get_temp_file_path(
-        self, input: List[InputData], temp_dir: str, format: str, item: Item