#!/usr/bin/env python
#-----------------------------------------------------------------------------
#
# Project: seeder.py
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
#-----------------------------------------------------------------------------
# Copyright (c) 2018 EOX IT Services GmbH
#
# Python script to pre-seed PVS cache.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#-----------------------------------------------------------------------------


import os
import sys
import argparse
import textwrap
import logging
import traceback
import signal
import dateutil.parser
import redis
import subprocess

import sqlite3
import boto3


# Collections this seeder accepts. The command-line `collection` argument is
# rejected unless it is one of these keys.
COLLECTION_MAP = dict(
    # TODO: populate with the supported collections
)


logger = logging.getLogger(__name__)


# Log level for each -v/--verbosity value; any other value means DEBUG.
_VERBOSITY_LEVELS = {
    0: logging.CRITICAL,
    1: logging.ERROR,
    2: logging.WARNING,
    3: logging.INFO,
}


def setup_logging(verbosity):
    """Configure the module logger to log to a stream at *verbosity*.

    :param verbosity: 0=CRITICAL, 1=ERROR, 2=WARNING, 3=INFO; any other
        value selects DEBUG.

    Calling this again only changes the level: the stream handler is
    attached once, so messages are never emitted twice.
    """
    level = _VERBOSITY_LEVELS.get(verbosity, logging.DEBUG)
    logger.setLevel(level)

    # Reuse the handler from a previous call instead of stacking a new one.
    handler = next(
        (h for h in logger.handlers if getattr(h, "_seeder_handler", False)),
        None,
    )
    if handler is None:
        handler = logging.StreamHandler()
        handler._seeder_handler = True
        handler.setFormatter(
            logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
        )
        logger.addHandler(handler)
    handler.setLevel(level)


# Full extent (minx, miny, maxx, maxy) of each supported CRS, keyed by SRID.
CRS_BOUNDS = dict([
    (3857, (-20037508.3428, -20037508.3428, 20037508.3428, 20037508.3428)),
    (4326, (-180, -90, 180, 90)),
])

# Grid name (as passed to the seeder's "-g" option) -> SRID of its CRS.
GRID_TO_SRID = dict(
    GoogleMapsCompatible=3857,
    WGS84=4326,
)


def _run_mapcache_seed(seed_command, seed_args, timeout):
    """Run one seeding command (no shell), log its output, return its code.

    :param seed_command: command name, used only in the error message.
    :param seed_args: full argument vector; ``seed_args[0]`` is executed.
    :param timeout: seconds to wait before the process is killed.
    :returns: the process return code (always 0; non-zero raises).
    :raises Exception: if the process exits non-zero, which includes the
        case where it was killed after *timeout*.
    """
    logger.debug("MapCache seeding command: '%s'. raw: '%s'."
                 % (" ".join(seed_args), seed_args))

    process = subprocess.Popen(
        seed_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
    )

    try:
        out = process.communicate(timeout=timeout)[0]
    except subprocess.TimeoutExpired:
        logger.error("Seeding timed out")
        process.kill()
        # Reap the killed process and collect whatever it printed so far.
        out = process.communicate()[0]

    # stderr is merged into stdout, so `out` holds everything it printed.
    # Undecodable bytes must not abort the run, hence errors="replace".
    if out:
        for line in out.decode("utf-8", errors="replace").splitlines():
            if line.strip():
                logger.info("MapCache output: %s" % line)

    if process.returncode != 0:
        raise Exception("'%s' failed. Returncode '%d'."
                        % (seed_command, process.returncode))
    return process.returncode


def seed_mapcache(seed_command, config_file, tileset, grid,
                  minx, miny, maxx, maxy, minzoom, maxzoom,
                  start_time, end_time, threads, delete, metatile,
                  timeout=1800, force=True, collection=None):
    """Seed (or delete) cached tiles for an extent, zoom range and time span.

    The extent is in the CRS of *grid*. ``minx`` must lie within the grid's
    x bounds. ``maxx`` may exceed the eastern bound, up to the full grid
    width, to describe an extent crossing the antimeridian. In that case
    the seeder runs twice: once up to the eastern bound, and once for the
    remainder wrapped around to the western bound.

    :param seed_command: executable that performs the seeding.
    :param config_file: value passed with ``-c``.
    :param tileset: value passed with ``-t``.
    :param grid: grid name; must be a key of ``GRID_TO_SRID``.
    :param minx, miny, maxx, maxy: extent to seed (see above).
    :param minzoom, maxzoom: zoom range; ``None`` means 0 and 10.
    :param start_time, end_time: datetimes of the TIME dimension. Any
        tzinfo is dropped; they are expected to already be UTC.
    :param threads: value passed with ``-n``.
    :param delete: delete tiles instead of seeding them.
    :param metatile: metatile size, used for both axes (``-M``).
    :param timeout: seconds each seeding run may take before it is killed.
    :param force: pass ``-f`` when seeding (ignored when deleting).
    :param collection: collection name used in the failure-log path. When
        omitted, the module-level ``collection`` is used, as before.
    :returns: the seeder's return code (always 0; failures raise).
    :raises Exception: on an invalid extent or a failed seeding run.
    :raises ValueError: if no collection is available.
    """
    bounds = CRS_BOUNDS[GRID_TO_SRID[grid]]
    full = float(abs(bounds[0]) + abs(bounds[2]))

    # extent is always within [bounds[0],bounds[2]]
    # where maxx can be >bounds[2] but <=full
    if minx < bounds[0] or minx > bounds[2] or maxx < bounds[0] or maxx > full:
        raise Exception("Invalid extent '%s,%s,%s,%s'."
                        % (minx, miny, maxx, maxy))
    dateline_crossed = maxx > bounds[2]

    if collection is None:
        # Backward compatibility: callers written before the `collection`
        # parameter relied on the module-level name set in script mode.
        collection = globals().get("collection")
    if collection is None:
        raise ValueError(
            "No collection given and no module-level 'collection' is set."
        )

    if minzoom is None:
        minzoom = 0
    if maxzoom is None:
        maxzoom = 10

    # start- and end-time are expected to be UTC Zulu
    start_time = start_time.replace(tzinfo=None)
    end_time = end_time.replace(tzinfo=None)
    mode = "seed" if not delete else "delete"

    logger.info("Starting mapcache seed with parameters: command='%s', "
                "config_file='%s', tileset='%s', grid='%s', "
                "extent='%f,%f,%f,%f', zoom='%d,%d', nthreads='%s', "
                "mode='%s', dimension='TIME=%sZ/%sZ', metatile=%d,%d, "
                "force=%s."
                % (seed_command, config_file, tileset, grid,
                   minx, miny, maxx, maxy, minzoom, maxzoom, threads,
                   mode,
                   start_time.isoformat(), end_time.isoformat(),
                   metatile, metatile, force))

    if dateline_crossed:
        # First the part up to the eastern bound, then the part beyond it,
        # wrapped around to start at the western bound.
        extents = [
            (minx, miny, bounds[2], maxy),
            (bounds[0], miny, bounds[0] + (maxx - bounds[2]), maxy),
        ]
    else:
        extents = [(minx, miny, maxx, maxy)]

    time_dimension = "TIME=%sZ/%sZ" % (
        start_time.isoformat(), end_time.isoformat()
    )
    start_stamp = start_time.strftime("%Y%m%dT%H%M%S")
    end_stamp = end_time.strftime("%Y%m%dT%H%M%S")

    returncode = 0
    for ext_minx, ext_miny, ext_maxx, ext_maxy in extents:
        seed_args = [
            seed_command,
            "-c", config_file,
            "-t", tileset,
            "-g", grid,
            "-e", "%f,%f,%f,%f" % (ext_minx, ext_miny, ext_maxx, ext_maxy),
            "-n", str(threads),
            "-z", "%d,%d" % (minzoom, maxzoom),
            "-D", time_dimension,
            "-m", mode,
            "-q",
            "-M", "%d,%d" % (metatile, metatile),
            # Failure-log file, named after this run's own extent so the
            # two runs of a dateline-crossing extent do not share one file.
            "-L", "/cache-db/%s/failed_TIME_%sZ_%sZ_extent_%f,%f,%f,%f_zoom-%d-%d"
            % (
                collection,
                start_stamp,
                end_stamp,
                ext_minx, ext_miny, ext_maxx, ext_maxy, minzoom, maxzoom
            ),
            "-P", "10",
        ]

        if not delete and force:
            seed_args.append("-f")

        returncode = _run_mapcache_seed(seed_command, seed_args, timeout)

    logger.info("Seeding finished with returncode '%d'." % returncode)

    return returncode


def seeder(collection, start, end, leave_existing=False):
    """Seed the cache of *collection* for the time span *start* .. *end*.

    :param collection: name of the collection to seed.
    :param start: start of the time span (datetime).
    :param end: end of the time span (datetime); must not precede *start*.
    :param leave_existing: not used yet, since the seeding step itself is
        still a TODO. The CLI describes it as "Don't delete existing images
        from cache".

    Exits the process with status 1 if the time span is invalid: a missing
    value, incomparable datetimes, or *start* after *end*.
    """
    logger.info("Starting seeding from '%s' to '%s'." % (start, end))

    try:
        start_after_end = start > end
    except TypeError:
        # e.g. start/end is None (not supplied), or one datetime is
        # timezone-aware and the other naive, which Python cannot compare.
        logger.critical(
            "Start '%s' and end '%s' cannot be compared." % (start, end)
        )
        sys.exit(1)

    if start_after_end:
        logger.critical("Start '%s' is after end '%s'." % (start, end))
        sys.exit(1)

    # TODO

    logger.info(
        "Finished seeding from '%s' to '%s'." % (start, end)
    )


def seeder_redis_wrapper(
        collection, leave_existing, host="localhost", port=6379,
        queue_key='ingest_queue'
):
    """Run the seeder forever on time spans popped from a redis list.

    Each queue entry is expected to be ``<start>/<end>``, two date/time
    strings that ``dateutil.parser.parse`` understands. A malformed entry is
    logged and skipped so that it cannot stop the worker.

    :param collection: collection to seed, forwarded to ``seeder``.
    :param leave_existing: forwarded to ``seeder`` for every entry.
    :param host, port: redis connection parameters.
    :param queue_key: redis list to block-pop from.
        NOTE(review): this default differs from the CLI default
        ('seed_queue'). It is kept for compatibility; confirm which is intended.
    """
    client = redis.Redis(host=host, port=port)
    while True:
        logger.debug("waiting for redis queue '%s'..." % queue_key)
        # brpop without a timeout blocks until an entry arrives and returns
        # a (key, value) pair.
        _, message = client.brpop(queue_key)
        try:
            start, end = message.decode("utf-8").split("/")
            start_time = dateutil.parser.parse(start)
            end_time = dateutil.parser.parse(end)
            if start_time > end_time:
                raise ValueError("start is after end")
        except (TypeError, ValueError, OverflowError) as e:
            logger.error(
                "Ignoring malformed queue entry %r: %s" % (message, e)
            )
            continue
        seeder(collection, start_time, end_time, leave_existing)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.description = textwrap.dedent("""\
    Pre-seeds cache of PRISM View Server (pvs).
    """)

    parser.add_argument(
        "collection",
        help=(
            "Collection the seeder is run for."
        )
    )
    parser.add_argument(
        "--mode", default="standard", choices=["standard", "redis"],
        help=(
            "The mode to run the seeder. Either one-off (standard) or "
            "reading from a redis queue."
        )
    )
    parser.add_argument(
        "--start", default=None, type=dateutil.parser.parse,
        help=(
            "Mandatory argument indicating start date and time for "
            "the seeding."
        )
    )
    parser.add_argument(
        "--end", default=None, type=dateutil.parser.parse,
        help=(
            "Mandatory argument indicating end date and time for the seeding."
        )
    )
    parser.add_argument(
        "--leave_existing", action="store_true",
        help=(
            "Don't delete existing images from cache."
        )
    )
    parser.add_argument(
        "--redis-queue-key", default="seed_queue"
    )
    parser.add_argument(
        "--redis-host", default="localhost"
    )
    parser.add_argument(
        "--redis-port", type=int, default=6379
    )

    parser.add_argument(
        "-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4],
        help=(
            "Set verbosity of log output "
            "(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
        )
    )

    arg_values = parser.parse_args()

    setup_logging(arg_values.verbosity)

    # Intentionally a module-level name: seed_mapcache() looks up
    # `collection` at module scope.
    collection = arg_values.collection
    if collection not in COLLECTION_MAP:
        logger.critical("Provided collection '%s' is not valid." % collection)
        sys.exit(1)

    if arg_values.mode == "standard":
        # --start/--end are mandatory only in standard mode, so argparse's
        # required=True cannot express this.
        if arg_values.start is None or arg_values.end is None:
            parser.error("--start and --end are mandatory in standard mode.")
        seeder(
            collection,
            arg_values.start,
            arg_values.end,
            arg_values.leave_existing,
        )
    else:
        seeder_redis_wrapper(
            collection,
            arg_values.leave_existing,
            host=arg_values.redis_host,
            port=arg_values.redis_port,
            queue_key=arg_values.redis_queue_key,
        )