import os import re import sys import logging from typing import List import json import django from django.db import transaction from django.contrib.gis.geos import GEOSGeometry, Polygon from .exceptions import RegistrationError from .context import Context from .source import Source, LocalSource, S3Source, SwiftSource logger = logging.getLogger(__name__) class RegistrationResult: pass class Backend: def register_item(self, item: Context) -> RegistrationResult: raise NotImplementedError class EOxServerBackend(Backend): def __init__(self, instance_base_path: str, instance_name: str, mapping: dict, simplify_footprint_tolerance: int=None): self.mapping = mapping self.simplify_footprint_tolerance = simplify_footprint_tolerance path = os.path.join(instance_base_path, instance_name) if path not in sys.path: sys.path.append(path) os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"{instance_name}.settings") # TODO: from config django.setup() def exists(self, source: Source, item: Context): from eoxserver.resources.coverages import models return models.Product.objects.filter(identifier=item.identifier).exists() def _get_storage_from_source(self, source: Source, path: str) -> list: from eoxserver.backends import models as backends created_storage_auth = False created_storage = False storage_name = None if isinstance(source, LocalSource): storage, created_storage = backends.Storage.get_or_create( name=source.name, url=source.root_directory, storage_type='local', ) storage_name = storage.name elif isinstance(source, S3Source): params = json.dumps({ 'ACCESS_KEY_ID': source.access_key_id, 'SECRET_ACCESS_KEY': source.secret_access_key, }) storage_auth, created_storage_auth = backends.StorageAuth.objects.get_or_create( name=source.endpoint_url, url=source.endpoint_url, storage_auth_type='S3', auth_parameters=params, ) bucket, _ = source.get_bucket_and_key(path) storage, created_storage = backends.Storage.objects.get_or_create( name=source.name if source.bucket_name else f'{source.name}-{bucket}', url=bucket, storage_type='S3', storage_auth=storage_auth, ) storage_name = storage.name elif isinstance(source, SwiftSource): params = json.dumps({ 'auth-version': str(source.auth_version), 'identity-api-version': str(source.auth_version), 'username': source.username, 'password': source.password, 'tenant-name': source.tenant_name, 'tenant-id': source.tenant_id, 'region-name': source.region_name, }) storage_auth, created_storage_auth = backends.StorageAuth.objects.get_or_create( name=source.auth_url, url=source.auth_url_short or source.auth_url, storage_auth_type='keystone', auth_parameters=params, ) container, _ = source.get_container_and_path(path) storage, created_storage = backends.Storage.objects.get_or_create( name=source.name if source.container else f'{source.name}-{container}', url=container, storage_type='swift', storage_auth=storage_auth, ) storage_name = storage.name if created_storage_auth: logger.info(f'Created storage auth for {source.name}') if created_storage: logger.info(f'Created storage for {source.name}') return [storage_name] if storage_name else [] @transaction.atomic def register(self, source: Source, item: Context, replace: bool) -> RegistrationResult: # ugly, ugly hack from eoxserver.resources.coverages import models from eoxserver.resources.coverages.registration.product import ProductRegistrator from eoxserver.resources.coverages.registration.browse import BrowseRegistrator from eoxserver.resources.coverages.registration.mask import MaskRegistrator from eoxserver.resources.coverages.registration.registrators.gdal import GDALRegistrator # get the mapping for this particular item type_mapping = self.mapping[item.product_type] mapping = type_mapping.get(item.product_level) or type_mapping.get(None) if not mapping: raise RegistrationError(f'Could not get mapping for {item.product_type} {item.product_level}') metadata_file = '/'.join(item.metadata_files[0].split('/')[1:]) storage = self._get_storage_from_source(source, item.path) try: models.ProductType.objects.get(name=mapping['product_type_name']) except models.ProductType.DoesNotExist: pass if 'footprint' in item.metadata: footprint = GEOSGeometry(item.metadata.pop('footprint')) else: footprint = None product, _ = ProductRegistrator().register( metadata_locations=[storage + [metadata_file]], type_name=mapping['product_type_name'], replace=replace, extended_metadata=True, mask_locations=None, package_path=None, simplify_footprint_tolerance=self.simplify_footprint_tolerance, overrides=dict( identifier=item.identifier, footprint=footprint, **item.metadata ), ) if not product.footprint or product.footprint.empty: raise RegistrationError("No footprint was extracted. full product: %s" % product) # insert the product in the to be associated collections for collection_id in mapping.get('collections', []): collection = models.Collection.objects.get( identifier=collection_id, ) models.collection_insert_eo_object(collection, product) # register coverages and link them to the product for raster_identifier, coverage_type_name in mapping.get('coverages', {}).items(): raster_items = item.raster_files.get(raster_identifier) raster_items = [ storage + ['/'.join(raster_item.split('/')[1:])] for raster_item in (raster_items if isinstance(raster_items, list) else [raster_items]) ] logger.info(f"Registering coverage{'s' if len(raster_items) > 1 else ''} {raster_items} as {coverage_type_name}") report = GDALRegistrator().register( data_locations=raster_items, metadata_locations=[storage + [metadata_file]], coverage_type_name=coverage_type_name, overrides={ "identifier": f'{product.identifier}__{raster_identifier}__coverage', "footprint": None, }, replace=replace, ) logger.debug("Adding coverage to product") models.product_add_coverage(product, report.coverage) # register browses for raster_identifier, browse_type_name in mapping.get('browses', {}).items(): raster_item = item.raster_files.get(raster_identifier) raster_item = '/'.join(raster_item.split('/')[1:]) logger.info(f"Adding browse {browse_type_name or 'default'} {raster_item} to product") BrowseRegistrator().register( product.identifier, storage + [raster_item], browse_type_name, ) # register masks for mask_identifier, mask_type_name in mapping.get('masks', {}).items(): mask_item = item.mask_files.get(mask_identifier) if mask_item: logger.info(f"Adding mask (file) {mask_type_name} to product") MaskRegistrator().register( product.identifier, storage + [mask_item], mask_type_name, ) mask_item = item.masks.get(mask_identifier) if mask_item: logger.info(f"Adding mask (geometry) {mask_type_name} to product") models.Mask.objects.create( product=product, mask_type=models.MaskType.objects.get( product_type=product.product_type, name=mask_type_name, ), geometry=mask_item, ) BACKENDS = { 'eoxserver': EOxServerBackend } def get_backends(config: dict, path: str) -> List[Backend]: cfg_backends = config['backends'] backends = [ BACKENDS[cfg_backend['type']]( *cfg_backend.get('args', []), **cfg_backend.get('kwargs', {}), ) for cfg_backend in cfg_backends if not cfg_backend.get('filter') or re.match(cfg_backend['filter'], path) ] if not backends: raise RegistrationError(f'Could not find a suitable backend for the path {path}') return backends