Tutorial

Install

$ pip install dsq

Also you need to start redis.

Register and push task

Task is user-defined function which actual execution can be postponed by pushing name and arguments into some queue. One can have multiple queues. Queues are created on the fly.

# tasks.py
import sys
import logging
import dsq

# dsq does not init any logger by itself
# and one must do it explicitly
logging.basicConfig(level=logging.INFO)

# using 127.0.0.1:6379/0 redis by default
manager = dsq.create_manager()

def task(value):
    print value

# tasks should be registered so workers can execute them
manager.register('my-task', task)

if __name__ == '__main__':
    # put my-task into normal queue
    manager.push('normal', 'my-task', args=[sys.argv[1]])

Now run push by executing:

$ python tasks.py Hello
$ python tasks.py World

You can see queue size via stat command:

$ dsq stat -t tasks
normal      2
schedule    0

Start worker for normal queue:

$ dsq worker -b -t tasks normal
INFO:dsq.worker:Executing task(Hello)#CLCKs0nNRQqC4TKVkwDFRw
Hello
INFO:dsq.worker:Executing task(World)#LjCRG7yiQIqVKms-QfhmGg
World

-b stops worker after queue is empty.

Task decorator

There is a shortcut to register tasks and push them via dsq.manager.Manager.task() decorator:

# tasks.py
import sys
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='normal')
def task(value):
    print value

if __name__ == '__main__':
    task.push(sys.argv[1])

Queue priorities

Worker queue list is prioritized. It processes tasks from a first queue, then from a second if first is empty and so on:

# tasks.py
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='high')
def high(value):
    print 'urgent', value

@manager.task(queue='normal')
def normal(value):
    print 'normal', value

if __name__ == '__main__':
    normal.push(1)
    normal.push(2)
    normal.push(3)
    high.push(4)
    normal.push(5)
    high.push(6)

And processing:

$ python tasks.py
$ dsq stat -t tasks
high        2
normal      4
schedule    0
$ dsq worker -bt tasks high normal
INFO:dsq.worker:Executing high(4)#w9RKVQ4oQoO9ivB8q198QA
urgent 4
INFO:dsq.worker:Executing high(6)#SEss1H0QQB2TAqLQjbBpmw
urgent 6
INFO:dsq.worker:Executing normal(1)#NY-e_Nu3QT-4zCDU9LvIvA
normal 1
INFO:dsq.worker:Executing normal(2)#yy44h7tcToe5yyTSUJ7dLw
normal 2
INFO:dsq.worker:Executing normal(3)#Hx3iau2MRW2xwwOFNinJIg
normal 3
INFO:dsq.worker:Executing normal(5)#DTDpF9xkSkaChwFURRCzDQ
normal 5

Delayed tasks

You can use eta or delay parameter to postpone task:

# tasks.py
import sys
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='normal')
def task(value):
    print value

if __name__ == '__main__':
    task.modify(delay=30).push(sys.argv[1])

You should use scheduler command to queue such tasks:

$ python tasks.py boo
$ python tasks.py foo
$ date
Sun Jul 17 13:41:10 MSK 2016
$ dsq stat -t tasks
schedule    2
$ dsq schedule -t tasks
2016-07-17 13:41:32 normal  {"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
2016-07-17 13:41:34 normal  {"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}

# next command waits until all tasks will be scheduled
$ dsq scheduler -bt tasks
$ dsq stat -t tasks
normal      2
schedule    0
$ dsq queue -t tasks
{"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
{"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}
$ dsq worker -bt tasks normal
INFO:dsq.worker:Executing task(boo)#qWbsEnu2SRyjwIXga35yqA
boo
INFO:dsq.worker:Executing task(foo)#xVm3OyWjQB2XDiskTsCN4w
foo

Note

In production you need to start N workers and one scheduler to be able to process delayed tasks.

Task result

Provide keep_result parameter to be able fetch task result later:

# tasks.py
import sys
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='normal', keep_result=600)
def div(a, b):
    return a/b

if __name__ == '__main__':
    result = div.push(int(sys.argv[1]), int(sys.argv[2]))
    if result.ready(5):
        if result.error:
            print result.error, result.error_message
        else:
            print 'Result is: ', result.value
    else:
        print 'Result is not ready'

Process:

# start worker in background
$ dsq worker -t tasks normal &
[1] 6419
$ python tasks.py 10 2
INFO:dsq.worker:Executing div(10, 2)#6S_UlsECSxSddtluBLB6yQ
Result is:  5
$ python tasks.py 10 0
INFO:dsq.worker:Executing div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
ERROR:dsq.manager:Error during processing task div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
Traceback (most recent call last):
  File "/home/bobrov/work/dsq/dsq/manager.py", line 242, in process
    result = func(*args, **kwargs)
  File "./tasks.py", line 11, in div
    return a/b
ZeroDivisionError: integer division or modulo by zero
ZeroDivisionError integer division or modulo by zero
# kill worker
$ kill %1
[1]+  Done                    dsq worker -t tasks normal
$ python tasks.py 10 1
Result is not ready