EOX GitLab Instance

Skip to content
Snippets Groups Projects
Commit 380c569c authored by Fabian Schindler's avatar Fabian Schindler
Browse files

Implementing GSC registration scheme

Improving on EOxServer backend
Storages/StorageAuth are now created when necessary
parent 8d31a4a0
No related branches found
No related tags found
3 merge requests!36Staging to master to prepare 1.0.0 release,!32Registrar modularization,!27Registrar modularization
......@@ -3,6 +3,7 @@ import re
import sys
import logging
from typing import List
import json
import django
from django.db import transaction
......@@ -10,7 +11,7 @@ from django.contrib.gis.geos import GEOSGeometry, Polygon
from .exceptions import RegistrationError
from .context import Context
from .source import Source, S3Source, SwiftSource, LocalSource
from .source import Source, LocalSource, S3Source, SwiftSource
logger = logging.getLogger(__name__)
......@@ -41,7 +42,75 @@ class EOxServerBackend(Backend):
return models.Product.objects.filter(identifier=item.identifier).exists()
def _get_storage_from_source(self, source: Source, path: str) -> list:
    """Return the storage path prefix (as a list) for the given source.

    Lazily creates the EOxServer backend ``Storage`` -- and, for remote
    sources, the ``StorageAuth`` holding the credentials -- matching the
    source configuration, so registration can reference it by name.
    Returns ``[storage_name]`` or ``[]`` when no storage applies.
    """
    # Imported here to avoid requiring Django/EOxServer at module import time.
    from eoxserver.backends import models as backends
    created_storage_auth = False
    created_storage = False
    storage_name = None
    if isinstance(source, LocalSource):
        # Fix: get_or_create must be called on the model manager (`objects`),
        # not on the model class itself.
        storage, created_storage = backends.Storage.objects.get_or_create(
            name=source.name,
            url=source.root_directory,
            storage_type='local',
        )
        storage_name = storage.name
    elif isinstance(source, S3Source):
        params = json.dumps({
            'ACCESS_KEY_ID': source.access_key_id,
            'SECRET_ACCESS_KEY': source.secret_access_key,
        })
        storage_auth, created_storage_auth = backends.StorageAuth.objects.get_or_create(
            name=source.endpoint_url,
            url=source.endpoint_url,
            storage_auth_type='S3',
            auth_parameters=params,
        )
        bucket, _ = source.get_bucket_and_key(path)
        storage, created_storage = backends.Storage.objects.get_or_create(
            name=source.name,
            url=bucket,
            storage_type='S3',
            storage_auth=storage_auth,
        )
        storage_name = storage.name
    elif isinstance(source, SwiftSource):
        params = json.dumps({
            'auth-version': str(source.auth_version),
            'identity-api-version': str(source.auth_version),
            'username': source.username,
            'password': source.password,
            'tenant-name': source.tenant_name,
            'tenant-id': source.tenant_id,
            'region-name': source.region_name,
        })
        storage_auth, created_storage_auth = backends.StorageAuth.objects.get_or_create(
            name=source.auth_url,
            url=source.auth_url_short or source.auth_url,
            storage_auth_type='keystone',
            auth_parameters=params,
        )
        container, _ = source.get_container_and_path(path)
        storage, created_storage = backends.Storage.objects.get_or_create(
            name=source.name,
            url=container,
            storage_type='swift',
            storage_auth=storage_auth,
        )
        storage_name = storage.name

    if created_storage_auth:
        logger.info(f'Created storage auth for {source.name}')
    if created_storage:
        logger.info(f'Created storage for {source.name}')
    return [storage_name] if storage_name else []
@transaction.atomic
def register(self, source: Source, item: Context, replace: bool) -> RegistrationResult:
......@@ -53,9 +122,13 @@ class EOxServerBackend(Backend):
from eoxserver.resources.coverages.registration.registrators.gdal import GDALRegistrator
# get the mapping for this particular item
mapping = self.mapping[item.product_type][item.product_level]
metadata_file = '/'.join(item.metadata_files[0].split('/')[1:])
type_mapping = self.mapping[item.product_type]
mapping = type_mapping.get(item.product_level) or type_mapping.get(None)
if not mapping:
raise RegistrationError(f'Could not get mapping for {item.product_type} {item.product_level}')
metadata_file = '/'.join(item.metadata_files[0].split('/')[1:])
storage = self._get_storage_from_source(source, item.path)
try:
......@@ -63,7 +136,10 @@ class EOxServerBackend(Backend):
except models.ProductType.DoesNotExist:
pass
footprint = GEOSGeometry(item.metadata.pop('footprint'))
if 'footprint' in item.metadata:
footprint = GEOSGeometry(item.metadata.pop('footprint'))
else:
footprint = None
product, _ = ProductRegistrator().register(
metadata_locations=[storage + [metadata_file]],
......@@ -93,11 +169,11 @@ class EOxServerBackend(Backend):
for raster_identifier, coverage_type_name in mapping.get('coverages', {}).items():
raster_items = item.raster_files.get(raster_identifier)
raster_items = [
storage + '/'.join(raster_item.split('/')[1:])
storage + ['/'.join(raster_item.split('/')[1:])]
for raster_item in (raster_items if isinstance(raster_items, list) else [raster_items])
]
logger.info(f"Registering coverage {raster_item} as {coverage_type_name}")
logger.info(f"Registering coverage{'s' if len(raster_items) > 1 else ''} {raster_items} as {coverage_type_name}")
report = GDALRegistrator().register(
data_locations=raster_items,
......
......@@ -109,17 +109,18 @@ class GSCRegistrationScheme(RegistrationScheme):
'type': Parameter('//gml:using/eop:EarthObservationEquipment/eop:platform/eop:Platform/eop:shortName/text()'),
'level': Parameter('//gml:metaDataProperty/gsc:EarthObservationMetaData/eop:parentIdentifier/text()'),
'mask': Parameter('//gsc:opt_metadata/gml:metaDataProperty/gsc:EarthObservationMetaData/eop:vendorSpecific/eop:SpecificInformation[eop:localAttribute/text() = "CF_POLY"]/eop:localValue/text()'),
'footprint': Parameter('//gsc:opt_metadata/gml:metaDataProperty/gsc:EarthObservationMetaData/eop:vendorSpecific/eop:SpecificInformation[eop:localAttribute/text() = "CF_POLY"]/eop:localValue/text()'),
}
def get_context(self, source: Source, path: str) -> Context:
gcs_filenames = source.list_files(path, 'GSC*.xml')
gsc_filenames = source.list_files(path, ['GSC*.xml', 'GSC*.XML'])
metadata_file = gsc_filenames[0]
tree = read_xml(source, metadata_file)
metadata = parse_metadata_schema(tl_tree, self.GSC_SCHEMA, tree.nsmap)
metadata = parse_metadata_schema(tree, self.GSC_SCHEMA, tree.getroot().nsmap)
tiff_files = {
metadata['type']: source.list_files(path, '*.tif') + source.list_files(path, '*.TIF')
metadata['type']: source.list_files(path, ['*.tif', '*.TIF'])
}
match = re.match(r'.*(Level_[0-9]+)$', metadata['level'])
......@@ -152,7 +153,7 @@ def get_scheme(config: dict, path: str) -> RegistrationScheme:
cfg_schemes = config['schemes']
for cfg_scheme in cfg_schemes:
if cfg_scheme['filter']:
if cfg_scheme.get('filter'):
if re.match(cfg_scheme['filter'], path):
break
else:
......
......@@ -33,8 +33,8 @@ class Source:
class SwiftSource(Source):
def __init__(self, name=None, username=None, password=None, tenant_name=None,
             tenant_id=None, region_name=None, user_domain_id=None,
             user_domain_name=None, auth_url=None, auth_url_short=None,
             auth_version=None, container=None):
    """Store the Swift/Keystone connection settings for this source.

    `auth_url_short` is an optional alternative auth endpoint preferred
    over `auth_url` when creating backend storage auth records.
    """
    super().__init__(name)
    self.username = username
    self.password = password
    self.tenant_name = tenant_name
    self.tenant_id = tenant_id
    self.region_name = region_name
    self.user_domain_id = user_domain_id
    self.user_domain_name = user_domain_name
    self.auth_url = auth_url
    self.auth_url_short = auth_url_short
    self.auth_version = auth_version  # TODO: assume 3
    self.container = container
......@@ -70,9 +71,12 @@ class SwiftSource(Source):
return container, path
def list_files(self, path, glob_pattern=None):
def list_files(self, path, glob_patterns=None):
container, path = self.get_container_and_path(path)
if glob_patterns and not isinstance(glob_patterns, list):
glob_patterns = [glob_patterns]
with self.get_service() as swift:
pages = swift.list(
container=container,
......@@ -84,8 +88,12 @@ class SwiftSource(Source):
if page["success"]:
# at least two files present -> pass validation
for item in page["listing"]:
if glob_pattern is None or fnmatch(item['name'], glob_pattern):
filenames.append(item['name'])
if glob_patterns is None or any(
fnmatch(item['name'], join(path, glob_pattern)) for glob_pattern in glob_patterns):
filenames.append(
item['name'] if self.container else join(container, item['name'])
)
else:
raise page['error']
......@@ -154,7 +162,10 @@ class S3Source(Source):
return bucket, path
def list_files(self, path, glob_patterns=None):
    """List object paths (``bucket/key``) under *path* in the S3 bucket.

    `glob_patterns` may be a single pattern or a list of patterns; objects
    matching any pattern are returned. With no patterns, all objects under
    the prefix are returned.
    """
    if glob_patterns and not isinstance(glob_patterns, list):
        glob_patterns = [glob_patterns]

    bucket, key = self.get_bucket_and_key(path)
    logger.info(f'Listing S3 files for bucket {bucket} and prefix {key}')
    response = self.client.list_objects_v2(
        Bucket=bucket,
        Prefix=key,
    )
    return [
        f"{bucket}/{item['Key']}"
        for item in response['Contents']
        # Fix: list_objects_v2 entries are keyed 'Key', not 'name' --
        # matching on item['name'] would raise KeyError.
        if glob_patterns is None or any(
            fnmatch(item['Key'], glob_pattern) for glob_pattern in glob_patterns
        )
    ]
def get_file(self, path, target_path):
......@@ -195,9 +208,12 @@ class LocalSource(Source):
return join(self.root_directory, path)
def list_files(self, path, glob_patterns=None):
    """List files under *path* in the local root directory.

    `glob_patterns` may be a single pattern or a list of patterns; files
    matching any pattern are returned (duplicates removed, first-match
    order preserved). With no patterns, all entries are returned.
    """
    if glob_patterns and not isinstance(glob_patterns, list):
        glob_patterns = [glob_patterns]
    if glob_patterns is not None:
        # Fix: apply every pattern (previously only glob_patterns[0] was
        # used), mirroring the S3/Swift implementations.
        seen = set()
        filenames = []
        for glob_pattern in glob_patterns:
            for filename in glob(join(self._join_path(path), glob_pattern)):
                if filename not in seen:
                    seen.add(filename)
                    filenames.append(filename)
        return filenames
    else:
        return glob(join(self._join_path(path), '*'))
......@@ -219,7 +235,7 @@ def get_source(config: dict, path: str) -> Source:
cfg_sources = config['sources']
for cfg_source in cfg_sources:
if cfg_source['filter']:
if cfg_source.get('filter'):
if re.match(cfg_source['filter'], path):
break
else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment