# EOX GitLab Instance
#
# Skip to content
# Snippets Groups Projects
# backend.py 9.26 KiB
# Newer Older
import json
import logging
import os
import re
import sys
from typing import List

import django
from django.db import transaction
from django.contrib.gis.geos import GEOSGeometry, Polygon

from .exceptions import RegistrationError
from .context import Context
from .source import Source, LocalSource, S3Source, SwiftSource


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

class RegistrationResult:
    """Placeholder type for the outcome of a registration.

    The class currently defines no attributes or methods of its own.
    """


class Backend:
    """Base class of the registration backends.

    Every method here only raises ``NotImplementedError``; concrete
    backends are expected to override the ones they support.
    """

    def exists(self, source: Source, item: Context) -> bool:
        """Tell whether ``item`` is already registered in this backend.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    def register(self, source: Source, item: Context, replace: bool) -> RegistrationResult:
        """Register ``item``, read from ``source``, in this backend.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    def register_item(self, item: Context) -> RegistrationResult:
        """Register ``item`` in this backend.

        Kept so that existing callers and subclasses of this entry point
        continue to work.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError


class EOxServerBackend(Backend):
    def __init__(self, instance_base_path: str, instance_name: str, mapping: dict, simplify_footprint_tolerance: int=None):
        """Make the instance importable and set up Django for it.

        Args:
            instance_base_path: directory that contains the instance.
            instance_name: name of the instance; joined onto
                ``instance_base_path`` and used as the package of the
                default settings module (``<instance_name>.settings``).
            mapping: registration mapping, stored as ``self.mapping``.
            simplify_footprint_tolerance: stored as
                ``self.simplify_footprint_tolerance``.
        """
        self.mapping = mapping
        self.simplify_footprint_tolerance = simplify_footprint_tolerance

        # make the instance directory importable, without adding it twice
        instance_path = os.path.join(instance_base_path, instance_name)
        if instance_path not in sys.path:
            sys.path.append(instance_path)

        # TODO: from config
        # setdefault: an already exported DJANGO_SETTINGS_MODULE is kept
        settings_module = f"{instance_name}.settings"
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", settings_module)
        django.setup()

    def exists(self, source: Source, item: Context):
        """Return whether a product with the identifier of ``item`` exists.

        ``source`` is not used by this implementation.
        """
        from eoxserver.resources.coverages import models

        matching_products = models.Product.objects.filter(
            identifier=item.identifier,
        )
        return matching_products.exists()
    def _get_storage_from_source(self, source: Source, path: str) -> list:
        """Return the storage reference to use for files of ``source``.

        Looks up, or creates when missing, the ``Storage`` record matching
        the source. For S3 and Swift sources a ``StorageAuth`` record
        holding the JSON encoded credentials is looked up/created as well
        and linked to the storage.

        Args:
            source: source the files are read from.
            path: path of the item; for S3/Swift sources it is passed to the
                source to determine the bucket/container of the storage.

        Returns:
            A one-element list with the storage name, or an empty list when
            no storage name was determined (e.g. ``source`` is none of the
            source types handled here).
        """
        # NOTE(review): imported inside the method, presumably because the
        # models are only importable once django.setup() ran in __init__
        from eoxserver.backends import models as backends

        created_storage_auth = False
        created_storage = False
        storage_name = None

        if isinstance(source, LocalSource):
            # get_or_create is a manager method, so it has to be called on
            # `Storage.objects`, not on the model class
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name,
                url=source.root_directory,
                storage_type='local',
            )
            storage_name = storage.name

        elif isinstance(source, S3Source):
            params = json.dumps({
                'ACCESS_KEY_ID': source.access_key_id,
                'SECRET_ACCESS_KEY': source.secret_access_key,
            })

            # NOTE(review): the credentials are part of the lookup, so changed
            # credentials do not match an existing record - confirm intended
            storage_auth, created_storage_auth = backends.StorageAuth.objects.get_or_create(
                name=source.endpoint_url,
                url=source.endpoint_url,
                storage_auth_type='S3',
                auth_parameters=params,
            )

            bucket, _ = source.get_bucket_and_key(path)

            storage, created_storage = backends.Storage.objects.get_or_create(
                # without a bucket configured on the source, the bucket taken
                # from the path becomes part of the storage name
                name=source.name if source.bucket_name else f'{source.name}-{bucket}',
                url=bucket,
                storage_type='S3',
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        elif isinstance(source, SwiftSource):
            params = json.dumps({
                'auth-version': str(source.auth_version),
                'identity-api-version': str(source.auth_version),
                'username': source.username,
                'password': source.password,
                'tenant-name': source.tenant_name,
                'tenant-id': source.tenant_id,
                'region-name': source.region_name,
            })

            storage_auth, created_storage_auth = backends.StorageAuth.objects.get_or_create(
                name=source.auth_url,
                url=source.auth_url_short or source.auth_url,
                storage_auth_type='keystone',
                auth_parameters=params,
            )

            container, _ = source.get_container_and_path(path)

            storage, created_storage = backends.Storage.objects.get_or_create(
                # same naming scheme as for S3, based on the container
                name=source.name if source.container else f'{source.name}-{container}',
                url=container,
                storage_type='swift',
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        if created_storage_auth:
            logger.info(f'Created storage auth for {source.name}')
        if created_storage:
            logger.info(f'Created storage for {source.name}')

        return [storage_name] if storage_name else []

    @transaction.atomic
    def register(self, source: Source, item: Context, replace: bool) -> RegistrationResult:
        """Register ``item`` and its files in the EOxServer instance.

        Registers the product from the first metadata file of the item,
        inserts it into the collections named in the mapping and then
        registers the coverages, browses and masks configured in the mapping
        for that product. The method runs inside ``transaction.atomic``.

        Args:
            source: source the files of the item are read from; used to
                determine the storage.
            item: the item to register (identifier, product type/level,
                metadata, metadata/raster/mask files and mask geometries).
            replace: passed on to the product and coverage registrators.

        Returns:
            Nothing is returned explicitly at the moment (i.e. ``None``),
            despite the annotation.

        Raises:
            RegistrationError: if no mapping is configured for the product
                type/level of the item, if a raster file referenced by the
                mapping is missing from the item, or if the registered
                product has no (or an empty) footprint.
        """
        # ugly, ugly hack
        from eoxserver.resources.coverages import models
        from eoxserver.resources.coverages.registration.product import ProductRegistrator
        from eoxserver.resources.coverages.registration.browse import BrowseRegistrator
        from eoxserver.resources.coverages.registration.mask import MaskRegistrator
        from eoxserver.resources.coverages.registration.registrators.gdal import GDALRegistrator

        def strip_first_component(file_path: str) -> str:
            # drops everything up to and including the first '/'
            # NOTE(review): presumably the bucket/container name - confirm
            return '/'.join(file_path.split('/')[1:])

        # get the mapping for this particular item; an unknown product type
        # ends up in the RegistrationError below instead of a bare KeyError
        type_mapping = self.mapping.get(item.product_type) or {}
        mapping = type_mapping.get(item.product_level) or type_mapping.get(None)

        if not mapping:
            raise RegistrationError(f'Could not get mapping for {item.product_type} {item.product_level}')

        metadata_file = strip_first_component(item.metadata_files[0])
        storage = self._get_storage_from_source(source, item.path)

        product_type_name = mapping['product_type_name']
        try:
            models.ProductType.objects.get(name=product_type_name)
        except models.ProductType.DoesNotExist:
            # best effort check only: registration is still attempted
            logger.warning(f'Product type {product_type_name} does not exist')

        # an explicit footprint in the item metadata overrides the extracted
        # one; it is popped so it is not passed twice in the overrides below
        if 'footprint' in item.metadata:
            footprint = GEOSGeometry(item.metadata.pop('footprint'))
        else:
            footprint = None

        product, _ = ProductRegistrator().register(
            metadata_locations=[storage + [metadata_file]],
            type_name=product_type_name,
            replace=replace,
            extended_metadata=True,
            mask_locations=None,
            package_path=None,
            simplify_footprint_tolerance=self.simplify_footprint_tolerance,
            overrides=dict(
                identifier=item.identifier,
                footprint=footprint,
                **item.metadata
            ),
        )
        if not product.footprint or product.footprint.empty:
            raise RegistrationError("No footprint was extracted. full product: %s" % product)

        # insert the product in the to be associated collections
        for collection_id in mapping.get('collections', []):
            collection = models.Collection.objects.get(
                identifier=collection_id,
            )
            models.collection_insert_eo_object(collection, product)

        # register coverages and link them to the product
        for raster_identifier, coverage_type_name in mapping.get('coverages', {}).items():
            raster_items = item.raster_files.get(raster_identifier)
            if raster_items is None:
                raise RegistrationError(
                    f'No raster file {raster_identifier} found for coverage {coverage_type_name}'
                )

            # a raster entry is either a single path or a list of paths
            if not isinstance(raster_items, list):
                raster_items = [raster_items]
            raster_items = [
                storage + [strip_first_component(raster_item)]
                for raster_item in raster_items
            ]
            logger.info(f"Registering coverage{'s' if len(raster_items) > 1 else ''} {raster_items} as {coverage_type_name}")

            report = GDALRegistrator().register(
                data_locations=raster_items,
                metadata_locations=[storage + [metadata_file]],
                coverage_type_name=coverage_type_name,
                overrides={
                    "identifier": f'{product.identifier}__{raster_identifier}__coverage',
                    "footprint": None,
                },
                replace=replace,
            )
            logger.debug("Adding coverage to product")
            models.product_add_coverage(product, report.coverage)

        # register browses
        for raster_identifier, browse_type_name in mapping.get('browses', {}).items():
            raster_item = item.raster_files.get(raster_identifier)
            if raster_item is None:
                raise RegistrationError(
                    f"No raster file {raster_identifier} found for browse {browse_type_name or 'default'}"
                )

            raster_item = strip_first_component(raster_item)
            logger.info(f"Adding browse {browse_type_name or 'default'} {raster_item} to product")

            BrowseRegistrator().register(
                product.identifier,
                storage + [raster_item],
                browse_type_name,
            )

        # register masks; a mask can be given as a file, as a geometry, or both
        for mask_identifier, mask_type_name in mapping.get('masks', {}).items():
            mask_item = item.mask_files.get(mask_identifier)
            if mask_item:
                logger.info(f"Adding mask (file) {mask_type_name} to product")
                # NOTE(review): unlike raster files, the mask file path is
                # passed on without dropping its first component - confirm
                MaskRegistrator().register(
                    product.identifier,
                    storage + [mask_item],
                    mask_type_name,
                )
            mask_item = item.masks.get(mask_identifier)
            if mask_item:
                logger.info(f"Adding mask (geometry) {mask_type_name} to product")
                models.Mask.objects.create(
                    product=product,
                    mask_type=models.MaskType.objects.get(
                        product_type=product.product_type,
                        name=mask_type_name,
                    ),
                    geometry=mask_item,
                )


# Maps the `type` of a backend configuration entry to the backend class.
BACKENDS = {
    'eoxserver': EOxServerBackend,
}

def get_backends(config: dict, path: str) -> List[Backend]:
    """Instantiate the configured backends that apply to ``path``.

    Every entry of ``config['backends']`` is considered in order. An entry
    without a (truthy) ``filter`` always applies, otherwise its ``filter``
    is a regular expression that has to match at the start of ``path``.
    Applicable entries are instantiated from the class registered under
    their ``type`` in ``BACKENDS`` with their ``args`` and ``kwargs``.

    Raises:
        RegistrationError: if no backend applies to ``path``.
    """
    selected = []
    for backend_cfg in config['backends']:
        path_filter = backend_cfg.get('filter')
        if path_filter and not re.match(backend_cfg['filter'], path):
            # this backend is restricted to other paths
            continue

        backend_class = BACKENDS[backend_cfg['type']]
        positional = backend_cfg.get('args', [])
        keywords = backend_cfg.get('kwargs', {})
        selected.append(backend_class(*positional, **keywords))

    if len(selected) == 0:
        raise RegistrationError(f'Could not find a suitable backend for the path {path}')

    return selected