EOX GitLab Instance

Skip to content
Snippets Groups Projects
Commit 0c231dc6 authored by Nikola Jankovic's avatar Nikola Jankovic :computer:
Browse files

Merge branch 'harvester-ext' into 'harvester'

Streamlining harvester endpoint

See merge request !93
parents fa082804 767300e1
No related branches found
No related tags found
2 merge requests!94implementation of harvester & scheduler,!93Streamlining harvester endpoint
import logging import logging
from typing import List, Optional, Tuple, Type
from dataclasses import dataclass
import requests import requests
import lxml.etree as ET import lxml.etree as ET
import pystac
from .Endpoint import Endpoint from .Endpoint import Endpoint
from ..stac import STACItemComposer from ..stac import STACItemComposer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SearchPage = Tuple[List[dict], Optional[str]]
@dataclass
class SearchPage:
    """A single page of results returned by an OpenSearch endpoint.

    Attributes:
        records: parsed records (STAC item dicts) contained in this page.
        index: start index of this page within the full result set.
        total: total number of results reported by the server.
    """

    records: List[dict]
    index: int
    total: int
class OpenSearchFormat:
    """Base class for OpenSearch response-format parsers.

    Concrete subclasses declare the ``mimetype`` they handle and are
    discovered dynamically via ``__subclasses__`` in :meth:`from_config`.
    """

    # MIME type handled by the subclass; ``None`` on the abstract base.
    mimetype = None

    @classmethod
    def from_config(cls, config: dict) -> 'OpenSearchFormat':
        """Instantiate the format parser selected by ``config['type']``.

        Note: ``config`` is mutated — the ``type`` key is popped and the
        remaining entries are passed to the subclass constructor as
        keyword arguments.

        Raises:
            ValueError: if ``type`` is missing or does not match the
                ``mimetype`` of any registered subclass.
        """
        subclass_map = {
            subclass.mimetype: subclass for subclass in cls.__subclasses__()
        }
        type_ = config.pop('type', None)
        try:
            SubClass = subclass_map[type_]
        except KeyError:
            # Original code raised a bare KeyError(None) here, which gave
            # no hint about what was wrong with the configuration.
            raise ValueError(
                f'Unknown OpenSearch format type {type_!r}; expected one '
                f'of {sorted(key for key in subclass_map if key)}'
            ) from None
        return SubClass(**config)
class GeoJSONFormat(OpenSearchFormat):
    """Parses GeoJSON (``application/json``) OpenSearch responses."""

    mimetype = 'application/json'

    def __init__(self, property_mapping):
        # Maps output property names to the keys looked up in each
        # feature's ``properties`` object.
        self.property_mapping = property_mapping

    def parse(self, response: requests.Response) -> SearchPage:
        """Convert one JSON response page into a :class:`SearchPage`."""
        payload = response.json()
        records = [self._parse_feature(raw) for raw in payload['features']]
        paging = payload['properties']
        return SearchPage(
            records,
            paging['startIndex'],
            paging['totalResults'],
        )

    def _parse_feature(self, feature: dict) -> dict:
        """Build a STAC item dict from a single GeoJSON feature."""
        source = feature['properties']
        properties = {
            name: source[path]
            for name, path in self.property_mapping.items()
        }
        item = pystac.Item(
            feature['id'],
            geometry=feature['geometry'],
            bbox=None,
            datetime=None,
            properties=properties,
        )
        return item.to_dict()
class AtomFormat(OpenSearchFormat):
    """Placeholder parser for Atom XML (``application/atom+xml``) responses.

    Registered via its ``mimetype`` so :meth:`OpenSearchFormat.from_config`
    can select it; no parsing logic is implemented yet.
    """

    mimetype = 'application/atom+xml'

    def __init__(self):
        # TODO: maybe add mapping from XML -> properties in output STAC
        pass
class OpenSearchEndpoint(Endpoint): class OpenSearchEndpoint(Endpoint):
type = 'OpenSearch' type = 'OpenSearch'
...@@ -15,18 +84,19 @@ class OpenSearchEndpoint(Endpoint): ...@@ -15,18 +84,19 @@ class OpenSearchEndpoint(Endpoint):
'': 'http://www.w3.org/2005/Atom', '': 'http://www.w3.org/2005/Atom',
'opensearch': 'http://a9.com/-/spec/opensearch/1.1/', 'opensearch': 'http://a9.com/-/spec/opensearch/1.1/',
'parameters': 'http://a9.com/-/spec/opensearch/extensions/parameters/1.0/', 'parameters': 'http://a9.com/-/spec/opensearch/extensions/parameters/1.0/',
'georss':'http://www.georss.org/georss/', 'georss': 'http://www.georss.org/georss/',
'media':'http://search.yahoo.com/mrss/', 'media': 'http://search.yahoo.com/mrss/',
'owc':'http://www.opengis.net/owc/1.0/', 'owc': 'http://www.opengis.net/owc/1.0/',
'eo':'http://a9.com/-/opensearch/extensions/eo/1.0/', 'eo': 'http://a9.com/-/opensearch/extensions/eo/1.0/',
'geo':'http://a9.com/-/opensearch/extensions/geo/1.0/', 'geo': 'http://a9.com/-/opensearch/extensions/geo/1.0/',
'time':'http://a9.com/-/opensearch/extensions/time/1.0/', 'time': 'http://a9.com/-/opensearch/extensions/time/1.0/',
'cql':'http://a9.com/-/opensearch/extensions/cql/1.0/', 'cql': 'http://a9.com/-/opensearch/extensions/cql/1.0/',
'dc': 'http://purl.org/dc/elements/1.1/', 'dc': 'http://purl.org/dc/elements/1.1/',
} }
def __init__(self, *args, **kwargs): def __init__(self, format_config, *args, **kwargs):
super(OpenSearchEndpoint, self).__init__(*args, **kwargs) super(OpenSearchEndpoint, self).__init__(*args, **kwargs)
self.format = OpenSearchFormat.from_config(format_config)
def harvest(self) -> list: def harvest(self) -> list:
logger.info("Starting OpenSearch harvesting") logger.info("Starting OpenSearch harvesting")
...@@ -35,63 +105,21 @@ class OpenSearchEndpoint(Endpoint): ...@@ -35,63 +105,21 @@ class OpenSearchEndpoint(Endpoint):
parser = ET.XMLParser(recover=True) parser = ET.XMLParser(recover=True)
data = ET.fromstring(requests.get(self.url).content, parser) data = ET.fromstring(requests.get(self.url).content, parser)
urls = self._get_urls_params(data) urls = self._get_urls_params(data)
_result = [] result = []
# query url, search_params, (index, index_val), _ = self.query.opensearch_params(
try: urls, self.format.mimetype
# try to query the json endpoint )
url, search_params, (index, index_val), stac_properties = self.query.opensearch_params(urls, 'application/json')
response_json = requests.get(url, params=search_params) while True:
response_json.raise_for_status() response = requests.get(url, params=search_params)
data = response_json.json() response.raise_for_status()
search_params[index] += index_val
items_per_page = data['properties']['itemsPerPage'] page = self.format.parse(response)
while items_per_page: if page.records:
_result += data['features'] result.extend(page.records)
search_params[index] += index_val else:
response_json = requests.get(url, params=search_params) break
response_json.raise_for_status()
data = response_json.json()
items_per_page = data['properties']['itemsPerPage']
# format as stac
result = []
for i in _result:
item = STACItemComposer.from_opensearch(i, stac_properties=stac_properties)
result.append(item.to_dict())
except (requests.exceptions.HTTPError, KeyError) as err:
logger.error('Error raised for JSON endpoint, trying atom')
logger.exception(err)
try:
# try to query atom endpoint
query_url, search_params, (index, index_val), stac_properties = self.query.opensearch_params(urls, 'application/atom+xml')
collections_result = requests.get(query_url, params=search_params)
collections_data = ET.fromstring(collections_result.content, parser)
items_per_page = int(collections_data.find('opensearch:itemsPerPage', self.NS).text)
while items_per_page:
_result += collections_data.findall('entry', self.NS)
search_params[index] += index_val
collections_result = requests.get(query_url, params=search_params).content
collections_data = ET.fromstring(collections_result, parser)
items_per_page = int(collections_data.find('opensearch:itemsPerPage', self.NS).text)
# format as stac
result = []
for i in _result:
item = STACItemComposer.from_opensearch(i, namespace=self.NS)
result.append(item.to_dict())
except requests.exceptions.HTTPError:
logger.error('Http error raised for ATOM endpoint')
raise
# filter data
if self.filter:
result = self.filter_data(result)
return result return result
......
...@@ -9,10 +9,13 @@ from .exceptions import HarvestError ...@@ -9,10 +9,13 @@ from .exceptions import HarvestError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Harvester: class Harvester:
def __init__(self, name: str, endpoint: dict, queue: str): def __init__(self, name: str, endpoint: dict, queue: str,
mode: str = 'item', extract_property: str = None):
self.name = name self.name = name
self.endpoint: Type[Endpoint] = Endpoint.from_config(endpoint) self.endpoint: Type[Endpoint] = Endpoint.from_config(endpoint)
self.queue = queue self.queue = queue
self.mode = mode
self.extract_property = extract_property
@classmethod @classmethod
def from_config(cls, config: dict, value: str) -> 'Harvester': def from_config(cls, config: dict, value: str) -> 'Harvester':
...@@ -46,9 +49,15 @@ def main(config: dict, value: str, client: Redis=None, **kwargs): ...@@ -46,9 +49,15 @@ def main(config: dict, value: str, client: Redis=None, **kwargs):
# Perform harvest # Perform harvest
result = harvester.endpoint.harvest() result = harvester.endpoint.harvest()
# Stringify items encoded = None
for idx, item in enumerate(result): if harvester.mode == 'item':
result[idx] = json.dumps(item) # Stringify items
encoded = [json.dumps(item) for item in result]
elif harvester.mode == 'property':
encoded = [
item['properties'][harvester.extract_property]
for item in result
]
# Send to queue # Send to queue
if not client: if not client:
...@@ -56,5 +65,5 @@ def main(config: dict, value: str, client: Redis=None, **kwargs): ...@@ -56,5 +65,5 @@ def main(config: dict, value: str, client: Redis=None, **kwargs):
host = redis_cfg['host'] host = redis_cfg['host']
port = redis_cfg['port'] port = redis_cfg['port']
client = Redis(host=host, port=port, charset="utf-8", decode_responses=True) client = Redis(host=host, port=port, charset="utf-8", decode_responses=True)
client.lpush(harvester.queue, *result) client.lpush(harvester.queue, *encoded)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment