Commit 6dcd0032 authored by Fabian Schindler

Reorganizing resources.

Sources are now only FS abstractions
New FileSchemes operate on Sources
parent a70884cc
Pipeline #19425 failed in 46 seconds
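The reorganization splits two concerns: a Source is now only a filesystem-style abstraction (open/listdir/stat over FTP, S3, Swift, ...), while a FileScheme holds the harvesting logic and walks a Source to emit STAC items as dictionaries. A rough sketch of how the pieces compose (constructor signatures follow the code below; the wiring and the concrete bucket, path, regex and URL values are illustrative only, not part of this commit):

# Illustrative wiring only, not taken from the harvester code.
source = S3Source(bucket="my-bucket", public=True)
scheme = FileMatcherScheme(
    source,
    path="products/",
    time_regex=r"\d{8}T\d{6}",
    url="https://example.com/products/",
)
for item in scheme.harvest():  # yields STAC item dictionaries
    print(item["id"])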
from abc import abstractmethod, ABC
from typing import Iterator
from dataclasses import dataclass
from datetime import datetime
from typing import IO, AnyStr, Iterator, List
class Resource(ABC):
@@ -8,12 +10,32 @@ class Resource(ABC):
that provides data or metadata.
"""
# All resources should implement the interface to do the following:
# 1. prepare harvest
# 2. harvest resource
# 3. convert to stac items
# 4. return list of stac items as dictionaries
@abstractmethod
def harvest(self) -> Iterator[dict]:
pass
...
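For illustration only (not part of this commit), a minimal concrete Resource that satisfies this contract could look like:

class StaticResource(Resource):
    """Toy example: yields a fixed list of already-built STAC item dicts."""
    def __init__(self, items: List[dict]):
        self.items = items
    def harvest(self) -> Iterator[dict]:
        # steps 1-3 are trivial here; step 4: return the items as dictionaries
        yield from self.items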
@dataclass
class Stat:
mtime: datetime
size: int
class Source(ABC):
@abstractmethod
def open(self, path: str) -> IO[AnyStr]:
...
@abstractmethod
def listdir(self, path) -> List[str]:
...
class Endpoint(Resource):
def __init__(self, url: str):
self.url = url
class FileScheme(Resource):
def __init__(self, source: Source):
self.source = source
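A Source is now purely a filesystem abstraction. As an assumed, in-memory sketch of the contract (stat() mirrors the Stat dataclass above and the S3Source implementation below, even though only open() and listdir() are declared abstract here):

import io
from typing import Dict

class DictSource(Source):
    """Assumed example: serves files from an in-memory mapping of path -> bytes."""
    def __init__(self, files: Dict[str, bytes]):
        self.files = files
    def open(self, path: str) -> IO[AnyStr]:
        return io.BytesIO(self.files[path])
    def listdir(self, path) -> List[str]:
        return [p for p in self.files if p.startswith(path)]
    def stat(self, path: str) -> Stat:
        return Stat(mtime=datetime.now(), size=len(self.files[path]))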
from ..resource import Resource
class Endpoint(Resource):
type: str = "Endpoint"
def __init__(self, url: str, *args, **kwargs):
self.url = url
from typing import Iterator
from ._endpoint import Endpoint
from ..abc import Endpoint
class OADSEndpoint(Endpoint):
...
from typing import Iterator
from ._endpoint import Endpoint
from ..abc import Endpoint
class OGCAPIEndpoint(Endpoint):
...
@@ -7,7 +7,7 @@ import requests
import lxml.etree as ET
import pystac
from ._endpoint import Endpoint
from ..abc import Endpoint
from .query import Query
from ..exceptions import QueryError
...
@@ -3,7 +3,7 @@ from typing import Iterator
import requests
from ._endpoint import Endpoint
from ..abc import Endpoint
from .query import Query
logger = logging.getLogger(__name__)
...
import logging
import re
from typing import Iterator
from dateutil.parser import isoparse
import pystac
from ..abc import FileScheme, Source
logger = logging.getLogger(__name__)
class FileMatcherScheme(FileScheme):
def __init__(self, source: Source, path: str, time_regex: str, url: str):
super().__init__(source)
self.path = path
self.time_regex = re.compile(time_regex)
self.url = url
def harvest(self) -> Iterator[dict]:
logger.info("Starting S3 harvesting")
for filename in self.source.listdir(self.path):
if match := re.search(self.time_regex, filename):
dt = isoparse(match[0])
item = self._create_item(filename, dt)
yield item.to_dict()
def _create_item(self, path, dt):
identifier = dt.strftime("%Y%m%d_%H%M%S")
properties = {
"datetime": dt,
"updated": self.source.stat(path).mtime,
}
item = pystac.Item(
id=identifier, geometry=None, bbox=None, datetime=dt, properties=properties
)
item.add_asset(identifier, pystac.Asset(f"{self.url}{path}"))
return item
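A worked example of the matching flow above (the regex and filename are made up):

# With time_regex r"\d{8}T\d{6}" and a listing entry "products/S1_20210302T091542.tif":
#   re.search   -> "20210302T091542"
#   isoparse    -> datetime(2021, 3, 2, 9, 15, 42)
#   item id     -> "20210302_091542"  (dt.strftime("%Y%m%d_%H%M%S"))
#   asset href  -> self.url + "products/S1_20210302T091542.tif"
#   "updated"   -> source.stat(path).mtime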
import logging
import json
from os.path import dirname, join
from typing import Iterator
from urllib.parse import urlparse
from ..abc import FileScheme, Source
logger = logging.getLogger(__name__)
class STACCatalogScheme(FileScheme):
def __init__(self, source: Source, root_path: str):
super().__init__(source)
self.root_path = root_path
def _read_json(self, path):
return json.load(self.source.open(path))
    def _join_href(self, path: str, href: str) -> str:
        """
        Resolves the given href against the path of the referencing file.
        Fully qualified URLs (e.g. s3://bucket/key) are returned unchanged;
        relative hrefs are joined with the directory of the referencing path.
        """
        parsed = urlparse(href)
        if parsed.scheme or parsed.netloc:
            return href
        return join(dirname(path), parsed.path)
def harvest(self) -> Iterator[dict]:
yield from self.harvest_catalog(self.root_path)
def harvest_catalog(self, path: str) -> Iterator[dict]:
"""
Harvests a specified STAC catalog. Will recurse into child catalogs
and yield all included STAC items.
"""
logger.info(f"Harvesting from catalog {path}")
catalog = self._read_json(path)
for link in catalog["links"]:
if link["rel"] == "item":
item_href = self._join_href(path, link["href"])
logger.info(f"Harvested item {item_href}")
yield self._read_json(item_href)
elif link["rel"] == "child":
catalog_href = self._join_href(path, link["href"])
yield from self.harvest_catalog(catalog_href)
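With the corrected _join_href above, relative and absolute links resolve as follows (illustrative values):

#   _join_href("catalogs/root/catalog.json", "sub/catalog.json")
#       -> "catalogs/root/sub/catalog.json"
#   _join_href("catalogs/root/catalog.json", "s3://bucket/items/a.json")
#       -> "s3://bucket/items/a.json"  (fully qualified hrefs pass through unchanged)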
@@ -4,8 +4,7 @@ from typing import Iterator, Optional
from redis import Redis
from harvester.resource import Resource
from .abc import Resource
from .endpoint import get_endpoint
from .source import get_source
from .exceptions import HarvestError
...
from typing import Optional
from .stac_catalog import STACCatalogSource
from ..abc import Source
from .swift import SwiftSource
from .s3 import S3Source, S3CatalogSource
from .s3 import S3Source
from .ftp import FTPSource
from ._source import Source
SOURCE_MAP = {
"FTP": FTPSource,
"S3": S3Source,
"S3Catalog": S3CatalogSource,
"STACCatalog": STACCatalogSource,
"Swift": SwiftSource,
}
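get_source() itself is not shown in this diff; a registry lookup along these lines is one plausible shape for it (the signature and error handling here are assumptions):

def get_source(config: dict) -> Source:
    # Hypothetical sketch: pick the concrete class by its "type" key and
    # pass the remaining configuration to its constructor.
    try:
        cls = SOURCE_MAP[config["type"]]
    except KeyError as e:
        raise ValueError(f"Unknown source type {config.get('type')!r}") from e
    return cls(**{k: v for k, v in config.items() if k != "type"})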
...
from typing import Dict
from ..resource import Resource
class Source(Resource):
type: str = "Source"
def __init__(self, parameters: Dict[str, str], *args, **kwargs):
self.parameters = parameters
import re
import logging
from dateutil.parser import isoparse
from typing import TYPE_CHECKING, Iterator, Tuple
from typing import TYPE_CHECKING, IO, AnyStr, List
from functools import cached_property
from urllib.parse import urlparse
import json
from os.path import dirname, join
if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
@@ -16,16 +13,16 @@ import botocore.session
import botocore.handlers
from botocore import UNSIGNED
from botocore.config import Config
import pystac
from ._source import Source
from ..abc import Source, Stat
logger = logging.getLogger(__name__)
class S3Base:
class S3Source(Source):
def __init__(
self,
bucket: str,
secret_access_key: str = None,
access_key_id: str = None,
endpoint_url: str = "",
@@ -34,6 +31,7 @@ class S3Base:
region_name: str = None,
public: bool = False,
):
self.bucket = bucket
self.secret_access_key = secret_access_key
self.access_key_id = access_key_id
self.endpoint_url = endpoint_url
@@ -62,108 +60,28 @@
)
return client
+    def get_bucket_and_key(self, path):
+        parsed = urlparse(path)
+        if parsed.scheme and parsed.scheme.lower() != "s3":
+            raise ValueError(f"Invalid S3 URL {path}")
+        if parsed.netloc:
+            return (parsed.netloc, parsed.path)
+        return (self.bucket, path)
+    def listdir(self, path) -> List[str]:
+        paginator = self.client.get_paginator("list_objects_v2")
+        bucket, prefix = self.get_bucket_and_key(path)
+        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
+        return [file["Key"] for page in pages for file in page["Contents"]]
+    def open(self, path: str) -> IO[AnyStr]:
+        bucket, key = self.get_bucket_and_key(path)
+        return self.client.get_object(Bucket=bucket, Key=key)["Body"]
+    def stat(self, path: str) -> Stat:
+        bucket, key = self.get_bucket_and_key(path)
+        response = self.client.head_object(Bucket=bucket, Key=key)
+        return Stat(mtime=response["LastModified"], size=response["ContentLength"])

-class S3Source(Source):
-    type = "S3"
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-    @property
-    def client(self) -> "S3Client":
-        botocore_session = botocore.session.Session()
-        session = boto3.session.Session(botocore_session=botocore_session)
-        client = session.client(
-            "s3",
-            aws_access_key_id=self.parameters["access_key_id"],
-            aws_secret_access_key=self.parameters["secret_access_key"],
-            region_name=self.parameters["region"],
-        )
-        return client
-    @property
-    def bucket(self) -> str:
-        bucket = self.parameters["url"].strip("https://").split(".")[0]
-        return bucket
-    def harvest(self) -> Iterator[dict]:
-        logger.info("Starting S3 harvesting")
-        paginator = self.client.get_paginator("list_objects_v2")
-        pages = paginator.paginate(Bucket=self.bucket, Prefix=self.parameters["prefix"])
-        time_regex: str = self.parameters["time_regex"]
-        for page in pages:
-            for file in page["Contents"]:
-                if match := re.search(time_regex, file["Key"]):
-                    dt = isoparse(match[0])
-                    item = self._create_item(file, dt, self.parameters["url"])
-                    yield item.to_dict()
-    def _create_item(self, data, dt, url):
-        identifier = dt.strftime("%Y%m%d_%H%M%S")
-        properties = {
-            "datetime": dt,
-            "updated": data["LastModified"],
-        }
-        item = pystac.Item(
-            id=identifier, geometry=None, bbox=None, datetime=dt, properties=properties
-        )
-        item.add_asset(identifier, pystac.Asset(f"{url}{data['Key']}"))
-        return item
-class S3CatalogSource(S3Base, Source):
-    type = "S3Catalog"
-    def __init__(self, root_href: str, **kwargs):
-        super().__init__(**kwargs)
-        self.root_href = root_href
-    def harvest(self) -> Iterator[dict]:
-        logger.info("Starting S3 Catalog harvesting")
-        parsed = urlparse(self.root_href)
-        yield from self.harvest_catalog(parsed.netloc, parsed.path)
-    def fetch_json(self, bucket: str, key: str) -> dict:
-        """
-        Loads the given object identifier by bucket and key and loads it as
-        JSON.
-        """
-        response = self.client.get_object(Bucket=bucket, Key=key)
-        return json.load(response["Body"])
-    def join_href(self, bucket: str, key: str, href: str) -> Tuple[str, str]:
-        """
-        Joins the given href with a previous bucket/key. When we have a fully
-        qualified S3 URL, the included bucket/key pair is returned.
-        If href is a relative path, it is joined with the previous key.
-        """
-        parsed = urlparse(href)
-        if parsed.netloc:
-            if parsed.scheme.lower() != "s3":
-                # TODO: what if HTTP hrefs?
-                raise ValueError("Can only join S3 URLs")
-            return (parsed.netloc, parsed.path)
-        else:
-            return (
-                bucket,
-                join(dirname(key), parsed.path),
-            )
-    def harvest_catalog(self, bucket: str, key: str) -> Iterator[dict]:
-        """
-        Harvests a specified STAC catalog. Will recurse into child catalogs
-        and yield all included STAC items.
-        """
-        logger.info(f"Harvesting from catalog {bucket}/{key}")
-        catalog = self.fetch_json(bucket, key)
-        for link in catalog["links"]:
-            if link["rel"] == "item":
-                item_bucket, item_key = self.join_href(bucket, key, link["href"])
-                logger.info(f"Harvested item {item_bucket}/{item_key}")
-                yield self.fetch_json(item_bucket, item_key)
-            elif link["rel"] == "child":
-                cat_bucket, cat_key = self.join_href(bucket, key, link["href"])
-                yield from self.harvest_catalog(cat_bucket, cat_key)
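Path handling in the new S3Source accepts both plain keys and fully qualified s3:// URLs (values illustrative):

#   source.get_bucket_and_key("data/file.tif")             -> (source.bucket, "data/file.tif")
#   source.get_bucket_and_key("s3://other/data/file.tif")  -> ("other", "/data/file.tif")
#   source.listdir("data/") then lists the keys under that prefix via list_objects_v2.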
from typing import Iterator
from ._source import Source
class STACCatalogSource(Source):
type = "STACCatalog"
def harvest(self) -> Iterator[dict]:
raise NotImplementedError()