# Registration backends: register items into a target system (currently only EOxServer).
import json
import logging
import os
import re
import sys
from typing import List

import django
from django.contrib.gis.geos import GEOSGeometry, Polygon
from django.db import transaction

from .context import Context
from .exceptions import RegistrationError
from .source import Source, LocalSource, S3Source, SwiftSource
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class RegistrationResult:
    """Placeholder type for the outcome of a registration.

    It carries no attributes or behaviour yet; it only serves as the
    annotated return type of the backend registration methods.
    """
class Backend:
    """Interface shared by all registration backends.

    Every method raises ``NotImplementedError``; concrete backends override
    the operations they support.
    """

    def exists(self, source: Source, item: Context) -> bool:
        """Return whether ``item`` is already registered in this backend.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def register(self, source: Source, item: Context, replace: bool) -> RegistrationResult:
        """Register ``item``, read from ``source``, in this backend.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def register_item(self, item: Context) -> RegistrationResult:
        """Register ``item`` in this backend.

        Kept for backward compatibility with callers of the original
        single-argument entry point.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError
class EOxServerBackend(Backend):
    """Backend that registers items as products in an EOxServer instance.

    Creating the backend configures and initialises Django for the given
    instance. The ``eoxserver`` modules are imported inside the methods
    rather than at module level (presumably because they can only be
    imported once ``django.setup()`` has run -- confirm).
    """

    def __init__(self, instance_base_path: str, instance_name: str, mapping: dict, simplify_footprint_tolerance: int=None):
        """Set up Django for the EOxServer instance.

        Args:
            instance_base_path: directory containing the instance directory.
            instance_name: name of the instance; also used to derive the
                default ``DJANGO_SETTINGS_MODULE`` (``<name>.settings``).
            mapping: registration mapping, looked up by product type and
                then by product level (``None`` acts as the fallback level).
            simplify_footprint_tolerance: passed through unchanged to the
                product registrator.
        """
        self.mapping = mapping
        self.simplify_footprint_tolerance = simplify_footprint_tolerance

        # make the instance importable before Django loads its settings
        path = os.path.join(instance_base_path, instance_name)
        if path not in sys.path:
            sys.path.append(path)

        os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"{instance_name}.settings")  # TODO: from config
        django.setup()

    def exists(self, source: Source, item: Context) -> bool:
        """Return whether a product with the item's identifier exists."""
        from eoxserver.resources.coverages import models

        return models.Product.objects.filter(identifier=item.identifier).exists()

    @staticmethod
    def _strip_first_segment(path: str) -> str:
        """Return ``path`` without its first ``/``-separated segment."""
        return '/'.join(path.split('/')[1:])

    def _get_storage_from_source(self, source: Source, path: str) -> list:
        """Get or create the EOxServer storage matching ``source``.

        Args:
            source: the source the item is read from.
            path: item path, used to determine the bucket/container for
                S3 and Swift sources.

        Returns:
            A one-element list with the storage name, or an empty list when
            the source is of none of the handled types. The list is meant to
            be prepended to a file path to form a storage location.
        """
        from eoxserver.backends import models as backends

        created_storage_auth = False
        created_storage = False
        storage_name = None

        if isinstance(source, LocalSource):
            # go through the model manager, like the S3 and Swift branches
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name,
                url=source.root_directory,
                storage_type='local',
            )
            storage_name = storage.name

        elif isinstance(source, S3Source):
            params = json.dumps({
                'ACCESS_KEY_ID': source.access_key_id,
                'SECRET_ACCESS_KEY': source.secret_access_key,
            })

            storage_auth, created_storage_auth = backends.StorageAuth.objects.get_or_create(
                name=source.endpoint_url,
                url=source.endpoint_url,
                storage_auth_type='S3',
                auth_parameters=params,
            )

            bucket, _ = source.get_bucket_and_key(path)

            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name if source.bucket_name else f'{source.name}-{bucket}',
                url=bucket,
                storage_type='S3',
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        elif isinstance(source, SwiftSource):
            params = json.dumps({
                'auth-version': str(source.auth_version),
                'identity-api-version': str(source.auth_version),
                'username': source.username,
                'password': source.password,
                'tenant-name': source.tenant_name,
                'tenant-id': source.tenant_id,
                'region-name': source.region_name,
            })

            storage_auth, created_storage_auth = backends.StorageAuth.objects.get_or_create(
                name=source.auth_url,
                url=source.auth_url_short or source.auth_url,
                storage_auth_type='keystone',
                auth_parameters=params,
            )

            container, _ = source.get_container_and_path(path)

            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name if source.container else f'{source.name}-{container}',
                url=container,
                storage_type='swift',
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        if created_storage_auth:
            logger.info(f'Created storage auth for {source.name}')
        if created_storage:
            logger.info(f'Created storage for {source.name}')

        return [storage_name] if storage_name else []

    @transaction.atomic
    def register(self, source: Source, item: Context, replace: bool) -> RegistrationResult:
        """Register ``item`` as a product with its coverages, browses and masks.

        The whole registration runs in a single database transaction, so
        an exception raised anywhere below rolls everything back.

        Args:
            source: the source the item's files are read from.
            item: the item to register.
            replace: passed to the product and coverage registrators.

        Raises:
            KeyError: if ``item.product_type`` is not in the mapping.
            RegistrationError: if no mapping exists for the product level,
                or if the registered product ends up without a footprint.

        NOTE(review): nothing is returned at present, despite the
        ``RegistrationResult`` annotation.
        """
        # ugly, ugly hack
        from eoxserver.resources.coverages import models
        from eoxserver.resources.coverages.registration.product import ProductRegistrator
        from eoxserver.resources.coverages.registration.browse import BrowseRegistrator
        from eoxserver.resources.coverages.registration.mask import MaskRegistrator
        from eoxserver.resources.coverages.registration.registrators.gdal import GDALRegistrator

        # get the mapping for this particular item
        type_mapping = self.mapping[item.product_type]
        mapping = type_mapping.get(item.product_level) or type_mapping.get(None)

        if not mapping:
            raise RegistrationError(f'Could not get mapping for {item.product_type} {item.product_level}')

        metadata_file = self._strip_first_segment(item.metadata_files[0])
        storage = self._get_storage_from_source(source, item.path)

        try:
            models.ProductType.objects.get(name=mapping['product_type_name'])
        except models.ProductType.DoesNotExist:
            # Best-effort lookup: a missing product type is not handled here
            # but left to the product registration below.
            logger.debug(f"Product type {mapping['product_type_name']} does not exist")

        # the footprint is removed from the metadata so that it is not passed
        # twice in the overrides below
        if 'footprint' in item.metadata:
            footprint = GEOSGeometry(item.metadata.pop('footprint'))
        else:
            footprint = None

        product, _ = ProductRegistrator().register(
            metadata_locations=[storage + [metadata_file]],
            type_name=mapping['product_type_name'],
            replace=replace,
            extended_metadata=True,
            mask_locations=None,
            package_path=None,
            simplify_footprint_tolerance=self.simplify_footprint_tolerance,
            overrides=dict(
                identifier=item.identifier,
                footprint=footprint,
                **item.metadata
            ),
        )

        if not product.footprint or product.footprint.empty:
            raise RegistrationError("No footprint was extracted. full product: %s" % product)

        # insert the product in the to be associated collections
        for collection_id in mapping.get('collections', []):
            collection = models.Collection.objects.get(
                identifier=collection_id,
            )
            models.collection_insert_eo_object(collection, product)

        # register coverages and link them to the product
        for raster_identifier, coverage_type_name in mapping.get('coverages', {}).items():
            raster_items = item.raster_files.get(raster_identifier)
            # a raster entry may be a single path or a list of paths
            raster_items = [
                storage + [self._strip_first_segment(raster_item)]
                for raster_item in (raster_items if isinstance(raster_items, list) else [raster_items])
            ]

            logger.info(f"Registering coverage{'s' if len(raster_items) > 1 else ''} {raster_items} as {coverage_type_name}")

            report = GDALRegistrator().register(
                # the raster files are the data of the coverage
                data_locations=raster_items,
                metadata_locations=[storage + [metadata_file]],
                coverage_type_name=coverage_type_name,
                overrides={
                    "identifier": f'{product.identifier}__{raster_identifier}__coverage',
                    "footprint": None,
                },
                replace=replace,
            )
            logger.debug("Adding coverage to product")
            models.product_add_coverage(product, report.coverage)

        # register browses
        for raster_identifier, browse_type_name in mapping.get('browses', {}).items():
            raster_item = item.raster_files.get(raster_identifier)
            raster_item = self._strip_first_segment(raster_item)

            logger.info(f"Adding browse {browse_type_name or 'default'} {raster_item} to product")

            BrowseRegistrator().register(
                product.identifier,
                storage + [raster_item],
                browse_type_name,
            )

        # register masks
        for mask_identifier, mask_type_name in mapping.get('masks', {}).items():
            # a mask may be given as a file, as a geometry, or both; only
            # register the variants that are actually present
            mask_item = item.mask_files.get(mask_identifier)
            if mask_item:
                # NOTE(review): unlike raster/browse paths, the first path
                # segment is not stripped here -- confirm this is intended.
                logger.info(f"Adding mask (file) {mask_type_name} to product")
                MaskRegistrator().register(
                    product.identifier,
                    storage + [mask_item],
                    mask_type_name,
                )

            mask_item = item.masks.get(mask_identifier)
            if mask_item:
                logger.info(f"Adding mask (geometry) {mask_type_name} to product")
                models.Mask.objects.create(
                    product=product,
                    mask_type=models.MaskType.objects.get(
                        product_type=product.product_type,
                        name=mask_type_name,
                    ),
                    geometry=mask_item,
                )
# Registry mapping the backend ``type`` name used in the configuration to the
# class implementing that backend.
BACKENDS = {'eoxserver': EOxServerBackend}
def get_backends(config: dict, path: str) -> List[Backend]:
    """Instantiate all configured backends that apply to ``path``.

    Each entry of ``config['backends']`` names a backend ``type`` (a key of
    ``BACKENDS``) plus optional ``args``/``kwargs`` for its constructor. An
    entry with a ``filter`` is only used when that regular expression
    matches the beginning of ``path``; entries without a filter always apply.

    Args:
        config: configuration containing the ``backends`` list.
        path: path of the item to be registered.

    Returns:
        The instantiated backends, in configuration order.

    Raises:
        RegistrationError: if no configured backend applies to ``path``.
    """
    backends = [
        BACKENDS[cfg_backend['type']](
            *cfg_backend.get('args', []),
            **cfg_backend.get('kwargs', {}),
        )
        for cfg_backend in config['backends']
        if not cfg_backend.get('filter') or re.match(cfg_backend['filter'], path)
    ]

    if not backends:
        raise RegistrationError(f'Could not find a suitable backend for the path {path}')

    # hand the instantiated backends back to the caller, as annotated
    return backends