EOX GitLab Instance

harvester.py 1.66 KB
Newer Older
1
2
import json
import logging
Fabian Schindler's avatar
Fabian Schindler committed
3
from typing import Iterator, Optional
Nikola Jankovic's avatar
Nikola Jankovic committed
4

5
6
from redis import Redis

Nikola Jankovic's avatar
Nikola Jankovic committed
7
8
9
10
from harvester.resource import Resource

from .endpoint import get_endpoint
from .source import get_source
11
from .exceptions import HarvestError
Nikola Jankovic's avatar
Nikola Jankovic committed
12
from .utils import cql_filter
13
14
15

logger = logging.getLogger(__name__)

Nikola Jankovic's avatar
Nikola Jankovic committed
16
17

def stringify(
    result: Iterator[dict],
    mode: str = "item",
    extract_property: Optional[str] = None,
) -> Iterator[str]:
    """Encode harvested items for pushing onto a queue.

    :param result: iterable of harvested items (dicts).
    :param mode: ``"item"`` to JSON-encode each whole item, or
        ``"property"`` to emit ``item["properties"][extract_property]``
        for each item.
    :param extract_property: name of the property to extract; required
        when ``mode == "property"``.
    :returns: a lazy iterator over the encoded values. In ``"property"``
        mode the property values are passed through unchanged (they are
        not converted to ``str``).
    :raises ValueError: if ``mode`` is unknown, or if ``mode`` is
        ``"property"`` and no ``extract_property`` is given. Raised when
        this function is called, not on first iteration.
    """
    # This is deliberately not a generator function. Validation runs at call
    # time, so a misconfigured mode fails loudly instead of silently
    # producing no output at all.
    if mode == "item":
        # default=str serialises values json cannot encode natively
        # (e.g. datetimes) as their str() form.
        return (json.dumps(item, default=str) for item in result)
    if mode == "property":
        if extract_property is None:
            raise ValueError("mode 'property' requires 'extract_property'")
        return (item["properties"][extract_property] for item in result)
    raise ValueError(f"Unknown stringify mode: {mode!r}")


def init_resource(harvest_config: dict) -> Resource:
    """Build the resource described by a harvester config.

    The ``"resource"`` entry of ``harvest_config`` is passed to
    ``get_endpoint`` first, then to ``get_source``. The first truthy
    result is returned.

    :param harvest_config: a harvester configuration containing a
        ``"resource"`` mapping. It is not modified.
    :returns: the endpoint or source built from the resource config.
    :raises KeyError: if ``harvest_config`` has no ``"resource"`` entry.
    :raises HarvestError: if neither factory recognises the resource config.
    """
    # Read the entry instead of ``pop``-ing it. The caller may pass an entry
    # of the shared application config, and removing "resource" from it
    # would make any later harvest with the same config fail with KeyError.
    config: dict = harvest_config["resource"]

    if endpoint := get_endpoint(config):
        return endpoint

    if source := get_source(config):
        return source

    # .get so that a config without "type" still produces this error
    # rather than an unrelated KeyError.
    raise HarvestError(f"Resource type {config.get('type')} not found")


def main(config: dict, value: str, client: Redis):
    """Run one harvest and push the encoded results onto a Redis list.

    :param config: application config. ``config["harvesters"]`` is a list
        of harvester configs, each with a ``"name"`` key.
    :param value: either the name of a configured harvester or, when no
        harvester has that name, a JSON-encoded harvester config.
    :param client: Redis client; each encoded item is ``lpush``-ed onto the
        list named by the harvester config's ``"queue"``.

    The harvester config must provide ``"resource"``, ``"filter"`` and
    ``"queue"``. ``"mode"`` (default ``"item"``) and ``"extract_property"``
    are optional and forwarded to ``stringify``.
    """
    logger.info("running harvesting for %s", value)

    # Initialize resource: look the value up by harvester name first, then
    # fall back to treating it as an inline JSON config.
    try:
        harvest_config = next(
            item for item in config["harvesters"] if item["name"] == value
        )
    except StopIteration:
        harvest_config = json.loads(value)
    resource = init_resource(harvest_config)

    # Perform harvest
    result = resource.harvest()

    # Filter data
    result = cql_filter(harvest_config["filter"], result)

    # Stringify items
    encoded = stringify(
        result,
        harvest_config.get("mode", "item"),
        harvest_config.get("extract_property"),
    )

    # Send to queue. The loop uses its own name so the ``value`` argument
    # (the harvester identifier) is not shadowed by the pushed payloads.
    queue = harvest_config["queue"]
    count = 0
    for count, encoded_item in enumerate(encoded, start=1):
        client.lpush(queue, encoded_item)

    logger.info("pushed %d item(s) to %s for %s", count, queue, value)