EOX GitLab Instance

Commit 8b45e89b authored by Nikola Jankovic

implemented generators for better data streaming

parent 88aa7553
Pipeline #19049 failed in 42 seconds
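The refactor applies one pattern across all harvesters and helpers: instead of accumulating records in a list and returning it after the last page, each harvest() now yields records as they are fetched, which keeps memory flat and lets the queue fill while harvesting is still running. A minimal before/after sketch of the pattern (hypothetical pages/records names, not the actual classes in the diff below):

from typing import Generator

def harvest_as_list(pages) -> list:
    # before: every record is buffered until pagination finishes
    result = []
    for page in pages:
        result.extend(page.records)
    return result

def harvest_as_generator(pages) -> Generator[dict, None, None]:
    # after: records stream out page by page; the consumer sets the pace
    for page in pages:
        yield from page.records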
@@ -37,14 +37,6 @@ properties:
         - Swift
         - OGCAPI
         - OADS
-    time_property:
-      description: what time to extract from queried results.
-      type: string
-      enum:
-        - sensed
-        - updated
-        - created
-        - modified
     bbox:
       description: Bounding box to be queried
       type: string
......
 from abc import abstractmethod, ABC
 import logging
-from typing import Any, Dict, List
+from typing import Any, Dict, Generator, List
 from dataclasses import dataclass, field

 import requests
@@ -127,13 +127,12 @@ class OpenSearchEndpoint(Endpoint):
         self.format = get_format(format_config)
         self.query = OpenSearchQuery(**query)

-    def harvest(self) -> list:
+    def harvest(self) -> Generator[dict, None, None]:
         logger.info("Starting OpenSearch harvesting")
         parser = ET.XMLParser(recover=True)
         data = ET.fromstring(requests.get(self.url).content, parser)
         urls = self._get_url_param_mapping(data)
-        result = []
         params = self.query.prepare_params(urls=urls, mimetype=self.format.mimetype)
@@ -148,12 +147,10 @@ class OpenSearchEndpoint(Endpoint):
                 search_params[index_keyword] += index_value
                 page = self.format.parse(response)
                 if page.records:
-                    result.extend(page.records)
+                    yield from page.records
                 else:
                     break
-        return result

     def _get_url_param_mapping(self, data: ET._Element) -> dict:
         urls: Dict[str, Any] = {}
         for url in data.findall("opensearch:Url", self.NS):
......
@@ -10,17 +10,16 @@ class Query(ABC):
         bbox: str = None,
         collection: str = None,
     ):
-        self.time_begin, self.time_end, self.time_property = self._time(time)
+        self.time_begin, self.time_end = self._time(time)
         self.bbox = bbox
         self.collection = collection

     @staticmethod
-    def _time(time: Dict[str, Any]) -> Tuple[datetime, datetime, str]:
+    def _time(time: Dict[str, Any]) -> Tuple[datetime, datetime]:
         time_begin = time.pop("begin", datetime.now(timezone.utc))
         time_end = time.pop("end", datetime.now(timezone.utc))
-        time_property = time.pop("property", None)
-        return time_begin, time_end, time_property
+        return time_begin, time_end

     @abstractmethod
     def prepare_params(self, *args, **kwargs):
......
 import logging
+from typing import Generator

 import requests
@@ -15,28 +16,25 @@ class STACAPIEndpoint(Endpoint):
         super().__init__(*args, **kwargs)
         self.query = StacAPIQuery(**query)

-    def harvest(self) -> list:
+    def harvest(self) -> Generator[dict, None, None]:
         logger.info("Starting STACAPI harvesting")
         main = requests.get(self.url).json()
         search_url = next(
             link["href"] for link in main["links"] if link["rel"] == "search"
         )
         search_params = self.query.prepare_params(root_response=main)
-        result = []
         logger.debug("querying %s with %s", search_url, search_params)
         query_data = requests.get(search_url, params=search_params).json()
         context = query_data["context"]
         while context["returned"]:
             for feature in query_data["features"]:
-                result.append(feature)
+                yield feature
             search_params["page"] += 1
             logger.debug("querying %s with %s", search_url, search_params)
             query_data = requests.get(search_url, params=search_params).json()
             context = query_data["context"]
-        return result

 class StacAPIQuery(Query):
     def __init__(self, *args, **kwargs):
......
 import json
 import logging
-from typing import List, Optional
+from typing import Generator, Optional

 from redis import Redis
@@ -15,15 +15,14 @@ logger = logging.getLogger(__name__)

 def stringify(
-    result: List[dict], mode: str = "item", extract_property: Optional[str] = None
-):
-    encoded: List[str] = []
+    result: Generator[dict, None, None],
+    mode: str = "item",
+    extract_property: Optional[str] = None,
+) -> Generator[str, None, None]:
     if mode == "item":
-        encoded.extend((json.dumps(item, default=str) for item in result))
+        yield from (json.dumps(item, default=str) for item in result)
     elif mode == "property":
-        encoded.extend((item["properties"][extract_property] for item in result))
-    return encoded
+        yield from (item["properties"][extract_property] for item in result)

 def init_resource(harvest_config: dict) -> Resource:
@@ -64,4 +63,5 @@ def main(config: dict, value: str, client: Redis):
     )

     # Send to queue
-    client.lpush(harvest_config["queue"], *encoded)
+    for value in encoded:
+        client.lpush(harvest_config["queue"], value)
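With stringify now a generator and the splat call to lpush replaced by a loop, the worker streams end to end: nothing in the chain runs until the loop drains it, and each record is pushed as soon as it is produced. A sketch of how the pieces compose, assuming main() wires them roughly like this (config keys as in the sample config later in the diff):

resource = init_resource(harvest_config)            # e.g. a STACAPI or S3 harvester
items = resource.harvest()                          # Generator[dict, None, None]
filtered = cql_filter(harvest_config.get("filter"), items)
encoded = stringify(
    filtered,
    mode=harvest_config["mode"],
    extract_property=harvest_config["extract_property"],
)
for value in encoded:  # iterating here is what actually drives the paginated requests
    client.lpush(harvest_config["queue"], value)

A side benefit of the loop: the old client.lpush(queue, *encoded) would have raised on an empty batch, since LPUSH requires at least one value, whereas the loop simply pushes nothing.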
 from abc import abstractmethod, ABC
-from typing import List
+from typing import Generator

 class Resource(ABC):
@@ -12,9 +12,8 @@ class Resource(ABC):
     # 1. prepare harvest
     # 2. harvest resource
     # 3. convert to stac items
-    # 4. filter data if necessary
-    # 5. return list of stac items as dictionaries
+    # 4. return list of stac items as dictionaries

     @abstractmethod
-    def harvest(self) -> List[dict]:
+    def harvest(self) -> Generator[dict, None, None]:
         pass
 import re
 import logging
 from dateutil.parser import isoparse
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Generator

 if TYPE_CHECKING:
     from mypy_boto3_s3.client import S3Client
@@ -39,12 +39,11 @@ class S3Source(Source):
         bucket = self.parameters["url"].strip("https://").split(".")[0]
         return bucket

-    def harvest(self) -> list:
+    def harvest(self) -> Generator[dict, None, None]:
         logger.info("Starting S3 harvesting")
         paginator = self.client.get_paginator("list_objects_v2")
         pages = paginator.paginate(Bucket=self.bucket, Prefix=self.parameters["prefix"])
-        result = []
         time_regex: str = self.parameters["time_regex"]

         for page in pages:
@@ -52,9 +51,7 @@ class S3Source(Source):
                 if match := re.search(time_regex, file["Key"]):
                     dt = isoparse(match[0])
                     item = self._create_item(file, dt, self.parameters["url"])
-                    result.append(item.to_dict())
-        return result
+                    yield item.to_dict()

     def _create_item(self, data, dt, url):
         identifier = dt.strftime("%Y%m%d_%H%M%S")
......
-from typing import List
+from typing import Generator

 from pygeofilter.backends.native.evaluate import NativeEvaluator
 from pygeofilter.parsers.cql_json import parse as json_parse

-def cql_filter(_filter: dict, data: List[dict]) -> List[dict]:
+def cql_filter(
+    _filter: dict, data: Generator[dict, None, None]
+) -> Generator[dict, None, None]:
     if not _filter:
-        return data
+        yield from data
+        return
     _filter = json_parse(_filter)
     attr_map = {"point_attr": "geometry", "*": "properties.*"}
     nat_eval = NativeEvaluator(attribute_map=attr_map, use_getattr=False)
     evaluator = nat_eval.evaluate(_filter)
-    data = list(filter(evaluator, data))
-    return data
+    yield from filter(evaluator, data)
@@ -48,11 +48,11 @@ def data_map():
             json_load(DATA_PATH / "stac_e84_root.json"),
         ),
         (
-            """https://earth-search.aws.element84.com/v0/search?datetime=2019-09-10T00%3A00%3A00%2B00%3A00%2F2019-09-11T00%3A00%3A00%2B00%3A00&limit=100&page=1&bbox=%5B14.9%2C47.7%2C16.4%2C48.7%5D&collections=%5B%22sentinel-s2-l2a-cogs%22%5D""",
+            """https://earth-search.aws.element84.com/v0/search?datetime=2020-09-26T00%3A00%3A00%2B00%3A00%2F2020-09-27T00%3A00%3A00%2B00%3A00&limit=100&page=1&bbox=%5B14.9%2C47.7%2C16.4%2C48.7%5D&collections=%5B%22sentinel-s2-l2a-cogs%22%5D""",
             json_load(DATA_PATH / "stac_e84_data.json"),
         ),
         (
-            "https://earth-search.aws.element84.com/v0/search?datetime=2019-09-10T00%3A00%3A00%2B00%3A00%2F2019-09-11T00%3A00%3A00%2B00%3A00&limit=100&page=2&bbox=%5B14.9%2C47.7%2C16.4%2C48.7%5D&collections=%5B%22sentinel-s2-l2a-cogs%22%5D",
+            "https://earth-search.aws.element84.com/v0/search?datetime=2020-09-26T00%3A00%3A00%2B00%3A00%2F2020-09-27T00%3A00%3A00%2B00%3A00&limit=100&page=2&bbox=%5B14.9%2C47.7%2C16.4%2C48.7%5D&collections=%5B%22sentinel-s2-l2a-cogs%22%5D",
             json_load(DATA_PATH / "stac_e84_empty.json"),
         ),
     ],
......
@@ -5,17 +5,11 @@ harvesters:
     type: STACAPI # STACAPI, STACCatalog, OpenSearch, FTP, S3, Swift, OGCAPI, OADS
     query:
       time:
-        begin: 2019-09-10T00:00:00+00:00
-        end: 2019-09-11T00:00:00+00:00
-        property: sensed
+        begin: 2020-09-26T00:00:00+00:00
+        end: 2020-09-27T00:00:00+00:00
       collection: sentinel-s2-l2a-cogs
       bbox: 14.9,47.7,16.4,48.7
-    filter:
-      and:
-        - during:
-            - property: updated
-            - - P5D
-              - !now
+    filter: {}
     queue: register # register, ingest, delete, update, preprocess
     mode: item # item, property
     extract_property: null
@@ -40,7 +34,6 @@ harvesters:
     type: OpenSearch
     query:
       time:
-        property: sensed # sensed eox:updated
         begin: 2019-09-10T00:00:00+00:00
         end: 2019-09-11T00:00:00+00:00
       bbox: 14.9,47.7,16.4,48.7
@@ -52,7 +45,6 @@ harvesters:
     type: OpenSearch
     query:
       time:
-        property: sensed
        begin: 2019-09-10T00:00:00+00:00
         end: 2019-09-11T00:00:00+00:00
       collection: null
......
@@ -19,7 +19,7 @@ def test_stacapi(
     requests_mocker.get(url=url, json=mock_json)

     main(config, value, client)
-    client.lpush.assert_called_once()
+    client.lpush.assert_called()

 @pytest.mark.parametrize("value", [("Creodias-Opensearch")])
@@ -36,7 +36,7 @@ def test_opensearch(
     requests_mocker.get(url=mock_data["empty"][0], json=mock_data["empty"][1])

     main(config, value, client)
-    client.lpush.assert_called_once()
+    client.lpush.assert_called()

 @pytest.mark.parametrize("value", [("Fusion-data")])
@@ -52,4 +52,4 @@ def test_s3(
     session_mock().client().get_paginator().paginate.return_value = mock_data

     main(config, value, client)
-    client.lpush.assert_called_once()
+    client.lpush.assert_called()
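The switch from assert_called_once() to assert_called() follows directly from the queue loop: lpush is now invoked once per record instead of once with the whole batch. A small illustration with a plain unittest.mock Mock (values hypothetical):

from unittest.mock import Mock

client = Mock()
for value in ["item-1", "item-2"]:
    client.lpush("register", value)

client.lpush.assert_called()           # passes: at least one call was made
assert client.lpush.call_count == 2    # one call per record
# client.lpush.assert_called_once()    # would now fail with AssertionError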