"""Harvest items from a configured resource and push them to a Redis queue."""

import json
import logging
from typing import Iterator, Optional

from redis import Redis

from harvester.resource import Resource

from .endpoint import get_endpoint
from .source import get_source
from .exceptions import HarvestError
from .utils import cql_filter
from .postprocess import get_postprocessor

logger = logging.getLogger(__name__)


def stringify(
    result: Iterator[dict],
    mode: str = "item",
    extract_property: Optional[str] = None,
) -> Iterator[str]:
    """Encode harvested items into the values that are pushed to the queue.

    Args:
        result: Items to encode.
        mode: ``"item"`` serializes each whole item with ``json.dumps``
            (values JSON cannot encode natively are converted with ``str``).
            ``"property"`` emits ``item["properties"][extract_property]``
            for each item, unmodified.
        extract_property: Key looked up in each item's ``"properties"``
            mapping. Only used when ``mode`` is ``"property"``.

    Returns:
        A lazy iterator over the encoded values. Items are only consumed
        from ``result`` as the returned iterator is advanced.

    Raises:
        HarvestError: If ``mode`` is neither ``"item"`` nor ``"property"``.
            Raised immediately, before any item is consumed.
    """
    if mode == "item":
        return (json.dumps(item, default=str) for item in result)
    if mode == "property":
        # NOTE(review): the extracted value is passed through as-is, so it is
        # only a ``str`` if the harvested property is one -- confirm upstream.
        return (item["properties"][extract_property] for item in result)
    # An unrecognised mode used to fall through and silently yield nothing,
    # which dropped every harvested item without any indication.
    raise HarvestError(f"Unknown stringify mode {mode!r}")


def init_resource(harvest_config: dict) -> Resource:
    """Build the resource described by ``harvest_config["resource"]``.

    ``get_endpoint`` is tried first, then ``get_source``; the first truthy
    result is returned.

    Args:
        harvest_config: Harvester configuration containing a ``"resource"``
            entry. The mapping is not modified.

    Returns:
        The endpoint or source created for the resource configuration.

    Raises:
        KeyError: If ``harvest_config`` has no ``"resource"`` entry.
        HarvestError: If neither ``get_endpoint`` nor ``get_source`` returns
            a resource for the configuration.
    """
    # Read rather than pop: ``harvest_config`` may be an entry of the shared
    # application config, and removing "resource" from it would break every
    # later harvest that uses the same entry.
    resource_config: dict = harvest_config["resource"]

    if endpoint := get_endpoint(resource_config):
        return endpoint

    if source := get_source(resource_config):
        return source

    raise HarvestError(f"Resource type {resource_config['type']} not found")


def main(config: dict, value: str, client: Redis) -> None:
    """Run one harvest and push every resulting item to a Redis list.

    Args:
        config: Application configuration; ``config["harvesters"]`` is
            searched for an entry whose ``"name"`` equals ``value``.
        value: Name of a configured harvester, or, when no harvester has
            that name, a JSON document holding the harvester configuration.
        client: Redis client used to ``lpush`` the encoded items onto the
            list named by the harvester's ``"queue"`` entry.

    Raises:
        json.JSONDecodeError: If ``value`` matches no configured harvester
            and is not valid JSON.
        HarvestError: If the resource cannot be initialised or the
            configured ``"mode"`` is unknown.
    """
    logger.info("running harvesting for %s", value)

    # Initialize resource: prefer a named harvester from the configuration,
    # otherwise treat the value itself as an inline JSON configuration.
    harvest_config = next(
        (item for item in config["harvesters"] if item["name"] == value),
        None,
    )
    if harvest_config is None:
        harvest_config = json.loads(value)

    resource = init_resource(harvest_config)

    # Perform harvest
    result = resource.harvest()

    if "postprocess" in harvest_config:
        postprocessor = get_postprocessor(harvest_config["postprocess"])
        result = (postprocessor.postprocess(item) for item in result)

    # Filter data; like post-processing, a filter is optional.
    if "filter" in harvest_config:
        result = cql_filter(harvest_config["filter"], result)

    # Stringify items
    encoded = stringify(
        result,
        harvest_config.get("mode", "item"),
        harvest_config.get("extract_property"),
    )

    # Send to queue
    for encoded_item in encoded:
        client.lpush(harvest_config["queue"], encoded_item)