EOX GitLab Instance

harvester.py 2.26 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import json
import logging
from typing import Optional, Type

from redis import Redis

from .endpoint import Endpoint
from .exceptions import HarvestError

logger = logging.getLogger(__name__)


class Harvester:
    """A named harvest job: an endpoint to harvest from and a target queue.

    :param name: identifier used to find this harvester in a config's
        ``'harvesters'`` list.
    :param endpoint: endpoint configuration, passed to
        ``Endpoint.from_config``.
    :param queue: name of the queue the harvested results are pushed to.
    :param mode: result encoding; ``main`` handles ``'item'`` and
        ``'property'``.
    :param extract_property: key read from each item's ``'properties'``
        when ``mode == 'property'``.
    """

    def __init__(self, name: str, endpoint: dict, queue: str,
                 mode: str = 'item', extract_property: Optional[str] = None):
        self.name = name
        # Endpoint.from_config is assumed to return an Endpoint instance
        # (main calls .harvest() on it), not the Endpoint class itself.
        self.endpoint: Endpoint = Endpoint.from_config(endpoint)
        self.queue = queue
        self.mode = mode
        self.extract_property = extract_property

    @classmethod
    def from_config(cls, config: dict, value: str) -> 'Harvester':
        """Build the harvester named ``value`` from ``config['harvesters']``.

        :param config: mapping whose optional ``'harvesters'`` entry is a list
            of harvester keyword-argument dicts, each with a ``'name'``.
        :param value: name of the harvester to build.
        :raises HarvestError: if no configured harvester has that name,
            including when ``config`` has no ``'harvesters'`` entry.
        """
        logger.debug('Initializing Harvester from config with %s and %s', config, value)
        # Treat a missing/empty list and nameless entries as "not found" so
        # callers get the HarvestError they handle rather than a KeyError.
        harvesters = config.get('harvesters') or []
        harvester_cfg = next(
            (item for item in harvesters if item.get("name") == value),
            None,
        )
        if harvester_cfg is None:
            raise HarvestError(f'Name {value} not found in config')
        return cls(**harvester_cfg)

    @classmethod
    def from_json(cls, value: str) -> 'Harvester':
        """Build a harvester from a JSON object of constructor arguments.

        :param value: JSON text whose top-level value is an object mapping
            ``__init__`` parameter names to values.
        :raises json.JSONDecodeError: if ``value`` is not valid JSON, or is
            valid JSON that does not decode to an object.
        """
        logger.debug('Initializing Harvester from json with %s', value)
        params = json.loads(value)
        if not isinstance(params, dict):
            # Plain names such as "123" or "null" are valid JSON but not a
            # harvester definition; report them as a decode failure so
            # callers (e.g. main) fall back to the config lookup instead of
            # crashing on cls(**params).
            raise json.JSONDecodeError(
                'Harvester definition must be a JSON object', value, 0
            )
        return cls(**params)

def main(config: dict, value: str, client: Redis = None, **kwargs):
    """Run one harvest and push the encoded results onto a Redis list.

    The harvester is resolved by trying, in order:

    1. ``value`` as a JSON harvester definition (``Harvester.from_json``);
    2. ``value`` as a harvester name in ``config`` (``Harvester.from_config``);
    3. ``kwargs`` as ``Harvester`` constructor arguments.

    :param config: configuration mapping; ``config['redis']['host']`` and
        ``config['redis']['port']`` are read when no ``client`` is given.
    :param value: JSON harvester definition or harvester name.
    :param client: Redis client to push to; created from ``config`` when
        omitted.
    :param kwargs: fallback ``Harvester`` constructor arguments.
    :raises HarvestError: if the harvester's ``mode`` is neither ``'item'``
        nor ``'property'``.
    """
    logger.info('running harvesting for %s', value)

    # Initialize harvester
    try:
        harvester = Harvester.from_json(value)
    except json.decoder.JSONDecodeError:
        logger.debug('Failed to init from json, trying from config')
        try:
            harvester = Harvester.from_config(config, value)
        except HarvestError:
            logger.debug('Failed to init from config, trying from params')
            harvester = Harvester(**kwargs)

    # Fail fast on an unsupported mode, before doing the harvest, instead of
    # crashing later with an opaque TypeError on lpush(*None).
    if harvester.mode not in ('item', 'property'):
        raise HarvestError(f'Unknown harvester mode {harvester.mode!r}')

    # Perform harvest
    result = harvester.endpoint.harvest()

    if harvester.mode == 'item':
        # Stringify items
        encoded = [json.dumps(item) for item in result]
    else:
        # 'property' mode: push only the requested property of each item
        encoded = [
            item['properties'][harvester.extract_property]
            for item in result
        ]

    # LPUSH requires at least one value; an empty push is rejected by Redis,
    # so skip it (and avoid opening a connection) when nothing was harvested.
    if not encoded:
        logger.info('nothing harvested for %s, skipping push', value)
        return

    # Send to queue
    if client is None:
        redis_cfg = config['redis']
        # `encoding` is the non-deprecated redis-py spelling of `charset`.
        client = Redis(
            host=redis_cfg['host'],
            port=redis_cfg['port'],
            encoding="utf-8",
            decode_responses=True,
        )

    client.lpush(harvester.queue, *encoded)