EOX GitLab Instance

Skip to content
Snippets Groups Projects
eoxserver.py 11.49 KiB
"""
backend.py
============

Contains implementations for different backends where data may be registered

"""
import os
import sys
import logging
from typing import List, TYPE_CHECKING, Optional, TypedDict
import json
from urllib.parse import urlparse

import django
from django.db import transaction

from django.db.models import Q

if TYPE_CHECKING:
    from pystac import Item

from ..exceptions import RegistrationError
from ..source import Source, LocalSource, S3Source, SwiftSource
from .abc import Backend


logger = logging.getLogger(__name__)


class AssetsToCoverageMapping(TypedDict):
    """Describes which STAC Item Assets make up one EOxServer Coverage

    Attributes:
        assets: names of the assets that are grouped together into the
                same EOxServer Coverage
    """

    assets: List[str]


class ItemToProductTypeMapping(TypedDict):
    """Describes how STAC Items are mapped to an EOxServer ProductType

    Attributes:
        name: the name under which the EOxServer ProductType is looked up
              or created during registration
        product_type: the name of the EOxServer ProductType
        filter: STAC Item property values an Item has to carry for this
                mapping to be applied
        coverages: a mapping of coverage type name to an
                   AssetsToCoverageMapping
        collections: identifiers of the collections that products of this
                     ProductType shall be inserted into
    """

    name: str
    product_type: str
    filter: dict
    # TODO: tighten to a mapping of str -> AssetsToCoverageMapping?
    coverages: dict
    collections: List[str]


class EOxServerBackend(Backend):
    """
    EOxServer backend allows registration to be performed on a running
    EOxServer instance

    Args:
        instance_base_path (str):  base path of the instance
        instance_name (str): name of django instance
        product_types (list): list of ItemToProductTypeMapping definitions
            that are matched against the STAC Items to be registered
    """

    def __init__(
        self,
        instance_base_path: str,
        instance_name: str,
        product_types: List[ItemToProductTypeMapping],
    ):
        """Stores the configuration and initializes Django for the instance

        Args:
            instance_base_path (str): base path of the instance
            instance_name (str): name of django instance
            product_types (list): product type mappings used to match items
        """
        self.instance_name = instance_name
        self.product_types = product_types

        # make the instance package importable, but only add it once
        instance_path = os.path.join(instance_base_path, instance_name)
        if instance_path not in sys.path:
            sys.path.append(instance_path)

        # an already configured settings module takes precedence
        settings_module = f"{instance_name}.settings"
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", settings_module)
        django.setup()

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} instance_name ={self.instance_name}>"

    def item_exists(self, source: Source, item: "Item") -> bool:
        """Tells whether a Product with the identifier of the item exists

        Args:
            source (Source): source of the data (not used for the lookup)
            item (Item): item whose ``id`` is looked up as Product identifier

        Returns:
            (bool): true if a matching Product exists, false otherwise
        """
        # imported lazily, presumably because the models are only usable
        # after ``django.setup()`` ran in ``__init__`` -- TODO confirm
        from eoxserver.resources.coverages import models

        matching_products = models.Product.objects.filter(identifier=item.id)
        return matching_products.exists()

    def _get_storage_from_source(self, source: Source, item: "Item") -> list:
        """Looks up or creates the EOxServer storage models for a source

        For a ``LocalSource`` only a ``Storage`` is handled, for ``S3Source``
        and ``SwiftSource`` a ``StorageAuth`` and a ``Storage`` referencing it
        are handled. Other source types are not handled.

        Args:
            source (Source): source of the data
            item (Item): item to be registered; only consulted for S3 sources
                that have no bucket name configured

        Returns:
            (list): a single-element list holding the name of the storage,
                or an empty list if no storage name was determined (e.g. for
                an unhandled source type)

        Raises:
            RegistrationError: if the bucket has to be derived from the
                assets of the item, but the item has no assets
        """
        from eoxserver.backends import models as backends

        created_storage_auth = False
        created_storage = False
        storage_name = None

        if isinstance(source, LocalSource):
            # ``get_or_create`` lives on the model manager (``objects``),
            # not on the model class itself
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name,
                url=source.root_directory,
                storage_type="local",
            )
            storage_name = storage.name

        elif isinstance(source, S3Source):
            params = json.dumps(
                {
                    "ACCESS_KEY_ID": source.access_key_id,
                    "SECRET_ACCESS_KEY": source.secret_access_key,
                    "AWS_REGION": source.region_name,
                }
            )

            # the storage auth URL is stored without the URL scheme
            endpoint_url = source.endpoint_url
            for scheme in ("https://", "http://"):
                if endpoint_url.startswith(scheme):
                    endpoint_url = endpoint_url[len(scheme) :]
                    break

            bucket = source.bucket_name

            # get default bucket name from "first" asset. The first path
            # component is the bucket.
            if bucket is None:
                asset = next(iter(item.get_assets().values()), None)
                if asset is None:
                    # without this guard a bare StopIteration would escape
                    raise RegistrationError(
                        f"Cannot determine bucket for {item}: source has no "
                        "bucket name and the item has no assets"
                    )
                # NOTE(review): for hrefs whose path starts with "/" this
                # yields an empty bucket name -- confirm the expected href
                # layout before changing this
                bucket = urlparse(asset.href).path.partition("/")[0]

            # sources without a fixed bucket get one storage (auth) per bucket
            s3_name = source.name if source.bucket_name else f"{source.name}-{bucket}"

            (
                storage_auth,
                created_storage_auth,
            ) = backends.StorageAuth.objects.get_or_create(
                name=s3_name,
                url=endpoint_url,
                storage_auth_type="S3",
            )

            # always refresh the credentials, also on pre-existing records
            storage_auth.auth_parameters = params
            storage_auth.save()

            storage, created_storage = backends.Storage.objects.get_or_create(
                name=s3_name,
                url=bucket,
                storage_type="S3",
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        elif isinstance(source, SwiftSource):
            params = json.dumps(
                {
                    "auth-version": str(source.auth_version),
                    "identity-api-version": str(source.auth_version),
                    "username": source.username,
                    "password": source.password,
                    "tenant-name": source.tenant_name,
                    "tenant-id": source.tenant_id,
                    "region-name": source.region_name,
                }
            )

            # NOTE(review): unlike the S3 branch, the auth parameters are
            # part of the lookup here, so changed credentials do not update
            # an existing record -- confirm this is intended
            (
                storage_auth,
                created_storage_auth,
            ) = backends.StorageAuth.objects.get_or_create(
                name=source.auth_url,
                url=source.auth_url_short or source.auth_url,
                storage_auth_type="keystone",
                auth_parameters=params,
            )

            container = source.container

            # NOTE(review): the fallback name is only used when the container
            # is falsy, so it appends e.g. "None" -- verify intended naming
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=(source.name if source.container else f"{source.name}{container}"),
                url=container,
                storage_type="swift",
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        if created_storage_auth:
            logger.info(f"Created storage auth for {source.name}")
        if created_storage:
            logger.info(f"Created storage for {source.name}")

        return [storage_name] if storage_name else []

    def _register_with_stac(
        self,
        source: Source,
        item: "Item",
        replace: bool,
        storage,
        product_type: ItemToProductTypeMapping,
        create_product_type_model: bool = False,
    ):
        """Registers the item as Product using the EOxServer STAC helpers

        Args:
            source (Source): source of the data (not used in this method)
            item (Item): item to be registered
            replace (bool): passed on to ``register_stac_product``
            storage: list of storage names; when non-empty, its first entry
                is resolved to the Storage model of that name
            product_type (ItemToProductTypeMapping): mapping to apply
            create_product_type_model (bool): when true, the ProductType is
                created from the item; otherwise it is fetched by name

        Returns:
            the product returned by ``register_stac_product``
        """
        from eoxserver.backends import models as backends
        from eoxserver.resources.coverages import models
        from eoxserver.resources.coverages.registration.stac import (
            register_stac_product,
            create_product_type_from_stac_item,
        )

        # TODO: flag to re-use product type?
        if not create_product_type_model:
            created = False
            product_type_obj = models.ProductType.objects.get(name=product_type["name"])
        else:
            product_type_obj, created = create_product_type_from_stac_item(
                item.to_dict(),
                # TODO: use hash instead
                product_type_name=product_type["name"],
                ignore_existing=True,
                coverage_mapping=product_type.get("coverages", {}),
            )

        if not created:
            logger.info(f"Using existing Product Type {product_type_obj}")
        else:
            logger.info(f"Created Product Type {product_type_obj}")

        # swap the list of storage names for the actual Storage model
        if storage:
            first_storage_name = storage[0]
            storage = backends.Storage.objects.get(name=first_storage_name)

        registered_product, was_replaced = register_stac_product(
            item.to_dict(),
            product_type_obj,
            storage,
            replace=replace,
            coverage_mapping=product_type.get("coverages", {}),
            browse_mapping=product_type.get("browses", {}),
            metadata_asset_names=product_type.get("metadata_assets"),
        )

        action = "replaced" if was_replaced else "registered"
        logger.info(f"Successfully {action} Product {registered_product.identifier}")

        return registered_product

    @transaction.atomic
    def register(self, source: Source, item: "Item", replace: bool):
        """Registers the item to the endpoint

        The configured product types are tried in order and the first one
        whose filter matches the properties of the item is used. A single
        configured product type without filter matches every item.

        Args:
            source (Source): source of the data
            item ('Item'): item to be registered
            replace (bool): replace existing or not

        Raises:
            RegistrationError: if a product type without filter is
                encountered while more than one product type is configured,
                or if no product type matches the item
        """
        # imported lazily, presumably because the models are only usable
        # after ``django.setup()`` ran in ``__init__`` -- TODO confirm
        from eoxserver.resources.coverages import models

        storage = self._get_storage_from_source(source, item)

        # match self.product_types with item
        for candidate in self.product_types:
            filter_ = candidate.get("filter")
            if not filter_:
                if len(self.product_types) > 1:
                    raise RegistrationError(
                        "Multiple product types found without filter, cannot match"
                    )
                # the only configured product type, applies unconditionally
                product_type = candidate
                break

            # every filter entry must be present and equal in the properties
            if all(
                key in item.properties and item.properties[key] == value
                for key, value in filter_.items()
            ):
                product_type = candidate
                break
            # no match: keep looking at the remaining product types
        else:
            # only reached when no candidate matched (or none is configured)
            raise RegistrationError(f"{item} not matched to any product_type")

        logger.info(f"Registering into EOxServer for type " f'"{product_type["name"]}"')

        product = self._register_with_stac(source, item, replace, storage, product_type)

        # insert the product in the to be associated collections
        for collection_id in product_type.get("collections", []):
            collection = models.Collection.objects.get(identifier=collection_id)
            models.collection_insert_eo_object(collection, product)

    @transaction.atomic
    def deregister_identifier(self, identifier: str) -> Optional[str]:
        """Deletes the Product with the given identifier, if there is one

        Grids that were referenced by coverages of the product are deleted
        as well, as long as they are unnamed and not referenced anymore.

        Args:
            identifier (str): identifier to be deregistered

        Returns:
            (str, optional): the identifier of the deleted product or None,
                if no such product was found
        """
        # imported lazily, presumably because the models are only usable
        # after ``django.setup()`` ran in ``__init__`` -- TODO confirm
        from eoxserver.resources.coverages import models

        logger.info(f"Deregistering product '{identifier}'")
        try:
            product = models.Product.objects.get(identifier=identifier)
        except models.Product.DoesNotExist:
            # nothing registered under that identifier, so nothing to delete
            logger.info(f"No product with identifier '{identifier}' found")
            return None

        # collect the grids before the delete, afterwards the relation
        # to the product can no longer be queried
        candidate_grids = list(
            models.Grid.objects.filter(coverage__parent_product=product)
        )
        product.delete()

        for grid in candidate_grids:
            still_referenced = models.EOObject.objects.filter(
                Q(coverage__grid=grid) | Q(mosaic__grid=grid),
            ).exists()
            # named (user defined) grids and grids still in use are kept
            if grid and not (grid.name or still_referenced):
                grid.delete()

        logger.info(f"Deregistered product with identifier '{identifier}'")
        # hand back the identifier of the deleted product
        return identifier