EOX GitLab Instance

Skip to content
Snippets Groups Projects
Commit d803c8ea authored by Fabian Schindler
Browse files

Initial implementation of modular registrar

parent a9984837
No related branches found
No related tags found
3 merge requests: !36 Staging to master to prepare 1.0.0 release, !32 Registrar modularization, !27 Registrar modularization
import os
import re
import sys
import logging
import django
from django.db import transaction
from django.contrib.gis.geos import GEOSGeometry
from eoxserver.resources.coverages import models
from eoxserver.resources.coverages.registration.product import ProductRegistrator
from eoxserver.resources.coverages.registration.browse import BrowseRegistrator
from eoxserver.resources.coverages.registration.mask import MaskRegistrator
from eoxserver.resources.coverages.registration.registrators.gdal import GDALRegistrator
from .exceptions import RegistrationError
from .context import Context
from .source import Source
logger = logging.getLogger(__name__)
class RegistrationResult:
    """Placeholder for the outcome of a backend registration.

    The class defines no attributes or behaviour of its own yet.
    """
class Backend:
    """Interface that every registration backend has to implement.

    ``exists`` and ``register`` are the methods the registration driver
    calls on a backend; ``register_item`` is kept for backwards
    compatibility. All of them raise ``NotImplementedError`` here and have
    to be overridden by concrete backends.
    """

    def exists(self, source: "Source", item: "Context") -> bool:
        """Return whether ``item`` is already registered in this backend."""
        raise NotImplementedError

    def register(self, source: "Source", item: "Context", replace: bool) -> "RegistrationResult":
        """Register ``item``, replacing an existing record if ``replace`` is set."""
        raise NotImplementedError

    def register_item(self, item: "Context") -> "RegistrationResult":
        """Register ``item`` (legacy entry point)."""
        raise NotImplementedError
class EOxServerBackend(Backend):
    """Backend registering products and their files in an EOxServer instance.

    Args:
        instance_base_path: directory that contains the instance directory.
        instance_name: name of the instance; ``<instance_name>.settings`` is
            used as default for ``DJANGO_SETTINGS_MODULE``.
        mapping: nested dict looked up by product type and then by product
            level. The resulting entry may hold the keys ``collections``,
            ``coverages``, ``browses`` and ``masks``.
        simplify_footprint_tolerance: forwarded unchanged to the product
            registrator.
    """

    def __init__(self, instance_base_path: str, instance_name: str, mapping: dict, simplify_footprint_tolerance: int=None):
        self.mapping = mapping
        self.simplify_footprint_tolerance = simplify_footprint_tolerance

        # make the instance package importable so its settings can be loaded
        path = os.path.join(instance_base_path, instance_name)
        if path not in sys.path:
            sys.path.append(path)

        os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"{instance_name}.settings")  # TODO: from config
        django.setup()

    def exists(self, source: Source, item: Context):
        """Return True if a product with the item's identifier is stored."""
        return models.Product.objects.filter(identifier=item.identifier).exists()

    def _get_storage_from_source(self, source: Source) -> list:
        """Return the storage path prefix for ``source`` (currently always empty)."""
        return []

    @transaction.atomic
    def register(self, source: Source, item: Context, replace: bool) -> RegistrationResult:
        """Register the product described by ``item`` and all mapped files.

        The product is registered first, then inserted into the mapped
        collections, and finally the mapped coverages, browses and masks are
        registered for it. Everything runs in one database transaction.

        Raises:
            RegistrationError: if no mapping exists for the item, the item
                has no metadata file, or no footprint could be extracted.
        """
        # get the mapping for this particular item
        mapping = self.mapping.get(item.product_type, {}).get(item.product_level)
        if mapping is None:
            raise RegistrationError(
                f'No mapping configured for product type {item.product_type} '
                f'and level {item.product_level}'
            )

        if not item.metadata_files:
            raise RegistrationError(f'No metadata file available for {item.identifier}')
        metadata_file = item.metadata_files[0]
        storage = self._get_storage_from_source(source)

        # NOTE(review): the item is a dataclass and cannot be subscripted, so
        # the type name is read from the mapping entry and falls back to the
        # item's product type -- confirm which of the two is intended.
        product_type_name = mapping.get('product_type_name', item.product_type)

        try:
            models.ProductType.objects.get(name=product_type_name)
        except models.ProductType.DoesNotExist:
            logger.warning("Product type %s does not exist", product_type_name)

        product, _ = ProductRegistrator().register(
            metadata_locations=[storage + [metadata_file]],
            type_name=product_type_name,
            replace=replace,
            extended_metadata=True,
            mask_locations=None,
            package_path=None,
            simplify_footprint_tolerance=self.simplify_footprint_tolerance,
            overrides=item.metadata,
        )

        if product.footprint.empty:
            raise RegistrationError("No footprint was extracted. full product: %s" % product)

        # insert the product in the to be associated collections
        for collection_id in mapping.get('collections', []):
            collection = models.Collection.objects.get(
                identifier=collection_id,
            )
            models.collection_insert_eo_object(collection, product)

        # register coverages and link them to the product
        for raster_identifier, coverage_type_name in mapping.get('coverages', {}).items():
            raster_item = item.raster_files.get(raster_identifier)
            report = GDALRegistrator().register(
                data_locations=[storage + [raster_item]],
                metadata_locations=[storage + [metadata_file]],
                coverage_type_name=coverage_type_name,
                overrides={
                    "identifier": f'{product.identifier}__{raster_identifier}__coverage',
                    "footprint": None,
                },
                replace=replace,
            )
            logger.debug("Adding coverage to product")
            models.product_add_coverage(product, report.coverage)

        # register browses; .items() is required to get (identifier, type) pairs
        for raster_identifier, browse_type_name in mapping.get('browses', {}).items():
            raster_item = item.raster_files.get(raster_identifier)
            BrowseRegistrator().register(
                product.identifier,
                storage + [raster_item],
                browse_type_name,
            )

        # register masks
        for mask_identifier, mask_type_name in mapping.get('masks', {}).items():
            mask_item = item.mask_files.get(mask_identifier)
            MaskRegistrator().register(
                product.identifier,
                storage + [mask_item],
                mask_type_name,
            )

        return RegistrationResult()
# Registry of the available backend classes, keyed by the ``type`` value of
# a backend configuration entry (looked up in get_backend).
BACKENDS = {
'eoxserver': EOxServerBackend
}
def get_backend(config: dict, path: str) -> Backend:
    """Instantiate the first configured backend that accepts ``path``.

    Entries of ``config['backends']`` are tried in order. An entry matches
    when it has no (or an empty) ``filter`` or when its ``filter`` regular
    expression matches the start of ``path``. The backend class is taken
    from ``BACKENDS`` via the entry's ``type`` and instantiated with the
    entry's optional ``args`` and ``kwargs``.

    Raises:
        RegistrationError: if no entry matches ``path``.
    """
    for cfg_backend in config['backends']:
        pattern = cfg_backend.get('filter')
        # an entry without a filter accepts every path
        if not pattern or re.match(pattern, path):
            break
    else:
        # no backend found
        raise RegistrationError(f'Could not find a suitable backend for the path {path}')

    return BACKENDS[cfg_backend['type']](
        *cfg_backend.get('args', []),
        **cfg_backend.get('kwargs', {}),
    )
from dataclasses import dataclass, field
@dataclass
class Context:
    """Everything known about one item that is about to be registered."""

    # identifier of the item
    identifier: str
    # product type and level; used to look up the backend mapping
    product_type: str = None
    product_level: str = None
    # metadata values handed to the backend as overrides
    metadata: dict = field(default_factory=dict)
    # raster identifier -> raster file path
    raster_files: dict = field(default_factory=dict)
    # ordered metadata file paths; consumers read the first entry by index,
    # hence a list rather than a dict
    metadata_files: list = field(default_factory=list)
    masks: dict = field(default_factory=dict)
    # mask identifier -> mask file path
    mask_files: dict = field(default_factory=dict)
class RegistrationError(Exception):
    """Raised when an item cannot be registered."""
import re
from .source import get_source
from .exceptions import RegistrationError
def register(config, path):
    """Register the object found at ``path`` with every configured backend.

    The source and registration scheme are selected for ``path``, the
    scheme builds the item context, pre-handlers run, each backend
    registers the item and finally the post-handlers run.

    Args:
        config: configuration mapping; the optional ``replace`` entry
            controls whether already registered items are replaced.
        path: path of the object to register.

    Raises:
        RegistrationError: if the item already exists in a backend and
            replacing is not enabled.
    """
    # TODO: select registration scheme (config, path)
    source = get_source(config, path)
    scheme = select_registation_scheme(config, path)
    context = scheme.get_context(source, path)

    # the handler getters may yield nothing at all; treat that as "no handlers"
    for pre_handler in get_pre_handlers(config) or ():
        pre_handler(config, path, context)

    # config is a mapping (it is subscripted by the selection helpers), so
    # the flag has to be read as an entry rather than as an attribute
    replace = bool(config.get('replace', False))

    for backend in get_backends(config):
        already_registered = backend.exists(source, context)
        if already_registered and not replace:
            raise RegistrationError(f'Object {context} is already registered')
        backend.register(source, context, replace=already_registered)

    for post_handler in get_post_handlers(config) or ():
        post_handler(config, path, context)
def select_registation_scheme(config, path):
    """Return the registration scheme configured for ``path``.

    The selection (first entry of ``config['schemes']`` whose ``filter``
    matches, otherwise a ``RegistrationError``) is identical to what
    ``get_scheme`` does, so the work is delegated instead of duplicated.
    Previously the loop result was discarded and None was returned.

    Raises:
        RegistrationError: if no scheme matches ``path``.
    """
    # NOTE(review): get_scheme lives next to the scheme classes; if those end
    # up in a separate module it has to be imported here.
    return get_scheme(config, path)
def get_pre_handlers(config):
    """Return the handlers to run before an item is registered.

    Handler configuration is not implemented yet, so the result is always
    an empty list. A list (rather than None) is returned because the
    result is iterated by register().
    """
    return []
def get_post_handlers(config):
    """Return the handlers to run after an item was registered.

    Handler configuration is not implemented yet, so the result is always
    an empty list. A list (rather than None) is returned because the
    result is iterated by register().
    """
    return []
def get_backends(config):
    """Return the backends an item has to be registered with.

    Not implemented yet. An explicit error is raised because silently
    returning nothing would either crash the caller's loop with an
    unrelated TypeError or skip registration altogether.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError('Backend selection from the configuration is not implemented yet')
import re
from os.path import join
from .xml import read_xml, parse_metadata_schema, Parameter
from .context import Context
from .exceptions import RegistrationError
class RegistrationScheme:
    """Base class of all registration schemes.

    A scheme inspects the files below a path and builds the item context
    from them. ``source`` and ``path`` may be bound at construction time
    or passed to ``get_context``; both are optional so that a scheme can
    be created from configuration arguments alone.
    """

    def __init__(self, source=None, path=None):
        self.source = source
        self.path = path

    def get_context(self, source=None, path=None):
        """Build the context; must be implemented by subclasses.

        Args:
            source: source to read from; intended to default to the
                source bound at construction time.
            path: path to inspect; intended to default to the path bound
                at construction time.
        """
        raise NotImplementedError
def _parse_datetime(value):
    """Parse an ISO 8601 timestamp such as ``2020-01-01T10:00:00.000Z``."""
    # imported locally so that the module's import header stays untouched
    from datetime import datetime

    text = str(value).strip()
    if text.endswith('Z'):
        # datetime.fromisoformat only understands the "Z" suffix from
        # Python 3.11 on, so spell the UTC offset out explicitly
        text = f'{text[:-1]}+00:00'
    return datetime.fromisoformat(text)


class Sentinel2RegistrationScheme(RegistrationScheme):
    """Registration scheme that builds the context from ``MTD_TL.xml``.

    NOTE(review): most schema xpaths are rooted at ``Level-2A_User_Product``
    while ``mask_file_paths`` is rooted at ``Level-2A_Tile_ID``; a single
    document can only have one root element -- confirm which metadata file
    each value is supposed to come from.
    """

    MTD_TL_SCHEMA = {
        'begin_time': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_START_TIME/text()', False, _parse_datetime),
        'end_time': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_STOP_TIME/text()', False, _parse_datetime),
        'identifier': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_URI/text()'),
        'level': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PROCESSING_LEVEL/text()'),
        'type': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/PRODUCT_TYPE/text()'),
        'generation_time': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/GENERATION_TIME/text()', False, _parse_datetime),
        # NOTE(review): no text() step, so this yields an element instead of
        # a value -- confirm what the backend expects here.
        'cloud_cover': Parameter('/n1:Level-2A_User_Product/n1:Quality_Indicators_Info/Cloud_Coverage_Assessment'),
        'image_file_paths': Parameter('/n1:Level-2A_User_Product/n1:General_Info/Product_Info/Product_Organisation/Granule_List/Granule/IMAGE_FILE/text()', True),
        # text() is required: the paths are matched against a regular
        # expression below, which only works on strings
        'mask_file_paths': Parameter('/n1:Level-2A_Tile_ID/n1:Quality_Indicators_Info/Pixel_Level_QI/MASK_FILENAME/text()', True),
    }

    S2_NAMESPACES = {
        'n1': "https://psd-14.sentinel2.eo.esa.int/PSD/User_Product_Level-2A.xsd"
    }

    # first group: three character band name, second group: resolution
    BAND_RE = re.compile(r'.*([A-Z0-9]{3})_([0-9]{2}m)$')
    # first group: mask type, second group: three character band name
    MASK_TYPE_RE = re.compile(r'.*/MSK_([A-Z]*)_([A-Z0-9]{3}).[a-z0-9]+$')

    def get_context(self, source=None, path=None):
        """Read the metadata file below ``path`` and build the context.

        Args:
            source: source to read from; defaults to ``self.source``.
            path: product path; defaults to ``self.path``.

        Returns:
            A ``Context`` holding the identifier, the raster files keyed by
            band name, the mask files keyed by mask type, the metadata file
            and the begin/end/generation times plus the cloud cover.

        Raises:
            RegistrationError: if an image or mask file path does not have
                the expected form.
        """
        source = self.source if source is None else source
        path = self.path if path is None else path

        metadata_file = join(path, 'MTD_TL.xml')
        mtd_tree = read_xml(source, metadata_file)

        # get MTD metadata
        metadata = parse_metadata_schema(mtd_tree, self.MTD_TL_SCHEMA, self.S2_NAMESPACES)

        # entries sharing a key (same band name / same mask type) overwrite
        # each other; the last one wins
        raster_files = {}
        for image_file_path in metadata['image_file_paths']:
            match = self.BAND_RE.match(image_file_path)
            if match is None:
                raise RegistrationError(f'Unexpected image file path {image_file_path}')
            raster_files[match.group(1)] = f'{join(path, image_file_path)}.jp2'

        mask_files = {}
        for mask_file_path in metadata['mask_file_paths']:
            match = self.MASK_TYPE_RE.match(mask_file_path)
            if match is None:
                raise RegistrationError(f'Unexpected mask file path {mask_file_path}')
            mask_files[match.group(1)] = mask_file_path

        return Context(
            identifier=metadata['identifier'],
            raster_files=raster_files,
            mask_files=mask_files,
            metadata_files=[metadata_file],
            metadata={
                'begin_time': metadata['begin_time'],
                'end_time': metadata['end_time'],
                'generation_time': metadata['generation_time'],
                'cloud_cover': metadata['cloud_cover'],
            }
        )
class GSCRegistrationScheme(RegistrationScheme):
    """Scheme registered under the ``gsc`` type.

    It adds nothing of its own yet; everything is inherited from the
    base class.
    """
# Registry of the available registration scheme classes, keyed by the
# ``type`` value of a scheme configuration entry (looked up in get_scheme).
REGISTRATION_SCHEMES = {
'gsc': GSCRegistrationScheme,
'sentinel-2': Sentinel2RegistrationScheme,
}
def get_scheme(config: dict, path: str) -> RegistrationScheme:
    """Instantiate the first configured registration scheme accepting ``path``.

    Entries of ``config['schemes']`` are tried in order. An entry matches
    when it has no (or an empty) ``filter`` or when its ``filter`` regular
    expression matches the start of ``path``. The scheme class is taken
    from ``REGISTRATION_SCHEMES`` via the entry's ``type`` and instantiated
    with the entry's optional ``args`` and ``kwargs``.

    Raises:
        RegistrationError: if no entry matches ``path``.
    """
    for cfg_scheme in config['schemes']:
        pattern = cfg_scheme.get('filter')
        # an entry without a filter accepts every path
        if not pattern or re.match(pattern, path):
            break
    else:
        # no scheme found
        raise RegistrationError(f'Could not find a suitable scheme for the path {path}')

    return REGISTRATION_SCHEMES[cfg_scheme['type']](
        *cfg_scheme.get('args', []),
        **cfg_scheme.get('kwargs', {}),
    )
import re
from os.path import normpath, join, isabs
import shutil
from glob import glob
from fnmatch import fnmatch
import boto3
from swiftclient.multithreading import OutputManager
from swiftclient.service import SwiftError, SwiftService
class RegistrationError(Exception):
    """Raised when no suitable source can be found for a path."""
class Source:
    """Abstract access to the place files are read from.

    Every method raises ``NotImplementedError`` and has to be provided by
    the concrete source classes.
    """

    def list_files(self, path, glob_pattern=None):
        """List the files below ``path``, optionally filtered by a glob pattern."""
        raise NotImplementedError

    def get_file(self, path, target_path):
        """Copy the file at ``path`` to the local ``target_path``."""
        raise NotImplementedError

    def get_vsi_env_and_path(self, path):
        """Return an environment dict and a path for accessing ``path``."""
        raise NotImplementedError
class SwiftSource(Source):
    """Source reading files from an OpenStack Swift object storage.

    The credential arguments are stored and handed to ``SwiftService`` as
    options. When ``container`` is None, the first segment of every path
    is interpreted as the container name.
    """

    def __init__(self, username=None, password=None, tenant_name=None,
                 tenant_id=None, region_name=None, user_domain_id=None,
                 user_domain_name=None, auth_url=None, auth_version=None,
                 container=None):
        self.username = username
        self.password = password
        self.tenant_name = tenant_name
        self.tenant_id = tenant_id
        self.region_name = region_name
        self.user_domain_id = user_domain_id
        self.user_domain_name = user_domain_name
        self.auth_url = auth_url
        self.auth_version = auth_version  # TODO: assume 3
        self.container = container

    def get_service(self):
        """Create a ``SwiftService`` configured with the stored credentials."""
        return SwiftService(options={
            "os_username": self.username,
            "os_password": self.password,
            "os_tenant_name": self.tenant_name,
            "os_tenant_id": self.tenant_id,
            "os_region_name": self.region_name,
            "os_auth_url": self.auth_url,
            "auth_version": self.auth_version,
            "os_user_domain_id": self.user_domain_id,
            "os_user_domain_name": self.user_domain_name,
        })

    def get_container_and_path(self, path: str):
        """Split ``path`` into container name and object path.

        With a configured container the path is returned unchanged;
        otherwise the first path segment (ignoring one leading slash) is
        the container and the remainder the object path.
        """
        container = self.container
        if container is None:
            parts = (path[1:] if path.startswith('/') else path).split('/')
            # join is a method of the separator string, not of the list
            container, path = parts[0], '/'.join(parts[1:])
        return container, path

    def list_files(self, path, glob_pattern=None):
        """Return the names of all objects whose name starts with ``path``.

        Args:
            path: prefix to list (including the container when none is
                configured).
            glob_pattern: optional ``fnmatch`` pattern the object names
                have to match.

        Raises:
            The error reported by the service for a failed listing page.
        """
        container, path = self.get_container_and_path(path)
        filenames = []

        with self.get_service() as swift:
            pages = swift.list(
                container=container,
                options={"prefix": path},
            )
            # consume the result pages while the service is still open
            for page in pages:
                if not page["success"]:
                    raise page['error']
                for item in page["listing"]:
                    if glob_pattern is None or fnmatch(item['name'], glob_pattern):
                        filenames.append(item['name'])

        return filenames

    def get_file(self, path, target_path):
        """Download the object at ``path`` to the local ``target_path``.

        Raises:
            Exception: if the download failed; the error reported by the
                service (if any) is attached as the cause.
        """
        container, path = self.get_container_and_path(path)

        with self.get_service() as swift:
            results = swift.download(
                container,
                [path],
                options={
                    'out_file': target_path
                }
            )
            # consume the results while the service is still open
            for result in results:
                if not result["success"]:
                    raise Exception('Failed to download %s' % path) from result.get('error')

    def get_vsi_env_and_path(self, path):
        """Return the environment dict and ``/vsiswift/`` path for ``path``."""
        container, path = self.get_container_and_path(path)
        return {
            'OS_IDENTITY_API_VERSION': self.auth_version,
            'OS_AUTH_URL': self.auth_url,
            'OS_USERNAME': self.username,
            'OS_PASSWORD': self.password,
            'OS_USER_DOMAIN_NAME': self.user_domain_name,
            # 'OS_PROJECT_NAME': self.tena,
            # 'OS_PROJECT_DOMAIN_NAME': ,
            'OS_REGION_NAME': self.region_name,
        }, f'/vsiswift/{container}/{path}'
class S3Source(Source):
    """Source reading files from an S3 compatible object storage.

    When ``bucket_name`` is None, the first segment of every path is
    interpreted as the bucket name. Additional keyword arguments are
    passed on to ``boto3.client``.
    """

    def __init__(self, bucket_name=None, secret_access_key=None, access_key_id=None, endpoint_url=None, **client_kwargs):
        # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client
        # for client_kwargs
        self.bucket_name = bucket_name
        self.secret_access_key = secret_access_key
        self.access_key_id = access_key_id
        self.endpoint_url = endpoint_url

        self.client = boto3.client(
            's3',
            aws_secret_access_key=secret_access_key,
            aws_access_key_id=access_key_id,
            endpoint_url=endpoint_url,
            **client_kwargs,
        )

    def get_bucket_and_key(self, path: str):
        """Split ``path`` into bucket name and object key.

        With a configured bucket the path is returned unchanged; otherwise
        the first path segment (ignoring one leading slash) is the bucket
        and the remainder the key.
        """
        container = self.bucket_name
        if container is None:
            parts = (path[1:] if path.startswith('/') else path).split('/')
            # join is a method of the separator string, not of the list
            container, path = parts[0], '/'.join(parts[1:])
        return container, path

    def list_files(self, path, glob_pattern=None):
        """Return the keys of all objects whose key starts with ``path``.

        Args:
            path: key prefix to list (including the bucket when none is
                configured).
            glob_pattern: optional ``fnmatch`` pattern the keys have to
                match.
        """
        bucket, key = self.get_bucket_and_key(path)

        # a single list_objects_v2 call returns a limited number of keys,
        # so walk through all result pages
        paginator = self.client.get_paginator('list_objects_v2')
        keys = []
        for page in paginator.paginate(Bucket=bucket, Prefix=key):
            # 'Contents' is absent when nothing matches the prefix
            for item in page.get('Contents', []):
                if glob_pattern is None or fnmatch(item['Key'], glob_pattern):
                    keys.append(item['Key'])
        return keys

    def get_file(self, path, target_path):
        """Download the object at ``path`` to the local ``target_path``."""
        bucket, key = self.get_bucket_and_key(path)
        self.client.download_file(bucket, key, target_path)

    def get_vsi_env_and_path(self, path: str, streaming: bool=False):
        """Return the environment dict and ``/vsis3/`` path for ``path``.

        With ``streaming`` set, the ``/vsis3_streaming/`` prefix is used
        instead.
        """
        bucket, key = self.get_bucket_and_key(path)
        return {
            'AWS_SECRET_ACCESS_KEY': self.secret_access_key,
            'AWS_ACCESS_KEY_ID': self.access_key_id,
            'AWS_S3_ENDPOINT': self.endpoint_url,
        }, f'/{"vsis3" if not streaming else "vsis3_streaming"}/{bucket}/{key}'
class LocalSource(Source):
    """Source reading files from a directory of the local file system.

    All paths are interpreted relative to ``root_directory``; absolute
    paths are re-rooted below it.
    """

    def __init__(self, root_directory):
        self.root_directory = root_directory

    def _join_path(self, path):
        """Return ``path`` normalized and placed below the root directory."""
        path = normpath(path)
        if isabs(path):
            # strip every leading slash: normpath keeps a leading "//", and
            # a remaining "/" would make join() discard the root directory
            path = path.lstrip('/')
        # NOTE(review): ".." segments are not rejected, so a path can still
        # resolve to a location outside of root_directory.
        return join(self.root_directory, path)

    def list_files(self, path, glob_pattern=None):
        """Return the entries of the directory ``path``.

        Args:
            path: directory to list, relative to the root directory.
            glob_pattern: optional glob pattern; defaults to ``*``.
        """
        pattern = '*' if glob_pattern is None else glob_pattern
        return glob(join(self._join_path(path), pattern))

    def get_file(self, path, target_path):
        """Copy the file at ``path`` to ``target_path``."""
        shutil.copy(self._join_path(path), target_path)

    def get_vsi_env_and_path(self, path):
        """Return an empty environment and the local path of ``path``."""
        return {}, self._join_path(path)
# Registry of the available source classes, keyed by the ``type`` value of
# a source configuration entry (looked up in get_source).
SOURCE_TYPES = {
'swift': SwiftSource,
's3': S3Source,
'local': LocalSource,
}
def get_source(config: dict, path: str) -> Source:
    """Instantiate the first configured source that accepts ``path``.

    Entries of ``config['sources']`` are tried in order. An entry matches
    when it has no (or an empty) ``filter`` or when its ``filter`` regular
    expression matches the start of ``path``. The source class is taken
    from ``SOURCE_TYPES`` via the entry's ``type`` and instantiated with
    the entry's optional ``args`` and ``kwargs``.

    Raises:
        RegistrationError: if no entry matches ``path``.
    """
    for cfg_source in config['sources']:
        pattern = cfg_source.get('filter')
        # an entry without a filter accepts every path
        if not pattern or re.match(pattern, path):
            break
    else:
        # no source found
        raise RegistrationError(f'Could not find a suitable source for the path {path}')

    return SOURCE_TYPES[cfg_source['type']](
        *cfg_source.get('args', []),
        **cfg_source.get('kwargs', {})
    )
from tempfile import NamedTemporaryFile
from dataclasses import dataclass, field
from typing import Union, Type, Optional, List, Callable, Any
import lxml.etree
from .source import Source
def read_xml(source: Source, path: str) -> lxml.etree._ElementTree:
    """Fetch the XML file at ``path`` from ``source`` and parse it.

    The file is copied into a temporary file which is removed again once
    parsing has finished.

    Returns:
        The parsed element tree.
    """
    with NamedTemporaryFile() as f:
        source.get_file(path, f.name)
        # Parse by file name instead of through the open handle: a source
        # that replaces the file on disk (e.g. download to a temporary name
        # followed by a rename) leaves the handle pointing at the old,
        # empty file.
        return lxml.etree.parse(f.name)
@dataclass
class Parameter:
    """Describes how one metadata value is extracted from an XML tree."""

    # XPath expression evaluated against the tree
    xpath: str
    # True: keep all matches as a list; False: keep only the first match
    multi: bool = False
    # optional conversion applied to every kept match
    parser: Optional[Callable[[str], Any]] = None
    # namespace prefix mapping for this expression; when empty, the
    # mapping given to the schema parser is used instead
    namespaces: dict = field(default_factory=dict)
def parse_metadata_schema(tree: "lxml.etree._ElementTree", schema: dict, namespaces: dict=None) -> dict:
    """Extract the values described by ``schema`` from ``tree``.

    Args:
        tree: object whose ``xpath`` method evaluates an expression.
        schema: maps result keys to parameter descriptions providing
            ``xpath``, ``multi``, ``parser`` and ``namespaces``.
        namespaces: namespace mapping used for parameters that do not
            bring their own.

    Returns:
        A dict with one entry per schema key: a list of (parsed) matches
        for multi-valued parameters, otherwise the (parsed) first match,
        or None when the expression matched nothing.
    """
    out = {}
    for key, param in schema.items():
        values = tree.xpath(param.xpath, namespaces=param.namespaces or namespaces)
        convert = param.parser or (lambda raw: raw)

        if param.multi:
            out[key] = [convert(raw) for raw in values]
        elif values:
            out[key] = convert(values[0])
        else:
            # nothing matched: report a missing value instead of failing
            # with an IndexError
            out[key] = None
    return out
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment