EOX GitLab Instance

harvester.py 1.66 KB
Newer Older
1
2
import json
import logging
Nikola Jankovic's avatar
Nikola Jankovic committed
3
4
from typing import List, Optional

5
6
from redis import Redis

Nikola Jankovic's avatar
Nikola Jankovic committed
7
8
9
10
from harvester.resource import Resource

from .endpoint import get_endpoint
from .source import get_source
11
from .exceptions import HarvestError
Nikola Jankovic's avatar
Nikola Jankovic committed
12
from .utils import cql_filter
13
14
15

# Module-level logger named after this module, so its records can be
# filtered and configured through the standard logging hierarchy.
logger = logging.getLogger(name=__name__)

Nikola Jankovic's avatar
Nikola Jankovic committed
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

def stringify(
    result: List[dict], mode: str = "item", extract_property: Optional[str] = None
) -> List[str]:
    """Encode harvested items as strings ready to be pushed onto a queue.

    :param result: harvested items (dicts).
    :param mode: ``"item"`` JSON-encodes each whole item (values JSON cannot
        represent are converted with ``str``); ``"property"`` extracts
        ``item["properties"][extract_property]`` from each item.
    :param extract_property: name of the property to extract; required when
        ``mode`` is ``"property"``.
    :returns: one string per item, in input order.
    :raises ValueError: if ``mode`` is unknown, or if ``mode`` is
        ``"property"`` and no ``extract_property`` was given.
    """
    if mode == "item":
        return [json.dumps(item, default=str) for item in result]

    if mode == "property":
        if extract_property is None:
            raise ValueError("mode 'property' requires 'extract_property'")
        # Strings are passed through unchanged; any other value (numbers,
        # dicts, lists, None, bools) is JSON-encoded so every entry really is
        # a string, as the return type promises.
        return [
            value if isinstance(value, str) else json.dumps(value, default=str)
            for value in (item["properties"][extract_property] for item in result)
        ]

    # Previously an unknown mode silently produced an empty list, which only
    # surfaced later as an obscure error when pushing zero values to Redis.
    raise ValueError(f"Unknown stringify mode: {mode!r}")


def init_resource(harvest_config: dict) -> Resource:
    """Build the resource (endpoint or source) described by a harvester config.

    :param harvest_config: harvester configuration; its ``"resource"`` entry
        is passed to ``get_endpoint`` and, if that yields nothing, to
        ``get_source``. The dict is not modified.
    :returns: the resource produced by ``get_endpoint``, or otherwise by
        ``get_source``.
    :raises HarvestError: if neither factory produces a resource.
    """
    # Read rather than pop: the harvester config may be a dict shared with the
    # global configuration, and removing "resource" from it would make a later
    # harvest with the same config fail with a KeyError.
    config: dict = harvest_config["resource"]

    if endpoint := get_endpoint(config):
        return endpoint

    if source := get_source(config):
        return source

    # .get so a resource config lacking "type" still reports a HarvestError
    # instead of an unrelated KeyError.
    raise HarvestError(f"Resource type {config.get('type')} not found")


def main(config: dict, value: str, client: Redis):
    """Run a single harvest and push the encoded results onto a Redis list.

    :param config: global configuration; its ``"harvesters"`` entry is a list
        of harvester configs, each identified by a ``"name"`` key.
    :param value: the name of a configured harvester or, if no harvester with
        that name exists, a JSON-encoded harvester config.
    :param client: Redis connection used to push the results.

    The harvester config supplies ``"resource"`` (consumed by
    ``init_resource``), ``"filter"``, ``"queue"`` and the optional
    ``"mode"`` / ``"extract_property"`` settings for ``stringify``.
    """
    logger.info("running harvesting for %s", value)

    # Look up a configured harvester by name; fall back to treating `value`
    # as an inline JSON harvester definition.
    try:
        harvest_config = next(
            item for item in config["harvesters"] if item["name"] == value
        )
    except StopIteration:
        harvest_config = json.loads(value)

    # Initialize resource
    resource = init_resource(harvest_config)

    # Perform harvest
    result = resource.harvest()

    # Filter data
    result = cql_filter(harvest_config["filter"], result)

    # Stringify items
    encoded = stringify(
        result,
        harvest_config.get("mode", "item"),
        harvest_config.get("extract_property"),
    )

    # LPUSH requires at least one element; calling it with none makes Redis
    # reject the command, so an empty harvest is logged and skipped instead.
    if not encoded:
        logger.info("no items to push for %s", value)
        return

    # Send to queue
    client.lpush(harvest_config["queue"], *encoded)