#!/usr/bin/env python
#-----------------------------------------------------------------------------
#
# Project: seeder.py
# Authors: Stephan Meissl <stephan.meissl@eox.at>
#
#-----------------------------------------------------------------------------
# Copyright (c) 2018 EOX IT Services GmbH
#
# Python script to pre-seed PVS cache.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#-----------------------------------------------------------------------------

import os
import sys
import argparse
import textwrap
import logging
import traceback
import signal
import datetime
import dateutil.parser
import redis
import subprocess
import sqlite3
import boto3


# collection:
COLLECTION_MAP = {
    # TODO
}

logger = logging.getLogger(__name__)


def setup_logging(verbosity):
    """Attach a stream handler to the module logger.

    :param verbosity: 0=CRITICAL, 1=ERROR, 2=WARNING, 3=INFO; any other
        value selects DEBUG.
    """
    level = {
        0: logging.CRITICAL,
        1: logging.ERROR,
        2: logging.WARNING,
        3: logging.INFO,
    }.get(verbosity, logging.DEBUG)

    logger.setLevel(level)
    sh = logging.StreamHandler()
    sh.setLevel(level)
    formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)


# Full extent (minx, miny, maxx, maxy) of each supported CRS, keyed by SRID.
CRS_BOUNDS = {
    3857: (-20037508.3428, -20037508.3428, 20037508.3428, 20037508.3428),
    4326: (-180, -90, 180, 90)
}

# Grid name (passed to the seeder via ``-g``) -> SRID of the grid's CRS.
GRID_TO_SRID = {
    "GoogleMapsCompatible": 3857,
    "WGS84": 4326
}


def _to_naive_utc(value):
    """Return *value* as a naive datetime expressed in UTC.

    Aware datetimes are converted to UTC before the tzinfo is dropped, so an
    offset such as ``+02:00`` is not silently mislabelled as ``Z``. Naive
    datetimes are assumed to already be UTC and are returned unchanged.
    """
    if value.tzinfo is not None and value.utcoffset() is not None:
        value = value.astimezone(datetime.timezone.utc)
    return value.replace(tzinfo=None)


def _run_seed_process(seed_command, seed_args, timeout):
    """Run one seeding process, log its output and check its exit status.

    :returns: the process return code (always 0; non-zero raises).
    :raises Exception: if the process exits with a non-zero return code.
        A process killed after *timeout* seconds also reports a non-zero
        code and therefore raises.
    """
    logger.debug("MapCache seeding command: '%s'. raw: '%s'."
                 % (" ".join(seed_args), seed_args))

    with subprocess.Popen(
        seed_args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    ) as process:
        try:
            out = process.communicate(timeout=timeout)[0]
        except subprocess.TimeoutExpired:
            logger.error("Seeding timed out")
            process.kill()
            # Collect whatever was written before the kill and reap the child.
            out = process.communicate()[0]

    # stderr is merged into stdout above, so ``out`` holds all output.
    # errors="replace" keeps odd bytes in the tool output from raising here.
    for line in (out or b"").decode("utf-8", errors="replace").splitlines():
        if line.strip():
            logger.info("MapCache output: %s" % line)

    if process.returncode != 0:
        raise Exception("'%s' failed. Returncode '%d'."
                        % (seed_command, process.returncode))

    logger.info("Seeding finished with returncode '%d'." % process.returncode)
    return process.returncode


def seed_mapcache(seed_command, config_file, tileset, grid, minx, miny, maxx,
                  maxy, minzoom, maxzoom, start_time, end_time, threads,
                  delete, metatile, timeout=1800, force=True,
                  collection=None):
    """Seed (or delete) a MapCache tileset by running *seed_command*.

    :param seed_command: seeding executable to run.
    :param config_file: configuration file (``-c``).
    :param tileset: tileset name (``-t``).
    :param grid: grid name (``-g``); must be a key of ``GRID_TO_SRID``.
    :param minx, miny, maxx, maxy: extent in the grid's CRS. ``minx`` must
        lie within the CRS bounds. ``maxx`` may exceed the eastern bound (up
        to the full CRS width) to describe an extent that crosses the
        dateline. In that case two runs are made: one from ``minx`` to the
        eastern bound, and one from the western bound to ``maxx - width``.
    :param minzoom, maxzoom: zoom range (``-z``); default to 0 and 10.
    :param start_time, end_time: datetimes bounding the TIME dimension
        (``-D``). Aware values are converted to UTC; naive values are taken
        to be UTC already.
    :param threads: value for ``-n``.
    :param delete: run in ``delete`` mode instead of ``seed`` mode.
    :param metatile: metatile size used for both axes (``-M``).
    :param timeout: seconds before a seeding run is killed.
    :param force: pass ``-f`` to the seeder (ignored when deleting).
    :param collection: collection name used in the failure-log path
        (``-L``). If omitted, falls back to the module-level ``collection``
        assigned by the command line entry point.
    :returns: the return code of the last seeding run (0).
    :raises Exception: on an invalid extent or a failed seeding run.
    :raises ValueError: if no collection name is available.
    """
    bounds = CRS_BOUNDS[GRID_TO_SRID[grid]]
    full = float(abs(bounds[0]) + abs(bounds[2]))

    # extent is always within [bounds[0],bounds[2]]
    # where maxx can be >bounds[2] but <=full
    if minx < bounds[0] or minx > bounds[2] or maxx < bounds[0] or maxx > full:
        raise Exception("Invalid extent '%s,%s,%s,%s'."
                        % (minx, miny, maxx, maxy))

    if collection is None:
        # Backwards compatibility: this function previously read the global
        # ``collection`` assigned in the ``__main__`` block.
        collection = globals().get("collection")
    if collection is None:
        raise ValueError(
            "No collection given for the MapCache failure-log path."
        )

    if minzoom is None:
        minzoom = 0
    if maxzoom is None:
        maxzoom = 10

    # start- and end-time are expected to be UTC Zulu
    start_time = _to_naive_utc(start_time)
    end_time = _to_naive_utc(end_time)

    logger.info("Starting mapcache seed with parameters: command='%s', "
                "config_file='%s', tileset='%s', grid='%s', "
                "extent='%f,%f,%f,%f', zoom='%d,%d', nthreads='%s', "
                "mode='%s', dimension='TIME=%sZ/%sZ', metatile=%d,%d, "
                "force=%s."
                % (seed_command, config_file, tileset, grid,
                   minx, miny, maxx, maxy, minzoom, maxzoom, threads,
                   "seed" if not delete else "delete",
                   start_time.isoformat(), end_time.isoformat(),
                   metatile, metatile, force))

    if maxx > bounds[2]:
        # Dateline crossed: seed up to the eastern bound, then the part that
        # wraps around from the western bound. Previously only the first
        # part was seeded.
        segments = [(minx, bounds[2]), (bounds[0], maxx - full)]
    else:
        segments = [(minx, maxx)]

    returncode = 0
    for seg_minx, seg_maxx in segments:
        seed_args = [
            seed_command,
            "-c", config_file,
            "-t", tileset,
            "-g", grid,
            "-e", "%f,%f,%f,%f" % (seg_minx, miny, seg_maxx, maxy),
            "-n", str(threads),
            "-z", "%d,%d" % (minzoom, maxzoom),
            "-D", "TIME=%sZ/%sZ" % (start_time.isoformat(),
                                    end_time.isoformat()),
            "-m", "seed" if not delete else "delete",
            "-q",
            "-M", "%d,%d" % (metatile, metatile),
            # Segment extent in the file name keeps the two dateline runs
            # from writing to the same failure log.
            "-L", "/cache-db/%s/failed_TIME_%sZ_%sZ_extent_%f,%f,%f,%f_zoom-%d-%d" % (
                collection,
                start_time.strftime("%Y%m%dT%H%M%S"),
                end_time.strftime("%Y%m%dT%H%M%S"),
                seg_minx, miny, seg_maxx, maxy,
                minzoom, maxzoom
            ),
            "-P", "10",
        ]
        if not delete and force:
            seed_args.append("-f")

        returncode = _run_seed_process(seed_command, seed_args, timeout)

    return returncode


def seeder(collection, start, end, leave_existing=False):
    """Seed the cache of *collection* for the interval [*start*, *end*].

    Exits the process with status 1 if *start* is after *end*. The actual
    seeding is still a TODO, and *leave_existing* is currently unused.
    """
    logger.info("Starting seeding from '%s' to '%s'." % (start, end))
    if start > end:
        logger.critical("Start '%s' is after end '%s'." % (start, end))
        sys.exit(1)

    # TODO

    logger.info(
        "Finished seeding from '%s' to '%s'." % (start, end)
    )


def seeder_redis_wrapper(
    collection, leave_existing, host="localhost", port=6379,
    queue_key='ingest_queue'
):
    """Run ``seeder`` for each ``<start>/<end>`` item popped from a redis
    list, in an endless loop.

    Items that cannot be decoded or parsed are logged and skipped, so a
    single malformed entry does not stop the worker. Note that ``seeder``
    calls ``sys.exit`` when start is after end, which ends this loop.
    """
    client = redis.Redis(host=host, port=port)
    while True:
        logger.debug("waiting for redis queue '%s'..." % queue_key)
        # brpop without a timeout blocks until an item is available and
        # returns a (key, value) pair.
        value = client.brpop(queue_key)[1]
        try:
            start, end = value.split(b"/")
            start = dateutil.parser.parse(start.decode("utf-8"))
            end = dateutil.parser.parse(end.decode("utf-8"))
        except (ValueError, OverflowError) as e:
            # ValueError covers bad split arity, UnicodeDecodeError and
            # dateutil parse failures.
            logger.error(
                "Skipping malformed seed request %r: %s" % (value, e)
            )
            continue

        seeder(collection, start, end, leave_existing)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.description = textwrap.dedent("""\
    Pre-seeds cache of PRISM View Server (pvs).
    """)

    parser.add_argument(
        "collection", default=None,
        help=(
            "Collection the seeder is run for."
        )
    )
    parser.add_argument(
        "--mode", default="standard", choices=["standard", "redis"],
        help=(
            "The mode to run the seeder. Either one-off (standard) or "
            "reading from a redis queue."
        )
    )
    parser.add_argument(
        "--start", default=None, type=dateutil.parser.parse,
        help=(
            "Mandatory argument indicating start date and time for "
            "the seeding."
        )
    )
    parser.add_argument(
        "--end", default=None, type=dateutil.parser.parse,
        help=(
            "Mandatory argument indicating end date and time for the seeding."
        )
    )
    parser.add_argument(
        "--leave_existing", action="store_true",
        help=(
            "Don't delete existing images from cache."
        )
    )
    parser.add_argument(
        "--redis-queue-key", default="seed_queue"
    )
    parser.add_argument(
        "--redis-host", default="localhost"
    )
    parser.add_argument(
        "--redis-port", type=int, default=6379
    )
    parser.add_argument(
        "-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4],
        help=(
            "Set verbosity of log output "
            "(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
        )
    )

    arg_values = parser.parse_args()

    # --start/--end are documented as mandatory but default to None; without
    # this check ``seeder`` would fail comparing None values.
    if arg_values.mode == "standard" and (
        arg_values.start is None or arg_values.end is None
    ):
        parser.error("--start and --end are required in standard mode.")

    setup_logging(arg_values.verbosity)

    collection = arg_values.collection

    if collection not in COLLECTION_MAP:
        logger.critical("Provided collection '%s' is not valid." % collection)
        sys.exit(1)

    if arg_values.mode == "standard":
        seeder(
            collection,
            arg_values.start,
            arg_values.end,
            arg_values.leave_existing,
        )
    else:
        seeder_redis_wrapper(
            collection,
            arg_values.leave_existing,
            host=arg_values.redis_host,
            port=arg_values.redis_port,
            queue_key=arg_values.redis_queue_key,
        )