EOX GitLab Instance

Commit 02f66ab2 authored by Nikola Jankovic's avatar Nikola Jankovic 💻
Browse files

added basics for seeder

parent 1f2f6db4
Pipeline #19062 passed with stage
in 3 minutes and 15 seconds
# Temporary and binary files
*~
*.py[cod]
*.so
!.isort.cfg
!setup.cfg
*.orig
*.log
*.pot
__pycache__/*
.cache/*
.*.swp
*/.ipynb_checkpoints/*
.DS_Store
.pylintrc
# Project files
.ropeproject
.project
.pydevproject
.settings
.idea
.vscode
.vscode/ .vscode/
tags
# Package files
*.egg
*.eggs/
.installed.cfg
*.egg-info
# Unittest and coverage
htmlcov/*
.coverage
.coverage.*
.tox
junit*.xml
coverage.xml
.pytest_cache/
# Build and docs folder/files
build/*
dist/*
sdist/*
docs/api/*
docs/_rst/*
docs/_build/*
cover/*
MANIFEST
# Per-project virtualenvs
.venv*/
.conda*/
-i https://__token__:${CI_JOB_TOKEN}@gitlab.eox.at/api/v4/projects/324/packages/pypi/simple -i https://__token__:${CI_JOB_TOKEN}@gitlab.eox.at/api/v4/projects/324/packages/pypi/simple
vsq vsq
\ No newline at end of file redis<4.0
boto3<2.0
#!/usr/bin/env python #!/usr/bin/env python
#----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# #
# Project: seeder.py # Project: seeder.py
# Authors: Stephan Meissl <stephan.meissl@eox.at> # Authors: Stephan Meissl <stephan.meissl@eox.at>
# #
#----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Copyright (c) 2018 EOX IT Services GmbH # Copyright (c) 2018 EOX IT Services GmbH
# #
# Python script to pre-seed VS cache. # Python script to pre-seed VS cache.
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE. # IN THE SOFTWARE.
#----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import os import os
...@@ -34,13 +34,10 @@ import sys ...@@ -34,13 +34,10 @@ import sys
import argparse import argparse
import textwrap import textwrap
import logging import logging
import traceback
import signal
import dateutil.parser import dateutil.parser
import redis
import subprocess import subprocess
import sqlite3 import redis
import boto3 import boto3
...@@ -64,7 +61,7 @@ def setup_logging(verbosity): ...@@ -64,7 +61,7 @@ def setup_logging(verbosity):
logger.setLevel(level) logger.setLevel(level)
sh = logging.StreamHandler() sh = logging.StreamHandler()
sh.setLevel(level) sh.setLevel(level)
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s') formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
sh.setFormatter(formatter) sh.setFormatter(formatter)
logger.addHandler(sh) logger.addHandler(sh)
# finished logging setup # finished logging setup
...@@ -72,19 +69,31 @@ def setup_logging(verbosity): ...@@ -72,19 +69,31 @@ def setup_logging(verbosity):
CRS_BOUNDS = { CRS_BOUNDS = {
3857: (-20037508.3428, -20037508.3428, 20037508.3428, 20037508.3428), 3857: (-20037508.3428, -20037508.3428, 20037508.3428, 20037508.3428),
4326: (-180, -90, 180, 90) 4326: (-180, -90, 180, 90),
} }
GRID_TO_SRID = { GRID_TO_SRID = {"GoogleMapsCompatible": 3857, "WGS84": 4326}
"GoogleMapsCompatible": 3857,
"WGS84": 4326
} def seed_mapcache(
seed_command,
config_file,
def seed_mapcache(seed_command, config_file, tileset, grid, tileset,
minx, miny, maxx, maxy, minzoom, maxzoom, grid,
start_time, end_time, threads, delete, metatile, minx,
timeout=1800, force=True): miny,
maxx,
maxy,
minzoom,
maxzoom,
start_time,
end_time,
threads,
delete,
metatile,
timeout=1800,
force=True,
):
bounds = CRS_BOUNDS[GRID_TO_SRID[grid]] bounds = CRS_BOUNDS[GRID_TO_SRID[grid]]
full = float(abs(bounds[0]) + abs(bounds[2])) full = float(abs(bounds[0]) + abs(bounds[2]))
...@@ -95,8 +104,7 @@ def seed_mapcache(seed_command, config_file, tileset, grid, ...@@ -95,8 +104,7 @@ def seed_mapcache(seed_command, config_file, tileset, grid,
# extent is always within [bounds[0],bounds[2]] # extent is always within [bounds[0],bounds[2]]
# where maxx can be >bounds[2] but <=full # where maxx can be >bounds[2] but <=full
if minx < bounds[0] or minx > bounds[2] or maxx < bounds[0] or maxx > full: if minx < bounds[0] or minx > bounds[2] or maxx < bounds[0] or maxx > full:
raise Exception("Invalid extent '%s,%s,%s,%s'." raise Exception("Invalid extent '%s,%s,%s,%s'." % (minx, miny, maxx, maxy))
% (minx, miny, maxx, maxy))
if minzoom is None: if minzoom is None:
minzoom = 0 minzoom = 0
...@@ -107,48 +115,82 @@ def seed_mapcache(seed_command, config_file, tileset, grid, ...@@ -107,48 +115,82 @@ def seed_mapcache(seed_command, config_file, tileset, grid,
start_time = start_time.replace(tzinfo=None) start_time = start_time.replace(tzinfo=None)
end_time = end_time.replace(tzinfo=None) end_time = end_time.replace(tzinfo=None)
logger.info("Starting mapcache seed with parameters: command='%s', " logger.info(
"config_file='%s', tileset='%s', grid='%s', " "Starting mapcache seed with parameters: command='%s', "
"extent='%f,%f,%f,%f', zoom='%d,%d', nthreads='%s', " "config_file='%s', tileset='%s', grid='%s', "
"mode='%s', dimension='TIME=%sZ/%sZ', metatile=%d,%d, " "extent='%f,%f,%f,%f', zoom='%d,%d', nthreads='%s', "
"force=%s." "mode='%s', dimension='TIME=%sZ/%sZ', metatile=%d,%d, "
% (seed_command, config_file, tileset, grid, "force=%s."
minx, miny, maxx, maxy, minzoom, maxzoom, threads, % (
"seed" if not delete else "delete", seed_command,
start_time.isoformat(), end_time.isoformat(), config_file,
metatile, metatile, force)) tileset,
grid,
minx,
miny,
maxx,
maxy,
minzoom,
maxzoom,
threads,
"seed" if not delete else "delete",
start_time.isoformat(),
end_time.isoformat(),
metatile,
metatile,
force,
)
)
seed_args = [ seed_args = [
seed_command, seed_command,
"-c", config_file, "-c",
"-t", tileset, config_file,
"-g", grid, "-t",
"-e", "%f,%f,%f,%f" tileset,
% (minx, miny, bounds[2] if dateline_crossed else maxx, maxy), "-g",
"-n", str(threads), grid,
"-z", "%d,%d" % (minzoom, maxzoom), "-e",
"-D", "TIME=%sZ/%sZ" % (start_time.isoformat(), end_time.isoformat()), "%f,%f,%f,%f" % (minx, miny, bounds[2] if dateline_crossed else maxx, maxy),
"-m", "seed" if not delete else "delete", "-n",
str(threads),
"-z",
"%d,%d" % (minzoom, maxzoom),
"-D",
"TIME=%sZ/%sZ" % (start_time.isoformat(), end_time.isoformat()),
"-m",
"seed" if not delete else "delete",
"-q", "-q",
"-M", "%d,%d" % (metatile, metatile), "-M",
"-L", "/cache-db/%s/failed_TIME_%sZ_%sZ_extent_%f,%f,%f,%f_zoom-%d-%d" "%d,%d" % (metatile, metatile),
"-L",
"/cache-db/%s/failed_TIME_%sZ_%sZ_extent_%f,%f,%f,%f_zoom-%d-%d"
% ( % (
collection, collection,
start_time.strftime("%Y%m%dT%H%M%S"), start_time.strftime("%Y%m%dT%H%M%S"),
end_time.strftime("%Y%m%dT%H%M%S"), end_time.strftime("%Y%m%dT%H%M%S"),
minx, miny, maxx, maxy, minzoom, maxzoom minx,
miny,
maxx,
maxy,
minzoom,
maxzoom,
), ),
"-P", "10", "-P",
"10",
] ]
if not delete and force: if not delete and force:
seed_args.append("-f") seed_args.append("-f")
logger.debug("MapCache seeding command: '%s'. raw: '%s'." logger.debug(
% (" ".join(seed_args), seed_args)) "MapCache seeding command: '%s'. raw: '%s'." % (" ".join(seed_args), seed_args)
)
process = subprocess.Popen( process = subprocess.Popen(
seed_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, seed_args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) )
try: try:
...@@ -163,19 +205,16 @@ def seed_mapcache(seed_command, config_file, tileset, grid, ...@@ -163,19 +205,16 @@ def seed_mapcache(seed_command, config_file, tileset, grid,
if string is not None: if string is not None:
for line in string.decode("utf-8").split("\n"): for line in string.decode("utf-8").split("\n"):
if line != "": if line != "":
logger.info( logger.info("MapCache output: %s" % line)
"MapCache output: %s" % line
)
else: else:
out = out.decode("utf-8").strip() out = out.decode("utf-8").strip()
if out != "": if out != "":
logger.info( logger.info("MapCache output: %s" % out)
"MapCache output: %s" % out
)
if process.returncode != 0: if process.returncode != 0:
raise Exception("'%s' failed. Returncode '%d'." raise Exception(
% (seed_command, process.returncode)) "'%s' failed. Returncode '%d'." % (seed_command, process.returncode)
)
logger.info("Seeding finished with returncode '%d'." % process.returncode) logger.info("Seeding finished with returncode '%d'." % process.returncode)
...@@ -191,82 +230,75 @@ def seeder(collection, start, end, leave_existing=False): ...@@ -191,82 +230,75 @@ def seeder(collection, start, end, leave_existing=False):
# TODO # TODO
logger.info( logger.info("Finished seeding from '%s' to '%s'." % (start, end))
"Finished seeding from '%s' to '%s'." % (start, end)
)
def seeder_redis_wrapper( def seeder_redis_wrapper(
collection, leave_existing, host="localhost", port=6379, collection, leave_existing, host="localhost", port=6379, queue_key="ingest_queue"
queue_key='ingest_queue'
): ):
client = redis.Redis(host=host, port=port) client = redis.Redis(host=host, port=port)
while True: while True:
logger.debug("waiting for redis queue '%s'..." % queue_key) logger.debug("waiting for redis queue '%s'..." % queue_key)
value = client.brpop(queue_key) value = client.brpop(queue_key)
start, end = value[1].split(b"/") start, end = value[1].split(b"/")
seeder( seeder(collection, dateutil.parser.parse(start), dateutil.parser.parse(end))
collection,
dateutil.parser.parse(start),
dateutil.parser.parse(end)
)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.description = textwrap.dedent("""\ parser.description = textwrap.dedent(
"""\
Pre-seeds cache of View Server (vs). Pre-seeds cache of View Server (vs).
""") """
)
parser.add_argument( parser.add_argument(
"--mode", default="standard", choices=["standard", "redis"], "--mode",
default="standard",
choices=["standard", "redis"],
help=( help=(
"The mode to run the seeder. Either one-off (standard) or " "The mode to run the seeder. Either one-off (standard) or "
"reading from a redis queue." "reading from a redis queue."
) ),
)
parser.add_argument(
"--start", default=None, type=dateutil.parser.parse,
help=(
"Mandatory argument indicating start date and time for "
"the seeding."
)
)
parser.add_argument(
"--end", default=None, type=dateutil.parser.parse,
help=(
"Mandatory argument indicating end date and time for the seeding."
)
)
parser.add_argument(
"--leave_existing", action="store_true",
help=(
"Don't delete existing images from cache."
)
) )
parser.add_argument( parser.add_argument(
"--redis-queue-key", default="seed_queue" "--start",
default=None,
type=dateutil.parser.parse,
help=("Mandatory argument indicating start date and time for " "the seeding."),
) )
parser.add_argument( parser.add_argument(
"--redis-host", default="localhost" "--end",
default=None,
type=dateutil.parser.parse,
help=("Mandatory argument indicating end date and time for the seeding."),
) )
parser.add_argument( parser.add_argument(
"--redis-port", type=int, default=6379 "--leave_existing",
action="store_true",
help=("Don't delete existing images from cache."),
) )
parser.add_argument("--redis-queue-key", default="seed_queue")
parser.add_argument("--redis-host", default="localhost")
parser.add_argument("--redis-port", type=int, default=6379)
parser.add_argument( parser.add_argument(
"-v", "--verbosity", type=int, default=3, choices=[0, 1, 2, 3, 4], "-v",
"--verbosity",
type=int,
default=3,
choices=[0, 1, 2, 3, 4],
help=( help=(
"Set verbosity of log output " "Set verbosity of log output "
"(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)" "(4=DEBUG, 3=INFO, 2=WARNING, 1=ERROR, 0=CRITICAL). (default: 3)"
) ),
) )
arg_values = parser.parse_args() arg_values = parser.parse_args()
setup_logging(arg_values.verbosity) setup_logging(arg_values.verbosity)
collection = os.environ.get('Collection') collection = os.environ.get("Collection")
if collection is None: if collection is None:
logger.critical("Collection environment variable not set.") logger.critical("Collection environment variable not set.")
sys.exit(1) sys.exit(1)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment