Commit 92047171 authored by Fabian Schindler

Fixing preprocessor queue set up

parent 7ecf3b56
@@ -53,12 +53,13 @@ def cli():
 @click.option('--host', type=str)
 @click.option('--port', type=int)
 @click.option('--listen-queue', type=str)
+@click.option('--listen-md-queue', type=str)
 @click.option('--write-queue', type=str)
-def daemon(config_file=None, use_dir=None, validate=False, host=None, port=None, listen_queue=None, write_queue=None):
+def daemon(config_file=None, use_dir=None, validate=False, host=None, port=None, listen_queue=None, listen_md_queue=None, write_queue=None):
     config = load_config(config_file)
     if validate:
         validate_config(config)
-    run_daemon(config, host, port, listen_queue, write_queue)
+    run_daemon(config, host, port, listen_queue, listen_md_queue, write_queue)


 @cli.command(help='Run a single, one-off preprocessing')
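Click maps the new --listen-md-queue flag to the listen_md_queue keyword argument, and the daemon command simply forwards it to run_daemon between the existing listen and write queue names. Below is a minimal sketch of exercising that wiring with click's test runner; the preprocessor.cli import path, the patch targets and the queue names are assumptions for illustration, not part of this commit.

    # Sketch only: module path, patch targets and queue names are assumed.
    from unittest import mock

    from click.testing import CliRunner

    from preprocessor.cli import cli  # hypothetical import path

    with mock.patch('preprocessor.cli.run_daemon') as run_daemon, \
            mock.patch('preprocessor.cli.load_config', return_value={}):
        CliRunner().invoke(cli, [
            'daemon',
            '--host', 'redis', '--port', '6379',
            '--listen-queue', 'preprocess_queue',
            '--listen-md-queue', 'preprocess_md_queue',
            '--write-queue', 'preprocess_results',
        ])
        # the new queue name travels positionally between the other two
        run_daemon.assert_called_once_with(
            {}, 'redis', 6379,
            'preprocess_queue', 'preprocess_md_queue', 'preprocess_results'
        )

The daemon module itself is extended to watch both queues: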
 import redis
 import logging
+import json

-from .preprocess import preprocess_file
+from .preprocess import preprocess_file, preprocess_browse

 logger = logging.getLogger(__name__)


-def run_daemon(config, host, port, listen_queue, write_queue):
+def run_daemon(config, host, port, listen_queue, listen_md_queue, write_queue):
     """ Run the preprocessing daemon, listening on a redis queue
         for files to be preprocessed. After preprocessing the filename
         of the preprocessed files will be pushed to the output queue.
@@ -19,10 +20,15 @@ def run_daemon(config, host, port, listen_queue, write_queue):
     logger.debug("waiting for redis queue '%s'..." % listen_queue)
     while True:
         # fetch an item from the queue to be preprocessed
-        value = client.brpop(listen_queue)[0]
+        # brpop returns a (queue name, value) pair, which tells us which
+        # queue the item came from
+        queue, value = client.brpop([listen_queue, listen_md_queue])

         # start the preprocessing on that file
-        filenames = preprocess_file(config, value)
+        if queue == listen_queue:
+            filenames = preprocess_file(config, value)
+        elif queue == listen_md_queue:
+            browse_report = json.loads(value)
+            # NOTE: assumes the browse report carries its browses under a
+            # 'browses' key
+            for browse in browse_report.get('browses', []):
+                filenames = preprocess_browse(config, browse_report['browse_type'], browse_report, browse)
+
+        # TODO: convert to string, list, ....

         # write the filenames to the queue
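The loop above relies on redis-py's brpop, which blocks on several lists at once and returns a (queue, value) pair, so the name of the queue that produced the item decides whether the payload is a plain filename or a JSON-encoded browse report. A producer-side sketch, assuming hypothetical queue names and a report layout in which only the browse_type field comes from this commit (the browses list is an assumption), could look like:

    import json
    import redis

    client = redis.Redis(host='redis', port=6379, decode_responses=True)

    # a plain file to preprocess goes to the original queue
    client.lpush('preprocess_queue', '/data/scenes/scene_1.tif')

    # a browse report goes to the new metadata queue; apart from
    # 'browse_type' the report layout shown here is assumed
    browse_report = {
        'browse_type': 'SAR',
        'browses': [
            {'identifier': 'browse_1', 'filename': 'browse_1.jpg'},
        ],
    }
    client.lpush('preprocess_md_queue', json.dumps(browse_report))

The preprocessing module gains the corresponding upload step for the generated browse images: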
@@ -295,10 +295,39 @@ def preprocess_browse(config: dict, browse_type: str, browse_report: dict, brows
     logger.debug('Using preprocessing config %s' % pformat(preprocess_config))
     preprocess_internal(preprocess_config)

+    # get an uploader for the finalized images
+    target_config = config['target']
+    uploader = get_uploader(
+        target_config['type'], target_config.get('args'), target_config.get('kwargs')
+    )
+
+    upload_filenames = [
+        os.path.join(dirpath, filename)
+        for dirpath, _, filenames in os.walk('upload')
+        for filename in filenames
+    ]
+
+    # TODO: upload location?
+    file_path = ''
+
+    # send all files in the upload directory to the target storage
+    logger.info(
+        'Starting uploading of %d files to %s'
+        % (len(upload_filenames), file_path)
+    )
+    with Timer() as upload_timer:
+        uploader.upload(upload_filenames, file_path)
+
+    logger.info(
+        'Finished uploading after %f seconds.'
+        % (upload_timer.elapsed)
+    )
+
     logger.info(
         'Finished preprocessing of browse "%s" after %f seconds.'
         % (filename, preprocess_timer.elapsed)
     )
+
+    return upload_filenames
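The upload step expects the configuration to provide a target section with the uploader type plus optional positional and keyword arguments, which get_uploader uses to build the uploader before upload(upload_filenames, file_path) is called. Purely as an illustration of that shape, with the 'swift' type name and the argument names being assumptions rather than anything shown in this commit, the dict returned by load_config might contain:

    # hypothetical 'target' section; only the 'type'/'args'/'kwargs' keys are
    # taken from the code above, the values are made up for illustration
    config = {
        'target': {
            'type': 'swift',
            'args': [],
            'kwargs': {
                'container': 'preprocessed-browses',
                'username': 'preprocessor',
            },
        },
    }

Keeping the uploader choice in configuration means the preprocessing code itself stays unchanged when the storage backend changes.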