"""
backend.py
============

Contains implementations for different backends where data may be registered

"""

import os
import sys
import logging
from typing import List, TYPE_CHECKING, Optional, TypedDict
import json
from urllib.parse import urlparse

import django
from django.db import transaction
from django.db.models import Q

if TYPE_CHECKING:
    from pystac import Item

from ..exceptions import RegistrationError
from ..source import Source, LocalSource, S3Source, SwiftSource
from .abc import Backend

logger = logging.getLogger(__name__)


class AssetsToCoverageMapping(TypedDict):
    """Mapping structure to map specific STAC Item Assets to coverages

    Attributes:
        assets: list of asset names that shall be grouped to the same
            EOxServer Coverage
    """

    assets: List[str]


class ItemToProductTypeMapping(TypedDict):
    """Mapping structure to map STAC Items to EOxServer ProductTypes

    Attributes:
        name: the name of the EOxServer ProductType; this is the key the
            backend reads when looking up or creating the ProductType
        product_type: the name of the EOxServer ProductType (not read by
            the code in this module)
        filter: STAC Item property values that must all be equal on an
            Item for this mapping to be applied
        coverages: a mapping of coverage type name to an
            AssetsToCoverageMapping
        collections: a list of collection identifiers that products of
            this ProductType shall be inserted into
    """

    name: str
    product_type: str
    filter: dict
    coverages: dict  # TODO ?
    collections: List[str]


class EOxServerBackend(Backend):
    """
    EOxServer backend allows registration to be performed
    on a running EOxServer instance

    Args:
        instance_base_path (str): base path of the instance
        instance_name (str): name of django instance
        product_types (list): ItemToProductTypeMapping entries describing
            which ProductType an Item is registered as
    """

    def __init__(
        self,
        instance_base_path: str,
        instance_name: str,
        product_types: List[ItemToProductTypeMapping],
    ):
        self.product_types = product_types
        self.instance_name = instance_name

        # make the instance importable and configure Django before any of
        # the EOxServer model modules are imported
        path = os.path.join(instance_base_path, instance_name)
        if path not in sys.path:
            sys.path.append(path)

        os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"{instance_name}.settings")
        django.setup()

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} instance_name ={self.instance_name}>"

    def item_exists(self, source: Source, item: "Item") -> bool:
        """Checks whether a Product with the item's identifier is registered

        Args:
            source (Source): source of the data (not used for the lookup)
            item (Item): item to be checked

        Returns:
            (bool): true if exists, false otherwise
        """
        # imported lazily: models are only loadable after django.setup()
        from eoxserver.resources.coverages import models

        return models.Product.objects.filter(identifier=item.id).exists()

    def _get_storage_from_source(self, source: Source, item: "Item") -> list:
        """Gets or creates the EOxServer Storage (and StorageAuth) for a source

        Args:
            source (Source): source to derive the storage from
            item (Item): item being registered; its first asset is used to
                derive the bucket when an S3 source has no bucket name

        Returns:
            (list): a single-element list with the storage name, or an
                empty list if the source type is not handled
        """
        from eoxserver.backends import models as backends

        created_storage_auth = False
        created_storage = False
        storage_name = None

        if isinstance(source, LocalSource):
            # get_or_create lives on the model manager, not the model class
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name,
                url=source.root_directory,
                storage_type="local",
            )
            storage_name = storage.name

        elif isinstance(source, S3Source):
            params = json.dumps(
                {
                    "ACCESS_KEY_ID": source.access_key_id,
                    "SECRET_ACCESS_KEY": source.secret_access_key,
                    "AWS_REGION": source.region_name,
                }
            )

            # the storage auth URL is stored without the scheme
            endpoint_url = source.endpoint_url
            if endpoint_url.startswith("https://"):
                endpoint_url = endpoint_url[len("https://") :]
            elif endpoint_url.startswith("http://"):
                endpoint_url = endpoint_url[len("http://") :]

            bucket = source.bucket_name

            # get default bucket name from "first" asset. The first path
            # component is the bucket.
            if bucket is None:
                asset = next(iter(item.get_assets().values()))
                bucket = urlparse(asset.href).path.partition("/")[0]

            # sources without a fixed bucket get one storage per bucket
            s3_name = source.name if source.bucket_name else f"{source.name}-{bucket}"

            (
                storage_auth,
                created_storage_auth,
            ) = backends.StorageAuth.objects.get_or_create(
                name=s3_name,
                url=endpoint_url,
                storage_auth_type="S3",
            )
            # always refresh the credentials, they may have changed
            storage_auth.auth_parameters = params
            storage_auth.save()

            storage, created_storage = backends.Storage.objects.get_or_create(
                name=s3_name,
                url=bucket,
                storage_type="S3",
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        elif isinstance(source, SwiftSource):
            params = json.dumps(
                {
                    "auth-version": str(source.auth_version),
                    "identity-api-version": str(source.auth_version),
                    "username": source.username,
                    "password": source.password,
                    "tenant-name": source.tenant_name,
                    "tenant-id": source.tenant_id,
                    "region-name": source.region_name,
                }
            )

            (
                storage_auth,
                created_storage_auth,
            ) = backends.StorageAuth.objects.get_or_create(
                name=source.auth_url,
                url=source.auth_url_short or source.auth_url,
                storage_auth_type="keystone",
                auth_parameters=params,
            )

            container = source.container

            # NOTE(review): when the source has no container, the name is
            # the source name directly followed by the falsy container
            # value (e.g. "<name>None") - confirm this is intended
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=(source.name if source.container else f"{source.name}{container}"),
                url=container,
                storage_type="swift",
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        if created_storage_auth:
            logger.info(f"Created storage auth for {source.name}")
        if created_storage:
            logger.info(f"Created storage for {source.name}")

        return [storage_name] if storage_name else []

    def _register_with_stac(
        self,
        source: Source,
        item: "Item",
        replace: bool,
        storage,
        product_type: ItemToProductTypeMapping,
        create_product_type_model: bool = False,
    ):
        """Registers the item as an EOxServer Product via the STAC registration

        Args:
            source (Source): source of the data (not used here)
            item (Item): item to be registered
            replace (bool): replace an existing product or not
            storage: list holding the storage name as returned by
                `_get_storage_from_source`, may be empty
            product_type (ItemToProductTypeMapping): mapping to register
                the item with
            create_product_type_model (bool): create the ProductType from
                the item instead of requiring an existing one

        Returns:
            the product returned by `register_stac_product`
        """
        from eoxserver.backends import models as backends
        from eoxserver.resources.coverages import models
        from eoxserver.resources.coverages.registration.stac import (
            register_stac_product,
            create_product_type_from_stac_item,
        )

        # TODO: flag to re-use product type?
        if create_product_type_model:
            product_type_obj, created = create_product_type_from_stac_item(
                item.to_dict(),
                # TODO: use hash instead
                product_type_name=product_type["name"],
                ignore_existing=True,
                coverage_mapping=product_type.get("coverages", {}),
            )
        else:
            created = False
            product_type_obj = models.ProductType.objects.get(name=product_type["name"])

        if created:
            logger.info(f"Created Product Type {product_type_obj}")
        else:
            logger.info(f"Using existing Product Type {product_type_obj}")

        # resolve storage object
        if storage:
            storage = backends.Storage.objects.get(name=storage[0])

        product, replaced = register_stac_product(
            item.to_dict(),
            product_type_obj,
            storage,
            replace=replace,
            coverage_mapping=product_type.get("coverages", {}),
            browse_mapping=product_type.get("browses", {}),
            metadata_asset_names=product_type.get("metadata_assets"),
        )

        logger.info(
            f"Successfully {'replaced' if replaced else 'registered'} "
            f"Product {product.identifier}"
        )

        return product

    def _match_product_type(self, item: "Item") -> ItemToProductTypeMapping:
        """Selects the first configured product type whose filter matches

        A mapping without a filter matches every item, which is only
        allowed when it is the sole configured product type.

        Args:
            item (Item): item to be matched

        Returns:
            (ItemToProductTypeMapping): the matching mapping

        Raises:
            RegistrationError: if a mapping without filter is encountered
                while several are configured, or if no mapping matches
        """
        for candidate in self.product_types:
            filter_ = candidate.get("filter")

            if not filter_:
                if len(self.product_types) > 1:
                    raise RegistrationError(
                        "Multiple product types found without filter, cannot match"
                    )
                return candidate

            if all(
                k in item.properties and item.properties[k] == v
                for k, v in filter_.items()
            ):
                return candidate

        # only give up once every candidate was tried; this also covers
        # an empty product type configuration
        raise RegistrationError(f"{item} not matched to any product_type")

    @transaction.atomic
    def register(self, source: Source, item: "Item", replace: bool):
        """Registers the item to the endpoint

        Args:
            source (Source): source of the data
            item ('Item'): item to be registered
            replace (bool): replace existing or not

        Raises:
            RegistrationError: if the item cannot be matched to exactly
                one of the configured product types
        """
        # imported lazily: models are only loadable after django.setup()
        from eoxserver.resources.coverages import models

        storage = self._get_storage_from_source(source, item)

        # match self.product_types with item
        product_type = self._match_product_type(item)

        logger.info(f"Registering into EOxServer for type " f'"{product_type["name"]}"')
        product = self._register_with_stac(source, item, replace, storage, product_type)

        # insert the product in the to be associated collections
        for collection_id in product_type.get("collections", []):
            collection = models.Collection.objects.get(identifier=collection_id)
            models.collection_insert_eo_object(collection, product)

    @transaction.atomic
    def deregister_identifier(self, identifier: str) -> Optional[str]:
        """Attempts to deregister item

        Args:
            identifier (str): identifier to be deregistered

        Returns:
            (Optional[str]): the deregistered identifier, or None if no
                product with that identifier exists
        """
        # imported lazily: models are only loadable after django.setup()
        from eoxserver.resources.coverages import models

        try:
            logger.info(f"Deregistering product '{identifier}'")
            product = models.Product.objects.get(identifier=identifier)
            # collect the grids first, they cannot be reached through the
            # product anymore once it is deleted
            grids = list(models.Grid.objects.filter(coverage__parent_product=product))
            product.delete()

            # clean up grids
            for grid in grids:
                grid_used = models.EOObject.objects.filter(
                    Q(coverage__grid=grid) | Q(mosaic__grid=grid),
                ).exists()
                # clean up grid as well, if it is not referenced
                # anymore but saving named (user defined) grids
                if grid and not grid.name and not grid_used:
                    grid.delete()

        except models.Product.DoesNotExist:
            logger.info(f"No product with identifier '{identifier}' found")
            # no product found with that id, nothing was deregistered
            return None

        logger.info(f"Deregistered product with identifier '{identifier}'")
        # return the deleted identifier
        return identifier