EOX GitLab Instance

Commit 74f9e83c authored by Fabian Schindler's avatar Fabian Schindler
Browse files

Merge branch main

parent aa7054d4
Pipeline #19525 failed with stage
in 41 seconds
......@@ -6,6 +6,7 @@ Contains command line interface
"""
import logging.config
import json
import click
......@@ -65,6 +66,7 @@ def daemon(
@cli.command(help="Run a single, one-off harvest")
@click.argument("harvester_name", type=str)
@click.option("--values", type=str, default=None)
@click.option("--host", type=str, required=True)
@click.option("--port", type=int, required=True)
@click.option("--config-file", type=click.File("r"), required=True)
......@@ -72,6 +74,7 @@ def daemon(
@click.option("--debug/--no-debug", default=False)
def harvest(
harvester_name: str,
values: str,
host: str,
port: int,
config_file: str,
......@@ -84,7 +87,11 @@ def harvest(
validate_config(config)
client = init_client(host, port)
main(config, harvester_name, client)
item = {"name": harvester_name}
if values:
item["values"] = json.loads(values)
main(config, item, client)
if __name__ == "__main__":
......
......@@ -7,6 +7,8 @@ Contains functionality related to running the daemon
"""
import logging
import json
from redis import Redis
from .harvester import main
......@@ -28,6 +30,10 @@ def run_daemon(config: dict, client: Redis, listen_queue: str):
while True:
# fetch an item from the queue to be harvested
_, value = client.brpop(listen_queue)
_, raw_item = client.brpop(listen_queue)
# parse the item
item = json.loads(raw_item)
# start the harvesting
main(config, value, client)
main(config, item, client)
import json
import logging
from typing import Iterator, Optional
from typing import Union
from redis import Redis
from mergedeep import merge
from .abc import Resource
from .endpoint import get_endpoint
......@@ -13,17 +14,6 @@ from .postprocess import get_postprocessor
logger = logging.getLogger(__name__)
def stringify(
result: Iterator[dict],
mode: str = "item",
extract_property: Optional[str] = None,
) -> Iterator[str]:
if mode == "item":
yield from (json.dumps(item, default=str) for item in result)
elif mode == "property":
yield from (item["properties"][extract_property] for item in result)
def init_resource(harvest_config: dict) -> Resource:
config: dict = harvest_config.pop("resource")
......@@ -36,16 +26,37 @@ def init_resource(harvest_config: dict) -> Resource:
raise HarvestError(f"Resource type {config['type']} not found")
def main(config: dict, value: str, client: Redis):
logger.info("running harvesting for %s", value)
def get_harvest_config(config: dict, name: str, values: dict) -> dict:
"""
Selects the harvester config for the given name.
Harvesters configuration can either be a list of dicts (with name property)
or a dict mapping name to harvester.
"""
harvesters = config["harvesters"]
if isinstance(harvesters, dict):
return harvesters[name]
else:
for harvest_config in harvesters:
if harvest_config["name"] == name:
return merge({}, harvest_config, values)
else:
raise KeyError(name)
def main(config: dict, item: Union[dict, str], client: Redis):
if isinstance(item, str):
name = item
values = {}
else:
name = item["name"]
values = item.get("values", {})
logger.info(f"Running harvesting for {name}")
# get the harvest config
harvest_config = get_harvest_config(config, name, values)
# Initialize resource
try:
harvest_config = next(
item for item in config["harvesters"] if item["name"] == value
)
except StopIteration:
harvest_config = json.loads(value)
resource = init_resource(harvest_config)
# Perform harvest
......@@ -59,15 +70,9 @@ def main(config: dict, value: str, client: Redis):
)
# Filter data
result = cql_filter(harvest_config["filter"], result)
# Stringify items
encoded = stringify(
result,
harvest_config.get("mode", "item"),
harvest_config.get("extract_property"),
)
if "filter" in harvest_config:
result = cql_filter(harvest_config["filter"], result)
# Send to queue
for value in encoded:
client.lpush(harvest_config["queue"], value)
for item in result:
client.lpush(harvest_config["queue"], json.dumps(item, default=str))
......@@ -3,7 +3,7 @@ import requests_mock
import pytest_mock
import unittest.mock
from harvester.harvester import main
from harvester.harvester import main, get_harvest_config
@pytest.mark.parametrize("value", [("S2L2A_Element84")])
......@@ -53,3 +53,30 @@ def test_s3(
main(config, value, client)
client.lpush.assert_called()
def test_get_harvester_config():
# testing list style
test_config = {
"harvesters": [
{
"name": "myname",
"type": "TestType",
"properties": {}
}
]
}
config = get_harvest_config(test_config, "myname", {})
assert config == test_config["harvesters"][0]
# testing list style
test_config = {
"harvesters": {
"myname": {
"type": "TestType",
"properties": {}
}
}
}
config = get_harvest_config(test_config, "myname", {})
assert config == test_config["harvesters"]["myname"]
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment