
Commit 9164eb7f authored by Nikola Jankovic

improvements

updated fileharvester with some additional config
added structured logging
updated running daemon
parent f9ad2fb7
Pipeline #22621 failed in 39 seconds
@@ -24,6 +24,7 @@ class Stat:
"""
Rudimentary file metadata
"""
mtime: Optional[datetime]
size: Optional[int]
@@ -57,12 +58,25 @@ class Source(ABC):
"""
...
@abstractmethod
def get_link(self, path: str) -> str:
"""
Returns a publicly accessible HTTP(S) link for the given file.
Args:
path (str): file key, e.g. as fetched via botocore for S3 sources
Returns:
str: an HTTPS link, e.g. an amazonaws.com URL for S3 sources
"""
class Endpoint(Resource):
"""
Endpoints are resources that use a search protocol (or something similar)
to harvest items. Thus, they are always associated with a specific URL.
"""
def __init__(self, url: str):
self.url = url
@@ -71,6 +85,7 @@ class FileScheme(Resource):
"""
FileSchemes are resources that operate file-by-file on a given file source.
"""
def __init__(self, source: Source):
self.source = source
@@ -79,6 +94,7 @@ class Postprocessor(ABC):
"""
Postprocessors can alter all harvested items.
"""
def __init__(self, **kwargs):
...
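For illustration, a minimal postprocessor sketch, assuming the interface used by harvester.main() below (a postprocess(item) method that receives and returns a harvested item dict); the class name and kwargs are hypothetical, not part of this commit:

class AddFieldsPostprocessor(Postprocessor):  # hypothetical example
    def __init__(self, **kwargs):
        # fields to merge into every harvested item (illustrative)
        self.extra = kwargs.get("extra", {})

    def postprocess(self, item: dict) -> dict:
        item.update(self.extra)
        return item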
......
"""
app.py
======
Contains functionality related to running the daemon
"""
import json
from structlog import get_logger
from redis import Redis
from .harvester import main
LOGGER = get_logger(__name__)
def init_client(host: str, port: int) -> Redis:
return Redis(host=host, port=port, charset="utf-8", decode_responses=True)
TIMEOUT = 5
class App:
def __init__(
self,
client: Redis,
config: dict,
listen_queue: str,
) -> None:
self.client = client
self.config = config
self.listen_queue = listen_queue
self.shutdown = False
def exit_gracefully(self, signum, frame):
LOGGER.info("Received shut down signal...", signum=signum)
self.shutdown = True
def run(self):
"""Run the harvester daemon, listening on a redis queue
for harvest jobs.
"""
# initialize the queue client
LOGGER.debug("listening queue for messages", listen_queue=self.listen_queue)
try:
_, value = self.client.brpop(self.listen_queue, timeout=TIMEOUT)
except TypeError:
return
item = json.loads(value)
# start the harvesting
try:
for queue, result in main(self.config, item):
self.client.lpush(queue, result)
except Exception as e:
LOGGER.exception(e)
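For context, a job for this daemon is simply a JSON document pushed onto the listen queue. A minimal producer sketch, assuming a local Redis and placeholder queue/harvester names:

import json
from redis import Redis

client = Redis(host="localhost", port=6379, decode_responses=True)
# payload shape follows what App.run()/main() expect: "name" plus optional "values"
client.lpush("harvest-jobs", json.dumps({"name": "my-harvester", "values": {"area": "EU"}}))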
@@ -7,10 +7,13 @@ Contains command line interface
import logging.config
import json
import sys
import click
import structlog
import structlog.contextvars
from .daemon import run_daemon, init_client
from .app import App, init_client
from .config import load_config, validate_config
from .harvester import main
@@ -20,13 +23,20 @@ def setup_logging(debug=False):
{
"version": 1,
"disable_existing_loggers": False,
"formatters": {"brief": {"format": "%(levelname)s %(name)s: %(message)s"}},
"formatters": {
"json": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": structlog.dev.ConsoleRenderer()
if debug
else structlog.processors.JSONRenderer(),
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "DEBUG" if debug else "INFO",
"formatter": "brief",
}
"formatter": "json",
},
},
"root": {
"handlers": ["console"],
@@ -35,10 +45,30 @@ def setup_logging(debug=False):
}
)
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.filter_by_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
context_class=structlog.threadlocal.wrap_dict(dict),
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
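With this processor chain, each log call is rendered by the ProcessorFormatter as a single console line: pretty-printed in --debug mode, JSON otherwise. A rough sketch of the JSON shape for one of the calls added in this commit (field values are illustrative):

# LOGGER.debug("querying", search_url=search_url, params=search_params) would emit roughly:
# {"event": "querying", "search_url": "https://example.com/search", "params": {"page": 1},
#  "logger": "<module name>", "level": "debug", "timestamp": "2021-11-08T10:00:00Z"}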
@click.group()
def cli():
pass
@click.option("--debug/--no-debug", default=False)
def cli(debug: bool = False):
setup_logging(debug)
@cli.command(help="Run the harvester daemon, attaching to a Redis queue")
@@ -47,51 +77,47 @@ def cli():
@click.option("--listen-queue", type=str, required=True)
@click.option("--config-file", type=click.File("r"), required=True)
@click.option("--validate/--no-validate", default=False)
@click.option("--debug/--no-debug", default=False)
def daemon(
host: str,
port: int,
listen_queue: str,
config_file: str,
validate: bool = False,
debug: bool = False,
):
setup_logging(debug)
config = load_config(config_file)
if validate:
validate_config(config)
client = init_client(host, port)
run_daemon(config, client, listen_queue)
app = App(client, config, listen_queue)
while not app.shutdown:
app.run()
sys.exit(0)
@cli.command(help="Run a single, one-off harvest")
@click.argument("harvester_name", type=str)
@click.option("--values", type=str, default=None)
@click.option("--host", type=str, required=True)
@click.option("--port", type=int, required=True)
@click.option("--config-file", type=click.File("r"), required=True)
@click.option("--validate/--no-validate", default=False)
@click.option("--debug/--no-debug", default=False)
def harvest(
harvester_name: str,
values: str,
host: str,
port: int,
config_file: str,
validate: bool = False,
debug: bool = False,
):
setup_logging(debug)
config = load_config(config_file)
if validate:
validate_config(config)
client = init_client(host, port)
item = {"name": harvester_name}
if values:
item["values"] = json.loads(values)
main(config, item, client)
for _, i in main(config, item):
click.echo(i)
if __name__ == "__main__":
......
"""
daemon.py
==========
Contains functionality related to running the daemon
"""
import logging
import json
from redis import Redis
from .harvester import main
logger = logging.getLogger(__name__)
def init_client(host: str, port: int) -> Redis:
redis = Redis(host=host, port=port, charset="utf-8", decode_responses=True)
return redis
def run_daemon(config: dict, client: Redis, listen_queue: str):
"""Run the harvester daemon, listening on a redis queue
for harvest jobs.
"""
# initialize the queue client
logger.debug("waiting for redis queue '%s'", listen_queue)
while True:
# fetch an item from the queue to be harvested
_, raw_item = client.brpop(listen_queue)
# parse the item
item = json.loads(raw_item)
# start the harvesting
main(config, item, client)
from abc import abstractmethod, ABC
import logging
from typing import Any, Dict, Iterator, List
from dataclasses import dataclass, field
import requests
import lxml.etree as ET
import pystac
from structlog import get_logger
from ..abc import Endpoint
from .query import Query
from ..exceptions import QueryError
logger = logging.getLogger(__name__)
LOGGER = get_logger(__name__)
@dataclass(frozen=True)
@@ -128,7 +128,7 @@ class OpenSearchEndpoint(Endpoint):
self.query = OpenSearchQuery(**query)
def harvest(self) -> Iterator[dict]:
logger.info("Starting OpenSearch harvesting")
LOGGER.info("Starting OpenSearch harvesting")
parser = ET.XMLParser(recover=True)
data = ET.fromstring(requests.get(self.url).content, parser)
......
import logging
from typing import Iterator
import requests
from structlog import get_logger
from ..abc import Endpoint
from .query import Query
logger = logging.getLogger(__name__)
LOGGER = get_logger(__name__)
class STACAPIEndpoint(Endpoint):
@@ -17,21 +17,21 @@ class STACAPIEndpoint(Endpoint):
self.query = StacAPIQuery(**query)
def harvest(self) -> Iterator[dict]:
logger.info("Starting STACAPI harvesting")
LOGGER.info("Starting STACAPI harvesting")
main = requests.get(self.url).json()
search_url = next(
link["href"] for link in main["links"] if link["rel"] == "search"
)
search_params = self.query.prepare_params(root_response=main)
logger.debug("querying %s with %s", search_url, search_params)
LOGGER.debug("querying", search_url=search_url, params=search_params)
query_data = requests.get(search_url, params=search_params).json()
context = query_data["context"]
while context["returned"]:
for feature in query_data["features"]:
yield feature
search_params["page"] += 1
logger.debug("querying %s with %s", search_url, search_params)
LOGGER.debug("querying", search_url=search_url, params=search_params)
query_data = requests.get(search_url, params=search_params).json()
context = query_data["context"]
......
import logging
import re
from typing import Iterator
from dateutil.parser import isoparse
import pystac
from dateutil.parser import isoparse
from structlog import get_logger
from ..abc import FileScheme, Source
logger = logging.getLogger(__name__)
LOGGER = get_logger(__name__)
class FileMatcherScheme(FileScheme):
def __init__(self, source: Source, path: str, time_regex: str, url: str):
def __init__(
self, source: Source, path: str, time_regex: str, id_regex: str, asset_id: str
):
super().__init__(source)
self.path = path
self.time_regex = re.compile(time_regex)
self.url = url
self.id_regex = re.compile(id_regex)
self.asset_id = asset_id
def harvest(self) -> Iterator[dict]:
logger.info("Starting S3 harvesting")
LOGGER.info("Starting S3 harvesting")
for filename in self.source.listdir(self.path):
if match := re.search(self.time_regex, filename):
dt = isoparse(match[0])
item = self._create_item(filename, dt)
if (dt_match := re.search(self.time_regex, filename)) and (
id_match := re.search(self.id_regex, filename)
):
id = id_match[0]
dt = isoparse(dt_match[0])
item = self._create_item(id, filename, dt)
yield item.to_dict()
def _create_item(self, path, dt):
identifier = dt.strftime("%Y%m%d_%H%M%S")
def _create_item(self, id, path, dt):
properties = {
"datetime": dt,
"updated": self.source.stat(path).mtime,
}
item = pystac.Item(
id=identifier, geometry=None, bbox=None, datetime=dt, properties=properties
id=id, geometry=None, bbox=None, datetime=dt, properties=properties
)
item.add_asset(identifier, pystac.Asset(f"{self.url}{path}"))
item.add_asset(self.asset_id, pystac.Asset(self.source.get_link(path)))
return item
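To illustrate the new id_regex/asset_id parameters, a hedged configuration sketch (regexes, filenames and the source are placeholders, not taken from the repository):

scheme = FileMatcherScheme(
    source=my_source,                   # any Source implementation
    path="tiles/",
    time_regex=r"\d{8}T\d{6}",          # matches e.g. "20210101T103000" (isoparse-able)
    id_regex=r"[A-Z0-9]+_\d{8}T\d{6}",  # matches e.g. "S2A_20210101T103000"
    asset_id="data",
)
# A filename like "S2A_20210101T103000.tif" would then yield a STAC item with
# id "S2A_20210101T103000", datetime 2021-01-01T10:30:00, and a "data" asset
# whose href comes from source.get_link(...).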
import logging
import json
from typing import Iterator
from urllib.parse import urljoin
from structlog import get_logger
from ..abc import FileScheme, Source
logger = logging.getLogger(__name__)
LOGGER = get_logger(__name__)
class STACCatalogScheme(FileScheme):
@@ -25,12 +26,12 @@ class STACCatalogScheme(FileScheme):
Harvests a specified STAC catalog. Will recurse into child catalogs
and yield all included STAC items.
"""
logger.info(f"Harvesting from catalog {path}")
LOGGER.info("Harvesting from catalog", path=path)
catalog = self._read_json(path)
for link in catalog["links"]:
if link["rel"] == "item":
item_href = urljoin(path, link["href"])
logger.info(f"Harvested item {item_href}")
LOGGER.info("Harvested item", href=item_href)
yield self._read_json(item_href)
elif link["rel"] == "child":
catalog_href = urljoin(path, link["href"])
......
import json
import logging
from typing import Union
from typing import Tuple, Union, Generator
from redis import Redis
from structlog import get_logger
from mergedeep import merge
from .abc import Resource
@@ -11,7 +10,9 @@ from .filescheme import get_filescheme
from .exceptions import HarvestError
from .utils import cql_filter
from .postprocess import get_postprocessor
logger = logging.getLogger(__name__)
LOGGER = get_logger(__name__)
def init_resource(harvest_config: dict) -> Resource:
@@ -43,7 +44,9 @@ def get_harvest_config(config: dict, name: str, values: dict) -> dict:
raise KeyError(name)
def main(config: dict, item: Union[dict, str], client: Redis):
def main(
config: dict, item: Union[dict, str]
) -> Generator[Tuple[str, str], None, None]:
if isinstance(item, str):
name = item
values = {}
@@ -51,7 +54,7 @@ def main(config: dict, item: Union[dict, str], client: Redis):
name = item["name"]
values = item.get("values", {})
logger.info(f"Running harvesting for {name}")
LOGGER.info("Running harvesting...", name=name)
# get the harvest config
harvest_config = get_harvest_config(config, name, values)
@@ -64,15 +67,11 @@ def main(config: dict, item: Union[dict, str], client: Redis):
if "postprocess" in harvest_config:
postprocessor = get_postprocessor(harvest_config["postprocess"])
result = (
postprocessor.postprocess(item)
for item in result
)
result = (postprocessor.postprocess(item) for item in result)
# Filter data
if "filter" in harvest_config:
result = cql_filter(harvest_config["filter"], result)
# Send to queue
for item in result:
client.lpush(harvest_config["queue"], json.dumps(item, default=str))
yield harvest_config["queue"], json.dumps(item, default=str)
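After this change main() no longer pushes to Redis itself; callers consume the generator. A sketch of the yielded pairs (names are placeholders):

# for queue, result in main(config, {"name": "my-harvester"}):
#     queue  -> the harvester's configured "queue" key, e.g. "register-queue"
#     result -> the harvested (postprocessed, filtered) item serialized as a JSON string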
import logging
from typing import IO, List
from urllib.parse import urljoin
from io import StringIO
import requests
from structlog import get_logger
from dateutil.parser import parse
from ..abc import Source, Stat
logger = logging.getLogger(__name__)
LOGGER = get_logger(__name__)
class HTTPSource(Source):
@@ -25,17 +25,17 @@ class HTTPSource(Source):
def open(self, path: str) -> IO:
url = urljoin(self.root_url, path)
logger.debug(f"Opening URL: {url}")
LOGGER.debug("Opening URL", url=url)
response = self.session.get(url)
return StringIO(response.text)
def stat(self, path: str) -> Stat:
url = urljoin(self.root_url, path)
logger.debug(f"Opening URL: {url}")
LOGGER.debug("Opening URL", url=url)
response = self.session.head(url)
size = response.headers.get('Content-Length')
last_modified = response.headers.get('Last-Modified')
size = response.headers.get("Content-Length")
last_modified = response.headers.get("Last-Modified")
return Stat(
parse(last_modified) if last_modified is not None else None,
int(size) if size is not None else None
int(size) if size is not None else None,
)
import logging
from typing import TYPE_CHECKING, IO, AnyStr, List, Tuple
from functools import cached_property
from urllib.parse import urlparse
@@ -13,10 +11,11 @@ import botocore.session
import botocore.handlers
from botocore import UNSIGNED
from botocore.config import Config
from structlog import get_logger
from ..abc import Source, Stat
logger = logging.getLogger(__name__)
LOGGER = get_logger(__name__)
class S3Source(Source):
@@ -82,10 +81,14 @@ class S3Source(Source):
def open(self, path: str) -> IO[AnyStr]:
bucket, key = self.get_bucket_and_key(path)
logger.debug("bucket, key: %s, %s", bucket, key)
LOGGER.debug("opening", bucket=bucket, key=key)
return self.client.get_object(Bucket=bucket, Key=key)["Body"]
def stat(self, path: str) -> Stat:
bucket, key = self.get_bucket_and_key(path)
response = self.client.head_object(Bucket=bucket, Key=key)
return Stat(mtime=response["LastModified"], size=response["ContentLength"])
def get_link(self, path: str) -> str:
bucket, key = self.get_bucket_and_key(path)
return f"https://{bucket}.s3.{self.region_name}.amazonaws.com/{key}"