import os from datetime import datetime import pytest from redis import Redis from vsq.queue import ( Queue, TaskFailedException, TaskTimeoutException, TaskStatus ) @pytest.fixture def redis(): redis = Redis( os.environ.get('REDIS_HOST', 'localhost'), 6379, ) yield redis redis.flushall() @pytest.fixture def queue(redis): yield Queue( 'name', redis, ) def test_basic(queue): # producer msg = { 'type': 'test', 'arg': 1 } orig_task = queue.put(msg) prev_id = orig_task.id assert orig_task.status == TaskStatus.ACCEPTED assert orig_task.id assert orig_task.message == msg assert orig_task.result is None assert orig_task.error is None assert isinstance(orig_task.created, datetime) assert orig_task.started is None assert orig_task.finished is None # consumer with queue.get_task() as received_task: assert received_task.status == TaskStatus.PROCESSING assert received_task.id == prev_id assert received_task.message == msg assert received_task.result is None assert received_task.error is None assert isinstance(received_task.created, datetime) assert isinstance(received_task.started, datetime) assert received_task.finished is None received_task.result = {'result': True} # producer - result result = orig_task.get() assert result == {'result': True} def test_for(queue): # producer queue.put(1) queue.put(2) queue.put(3) # consumer messages = [] for task in queue: messages.append(task.message) if len(messages) == 3: break assert messages == [1, 2, 3] def test_error(queue): # producer task = queue.put(1) # consumer try: with queue.get_task() as received_task: raise ValueError(f'Unexpected value {received_task.message}') except ValueError: pass with pytest.raises(TaskFailedException): task.get(0.25) def test_timeouts(queue): with pytest.raises(TaskTimeoutException): queue.get_task(0.25) def test_pubsub(redis): p = redis.pubsub() # redis.publish('test', 'message1') p.subscribe('test') redis.publish('test', 'message2') redis.publish('test', 'message3') print(p.get_message(timeout=1.1)) print(p.get_message(True, timeout=1.1)) print(p.get_message(True, timeout=1.1))