-
Fabian Schindler authoredFabian Schindler authored
eoxserver.py 11.49 KiB
"""
eoxserver.py
============
Contains the EOxServer backend, which registers STAC items as products in a
running EOxServer instance.
"""
import os
import sys
import logging
from typing import List, TYPE_CHECKING, Optional, TypedDict
import json
from urllib.parse import urlparse
import django
from django.db import transaction
from django.db.models import Q
if TYPE_CHECKING:
from pystac import Item
from ..exceptions import RegistrationError
from ..source import Source, LocalSource, S3Source, SwiftSource
from .abc import Backend
# Module-scoped logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)
class AssetsToCoverageMapping(TypedDict):
    """Groups STAC Item Assets into a single EOxServer Coverage.

    Attributes:
        assets: names of the STAC Item Assets that together make up one
            EOxServer Coverage
    """

    # asset names (keys of the STAC Item's assets) bundled into one coverage
    assets: List[str]
class _ItemToProductTypeMappingBase(TypedDict):
    """Required part of :class:`ItemToProductTypeMapping`."""

    # name of the EOxServer ProductType; always indexed directly
    name: str


class ItemToProductTypeMapping(_ItemToProductTypeMappingBase, total=False):
    """Mapping structure to map STAC Items to EOxServer ProductTypes

    Only ``name`` is required; the backend reads every other key with
    ``dict.get`` and a default, so they are optional.

    Attributes:
        name: the name of the EOxServer ProductType that is looked up
            (or created) for matching items
        product_type: NOTE(review): not read by the EOxServer backend,
            which uses ``name`` as the ProductType name — confirm whether
            this key is still needed
        filter: item property name -> required value; a STAC Item matches
            this mapping when every listed property is present with an equal
            value
        coverages: a mapping of coverage type name to an
            AssetsToCoverageMapping, passed on as the coverage mapping
        browses: browse mapping passed on when registering the product
        metadata_assets: names of assets passed on as metadata assets
        collections: identifiers of the collections that products of this
            ProductType shall be inserted into
    """

    product_type: str
    filter: dict
    coverages: dict  # coverage type name -> AssetsToCoverageMapping
    browses: dict
    metadata_assets: List[str]
    collections: List[str]
class EOxServerBackend(Backend):
    """
    EOxServer backend allows registration to be performed on a running
    EOxServer instance

    Args:
        instance_base_path (str): base path of the instance; the directory
            ``<instance_base_path>/<instance_name>`` is appended to
            ``sys.path`` if not already present
        instance_name (str): name of the Django instance;
            ``<instance_name>.settings`` is used as ``DJANGO_SETTINGS_MODULE``
            unless that variable is already set
        product_types (List[ItemToProductTypeMapping]): the product type
            mappings; each registered item is matched against their
            ``filter`` to select the mapping to use
    """

    def __init__(
        self,
        instance_base_path: str,
        instance_name: str,
        product_types: List[ItemToProductTypeMapping],
    ):
        self.product_types = product_types
        self.instance_name = instance_name

        # make the instance importable and initialize Django with its settings
        path = os.path.join(instance_base_path, instance_name)
        if path not in sys.path:
            sys.path.append(path)

        os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"{instance_name}.settings")
        django.setup()

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} instance_name={self.instance_name}>"

    def item_exists(self, source: Source, item: "Item") -> bool:
        """Checks whether a product for the item is already registered

        Args:
            source (Source): source of the data (not used for the lookup)
            item (Item): item to be checked; its ``id`` is looked up as the
                EOxServer product identifier

        Returns:
            (bool): true if exists, false otherwise
        """
        from eoxserver.resources.coverages import models

        return models.Product.objects.filter(identifier=item.id).exists()

    @staticmethod
    def _get_default_bucket(item: "Item") -> str:
        """Derive the S3 bucket name from the href of the item's first asset.

        For ``s3://<bucket>/<key>`` hrefs the bucket is the URL host; for any
        other href it is the first component of the URL path.

        Args:
            item (Item): item whose first asset is inspected

        Returns:
            (str): the derived bucket name

        Raises:
            RegistrationError: if the item has no assets
        """
        assets = item.get_assets()
        if not assets:
            raise RegistrationError(
                f"Cannot determine the bucket for {item}: it has no assets"
            )
        parsed = urlparse(next(iter(assets.values())).href)
        if parsed.scheme == "s3":
            return parsed.netloc
        # absolute URL paths start with "/", which would otherwise make the
        # first path component (the bucket) always empty
        return parsed.path.lstrip("/").partition("/")[0]

    def _get_storage_from_source(self, source: Source, item: "Item") -> list:
        """Get or create the EOxServer storage (and storage auth) for a source.

        Args:
            source (Source): source of the data; local, S3 and Swift sources
                are supported
            item (Item): item being registered; used to derive the bucket of
                S3 sources that have no ``bucket_name`` configured

        Returns:
            (list): ``[storage_name]`` if a storage was resolved, otherwise an
            empty list (unsupported source type)

        Raises:
            RegistrationError: if the S3 bucket has to be derived from an item
                without assets
        """
        from eoxserver.backends import models as backends

        created_storage_auth = False
        created_storage = False
        storage_name = None

        if isinstance(source, LocalSource):
            # get_or_create lives on the model manager, not the model class
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=source.name,
                url=source.root_directory,
                storage_type="local",
            )
            storage_name = storage.name

        elif isinstance(source, S3Source):
            params = json.dumps(
                {
                    "ACCESS_KEY_ID": source.access_key_id,
                    "SECRET_ACCESS_KEY": source.secret_access_key,
                    "AWS_REGION": source.region_name,
                }
            )

            # the storage auth URL is stored without the scheme
            endpoint_url = source.endpoint_url
            if endpoint_url.startswith("https://"):
                endpoint_url = endpoint_url[len("https://"):]
            elif endpoint_url.startswith("http://"):
                endpoint_url = endpoint_url[len("http://"):]

            bucket = source.bucket_name
            if not bucket:
                bucket = self._get_default_bucket(item)

            # sources without a fixed bucket get one storage (auth) per bucket
            s3_name = source.name if source.bucket_name else f"{source.name}-{bucket}"

            storage_auth, created_storage_auth = (
                backends.StorageAuth.objects.get_or_create(
                    name=s3_name,
                    url=endpoint_url,
                    storage_auth_type="S3",
                )
            )
            # overwrite stored credentials with the source's current ones
            storage_auth.auth_parameters = params
            storage_auth.save()

            storage, created_storage = backends.Storage.objects.get_or_create(
                name=s3_name,
                url=bucket,
                storage_type="S3",
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        elif isinstance(source, SwiftSource):
            params = json.dumps(
                {
                    "auth-version": str(source.auth_version),
                    "identity-api-version": str(source.auth_version),
                    "username": source.username,
                    "password": source.password,
                    "tenant-name": source.tenant_name,
                    "tenant-id": source.tenant_id,
                    "region-name": source.region_name,
                }
            )

            storage_auth, created_storage_auth = (
                backends.StorageAuth.objects.get_or_create(
                    name=source.auth_url,
                    url=source.auth_url_short or source.auth_url,
                    storage_auth_type="keystone",
                    auth_parameters=params,
                )
            )

            container = source.container
            # NOTE(review): unlike the S3 branch, no container is derived from
            # the item when ``source.container`` is unset, so the fallback
            # name/url below embed that unset value — confirm this is intended.
            storage, created_storage = backends.Storage.objects.get_or_create(
                name=(source.name if container else f"{source.name}{container}"),
                url=container,
                storage_type="swift",
                storage_auth=storage_auth,
            )
            storage_name = storage.name

        if created_storage_auth:
            logger.info("Created storage auth for %s", source.name)
        if created_storage:
            logger.info("Created storage for %s", source.name)

        return [storage_name] if storage_name else []

    def _register_with_stac(
        self,
        source: Source,
        item: "Item",
        replace: bool,
        storage,
        product_type: ItemToProductTypeMapping,
        create_product_type_model: bool = False,
    ):
        """Register the STAC item as an EOxServer product.

        Args:
            source (Source): source of the data (currently unused)
            item (Item): item to be registered
            replace (bool): replace an existing product or not
            storage (list): storage names as returned by
                ``_get_storage_from_source`` (empty or one entry)
            product_type (ItemToProductTypeMapping): the matched mapping
            create_product_type_model (bool): create the ProductType from the
                item instead of looking up an existing one by name

        Returns:
            the registered EOxServer product
        """
        from eoxserver.backends import models as backends
        from eoxserver.resources.coverages import models
        from eoxserver.resources.coverages.registration.stac import (
            register_stac_product,
            create_product_type_from_stac_item,
        )

        coverage_mapping = product_type.get("coverages", {})

        # TODO: flag to re-use product type?
        if create_product_type_model:
            product_type_obj, created = create_product_type_from_stac_item(
                item.to_dict(),
                # TODO: use hash instead
                product_type_name=product_type["name"],
                ignore_existing=True,
                coverage_mapping=coverage_mapping,
            )
        else:
            created = False
            product_type_obj = models.ProductType.objects.get(name=product_type["name"])

        if created:
            logger.info("Created Product Type %s", product_type_obj)
        else:
            logger.info("Using existing Product Type %s", product_type_obj)

        # resolve the storage name to its model; pass None (not an empty
        # list) when no storage was resolved for the source
        storage_obj = (
            backends.Storage.objects.get(name=storage[0]) if storage else None
        )

        product, replaced = register_stac_product(
            item.to_dict(),
            product_type_obj,
            storage_obj,
            replace=replace,
            coverage_mapping=coverage_mapping,
            browse_mapping=product_type.get("browses", {}),
            metadata_asset_names=product_type.get("metadata_assets"),
        )
        logger.info(
            "Successfully %s Product %s",
            "replaced" if replaced else "registered",
            product.identifier,
        )
        return product

    def _match_product_type(self, item: "Item") -> ItemToProductTypeMapping:
        """Select the product type mapping to use for the item.

        Mappings are tried in order. A mapping without a ``filter`` is only
        accepted when it is the sole configured mapping. A mapping with a
        ``filter`` matches when every filter key is present in the item's
        properties with an equal value.

        Args:
            item (Item): item to be matched

        Returns:
            (ItemToProductTypeMapping): the first matching mapping

        Raises:
            RegistrationError: if a filterless mapping is reached while several
                mappings are configured, or if no mapping matches
        """
        properties = item.properties
        for product_type in self.product_types:
            filter_ = product_type.get("filter")
            if not filter_:
                if len(self.product_types) > 1:
                    raise RegistrationError(
                        "Multiple product types found without filter, cannot match"
                    )
                return product_type

            if all(
                key in properties and properties[key] == value
                for key, value in filter_.items()
            ):
                return product_type

        # every mapping was tried (or none are configured) without a match
        raise RegistrationError(f"{item} not matched to any product_type")

    @transaction.atomic
    def register(self, source: Source, item: "Item", replace: bool):
        """Registers the item to the endpoint

        Resolves the storage for the source, selects the matching product
        type mapping, registers the item as a product and inserts it into the
        mapping's collections, all within one database transaction.

        Args:
            source (Source): source of the data
            item ('Item'): item to be registered
            replace (bool): replace existing or not

        Raises:
            RegistrationError: if no product type mapping matches the item
        """
        # ugly, ugly hack
        from eoxserver.resources.coverages import models

        storage = self._get_storage_from_source(source, item)
        product_type = self._match_product_type(item)

        logger.info('Registering into EOxServer for type "%s"', product_type["name"])
        product = self._register_with_stac(source, item, replace, storage, product_type)

        # insert the product in the to be associated collections
        for collection_id in product_type.get("collections", []):
            collection = models.Collection.objects.get(identifier=collection_id)
            models.collection_insert_eo_object(collection, product)

    @transaction.atomic
    def deregister_identifier(self, identifier: str) -> Optional[str]:
        """Attempts to deregister item

        Deletes the product with the given identifier, then deletes the
        unnamed grids of its coverages that are no longer referenced by any
        coverage or mosaic.

        Args:
            identifier (str): identifier to be deregistered

        Returns:
            (Optional[str]): the deregistered identifier, or None if no
            product with that identifier exists
        """
        # ugly, ugly hack
        from eoxserver.resources.coverages import models

        logger.info("Deregistering product '%s'", identifier)
        try:
            product = models.Product.objects.get(identifier=identifier)
        except models.Product.DoesNotExist:
            logger.info("No product with identifier '%s' found", identifier)
            return None

        # collect the grids before the product (and its coverages) is deleted;
        # distinct() keeps grids shared by several coverages from repeating
        grids = list(
            models.Grid.objects.filter(coverage__parent_product=product).distinct()
        )
        product.delete()

        # clean up grids that are not referenced anymore, but keep named
        # (user defined) grids
        for grid in grids:
            if grid.name:
                continue
            grid_used = models.EOObject.objects.filter(
                Q(coverage__grid=grid) | Q(mosaic__grid=grid),
            ).exists()
            if not grid_used:
                grid.delete()

        logger.info("Deregistered product with identifier '%s'", identifier)
        # return the deleted identifier
        return identifier