EOX GitLab Instance

Commit 0835290b authored by Fabian Schindler's avatar Fabian Schindler
Browse files

Using new harvester initialization

parent a70884cc
Pipeline #19484 failed with stage
in 2 minutes and 3 seconds
......@@ -6,6 +6,7 @@ Contains command line interface
import logging.config
import json
import click
......@@ -65,6 +66,7 @@ def daemon(
@cli.command(help="Run a single, one-off harvest")
@click.argument("harvester_name", type=str)
@click.option("--values", type=str, default=None)
@click.option("--host", type=str, required=True)
@click.option("--port", type=int, required=True)
@click.option("--config-file", type=click.File("r"), required=True)
......@@ -72,6 +74,7 @@ def daemon(
@click.option("--debug/--no-debug", default=False)
def harvest(
harvester_name: str,
values: str,
host: str,
port: int,
config_file: str,
......@@ -84,7 +87,11 @@ def harvest(
client = init_client(host, port)
main(config, harvester_name, client)
item = {"name", harvester_name}
if values:
item["values"] = json.loads(values)
main(config, item, client)
if __name__ == "__main__":
......@@ -7,6 +7,8 @@ Contains functionality related to running the daemon
import logging
import json
from redis import Redis
from .harvester import main
......@@ -28,6 +30,10 @@ def run_daemon(config: dict, client: Redis, listen_queue: str):
while True:
# fetch an item from the queue to be harvested
_, value = client.brpop(listen_queue)
_, raw_item = client.brpop(listen_queue)
# parse the item
item = json.loads(raw_item)
# start the harvesting
main(config, value, client)
main(config, item, client)
import json
import logging
from typing import Iterator, Optional
from typing import Union
from redis import Redis
from mergedeep import merge
from harvester.resource import Resource
from .resource import Resource
from .endpoint import get_endpoint
from .source import get_source
from .exceptions import HarvestError
from .utils import cql_filter
logger = logging.getLogger(__name__)
def stringify(
result: Iterator[dict],
mode: str = "item",
extract_property: Optional[str] = None,
) -> Iterator[str]:
if mode == "item":
yield from (json.dumps(item, default=str) for item in result)
elif mode == "property":
yield from (item["properties"][extract_property] for item in result)
logger = logging.getLogger(__name__)
def init_resource(harvest_config: dict) -> Resource:
......@@ -37,31 +27,46 @@ def init_resource(harvest_config: dict) -> Resource:
raise HarvestError(f"Resource type {config['type']} not found")
def main(config: dict, value: str, client: Redis):
logger.info("running harvesting for %s", value)
def get_harvest_config(config: dict, name: str, values: dict) -> dict:
Selects the harvester config for the given name.
Harvesters configuration can either be a list of dicts (with name property)
or a dict mapping name to harvester.
harvesters = config["harvesters"]
if isinstance(harvesters, dict):
return harvesters[name]
for harvest_config in harvesters:
if harvest_config["name"] == name:
return merge({}, harvest_config, values)
raise KeyError(name)
def main(config: dict, item: Union[dict, str], client: Redis):
if isinstance(item, str):
name = item
values = {}
name = item["name"]
values = item.get("values", {})
logger.info(f"Running harvesting for {name}")
# get the harvest config
harvest_config = get_harvest_config(config, name, values)
# Initialize resource
harvest_config = next(
item for item in config["harvesters"] if item["name"] == value
except StopIteration:
harvest_config = json.loads(value)
resource = init_resource(harvest_config)
# Perform harvest
result = resource.harvest()
# Filter data
if "filter" in harvest_config:
result = cql_filter(harvest_config["filter"], result)
# Stringify items
encoded = stringify(
harvest_config.get("mode", "item"),
# Send to queue
for value in encoded:
client.lpush(harvest_config["queue"], value)
for item in result:
client.lpush(harvest_config["queue"], json.dumps(item, default=str))
......@@ -3,7 +3,7 @@ import requests_mock
import pytest_mock
import unittest.mock
from harvester.harvester import main
from harvester.harvester import main, get_harvest_config
@pytest.mark.parametrize("value", [("S2L2A_Element84")])
......@@ -53,3 +53,30 @@ def test_s3(
main(config, value, client)
def test_get_harvester_config():
# testing list style
test_config = {
"harvesters": [
"name": "myname",
"type": "TestType",
"properties": {}
config = get_harvest_config(test_config, "myname", {})
assert config == test_config["harvesters"][0]
# testing list style
test_config = {
"harvesters": {
"myname": {
"type": "TestType",
"properties": {}
config = get_harvest_config(test_config, "myname", {})
assert config == test_config["harvesters"]["myname"]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment