Tutorial¶
Register and push task¶
A task is a user-defined function whose actual execution can be postponed by pushing its name and arguments into a queue. You can have multiple queues; queues are created on the fly.
# tasks.py
import sys
import logging

import dsq

# dsq does not set up any logger by itself,
# you must configure logging explicitly
logging.basicConfig(level=logging.INFO)

# uses the 127.0.0.1:6379/0 redis by default
manager = dsq.create_manager()

def task(value):
    print value

# tasks must be registered so workers can execute them
manager.register('my-task', task)

if __name__ == '__main__':
    # put my-task into the normal queue
    manager.push('normal', 'my-task', args=[sys.argv[1]])
Now push a couple of tasks by executing:
$ python tasks.py Hello
$ python tasks.py World
You can see queue sizes via the stat command:
$ dsq stat -t tasks
normal 2
schedule 0
Start a worker for the normal queue:
$ dsq worker -b -t tasks normal
INFO:dsq.worker:Executing task(Hello)#CLCKs0nNRQqC4TKVkwDFRw
Hello
INFO:dsq.worker:Executing task(World)#LjCRG7yiQIqVKms-QfhmGg
World
The -b flag stops the worker after the queue is empty.
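Conceptually, registering and pushing boils down to a name-to-function registry plus a queue of serialized calls. The following is a toy in-memory sketch of that model (plain Python for illustration, not the dsq API — dsq keeps its queues in Redis):

```python
from collections import deque

class MiniManager:
    """Toy illustration of register/push/worker semantics (not dsq itself)."""
    def __init__(self):
        self.registry = {}  # task name -> callable
        self.queues = {}    # queue name -> deque of (task name, args)

    def register(self, name, func):
        self.registry[name] = func

    def push(self, queue, name, args=()):
        # queues are created on the fly, just like in dsq
        self.queues.setdefault(queue, deque()).append((name, list(args)))

    def work(self, queue):
        """Drain one queue, like `dsq worker -b`."""
        results = []
        q = self.queues.get(queue, deque())
        while q:
            name, args = q.popleft()
            results.append(self.registry[name](*args))
        return results

manager = MiniManager()
manager.register('my-task', lambda value: value.upper())
manager.push('normal', 'my-task', args=['Hello'])
manager.push('normal', 'my-task', args=['World'])
print(manager.work('normal'))  # ['HELLO', 'WORLD']
```

The real library adds serialization, Redis persistence and worker processes on top, but the registry/queue split is the same: only the task *name* travels through the queue, which is why workers must import the module that registers the task.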
Task decorator¶
There is a shortcut to register tasks and push them via the dsq.manager.Manager.task() decorator:
# tasks.py
import sys
import logging

import dsq

logging.basicConfig(level=logging.INFO)

manager = dsq.create_manager()

@manager.task(queue='normal')
def task(value):
    print value

if __name__ == '__main__':
    task.push(sys.argv[1])
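Such a decorator only needs to register the wrapped function and hang a push helper on it that remembers the target queue. A hypothetical stand-alone sketch of the pattern (not dsq's actual implementation):

```python
REGISTRY = {}  # task name -> callable
QUEUES = {}    # queue name -> list of pending (task name, args)

def task(queue):
    """Register the function and attach a .push() bound to `queue`."""
    def decorator(func):
        REGISTRY[func.__name__] = func
        def push(*args):
            QUEUES.setdefault(queue, []).append((func.__name__, args))
        func.push = push
        return func
    return decorator

@task(queue='normal')
def greet(value):
    return 'hello %s' % value

greet.push('world')
name, args = QUEUES['normal'][0]
print(REGISTRY[name](*args))  # hello world
```

Note that the decorated function stays directly callable, so you can still run it synchronously in tests without a worker.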
Queue priorities¶
The worker's queue list is prioritized: it processes tasks from the first queue, then from the second if the first is empty, and so on:
# tasks.py
import logging

import dsq

logging.basicConfig(level=logging.INFO)

manager = dsq.create_manager()

@manager.task(queue='high')
def high(value):
    print 'urgent', value

@manager.task(queue='normal')
def normal(value):
    print 'normal', value

if __name__ == '__main__':
    normal.push(1)
    normal.push(2)
    normal.push(3)
    high.push(4)
    normal.push(5)
    high.push(6)
Push and process them:
$ python tasks.py
$ dsq stat -t tasks
high 2
normal 4
schedule 0
$ dsq worker -bt tasks high normal
INFO:dsq.worker:Executing high(4)#w9RKVQ4oQoO9ivB8q198QA
urgent 4
INFO:dsq.worker:Executing high(6)#SEss1H0QQB2TAqLQjbBpmw
urgent 6
INFO:dsq.worker:Executing normal(1)#NY-e_Nu3QT-4zCDU9LvIvA
normal 1
INFO:dsq.worker:Executing normal(2)#yy44h7tcToe5yyTSUJ7dLw
normal 2
INFO:dsq.worker:Executing normal(3)#Hx3iau2MRW2xwwOFNinJIg
normal 3
INFO:dsq.worker:Executing normal(5)#DTDpF9xkSkaChwFURRCzDQ
normal 5
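The prioritized fetch in the transcript above can be sketched as: scan the queue list in order and take the first task found, falling back to later queues only when the earlier ones are empty (illustrative code, not dsq internals):

```python
from collections import deque

queues = {
    'high': deque([4, 6]),
    'normal': deque([1, 2, 3, 5]),
}

def pop_prioritized(queue_names):
    # the first non-empty queue wins; later queues are tried
    # only when all earlier ones are empty
    for name in queue_names:
        q = queues.get(name)
        if q:
            return name, q.popleft()
    return None

order = []
while True:
    item = pop_prioritized(['high', 'normal'])
    if item is None:
        break
    order.append(item)
print(order)
# [('high', 4), ('high', 6), ('normal', 1), ('normal', 2),
#  ('normal', 3), ('normal', 5)]
```

This explains the output order above: both high tasks drain first even though they were pushed after most of the normal ones.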
Delayed tasks¶
You can use the eta or delay parameter to postpone a task:
# tasks.py
import sys
import logging

import dsq

logging.basicConfig(level=logging.INFO)

manager = dsq.create_manager()

@manager.task(queue='normal')
def task(value):
    print value

if __name__ == '__main__':
    task.modify(delay=30).push(sys.argv[1])
You need the scheduler command to move such tasks into their queues:
$ python tasks.py boo
$ python tasks.py foo
$ date
Sun Jul 17 13:41:10 MSK 2016
$ dsq stat -t tasks
schedule 2
$ dsq schedule -t tasks
2016-07-17 13:41:32 normal {"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
2016-07-17 13:41:34 normal {"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}
# the next command waits until all tasks are scheduled
$ dsq scheduler -bt tasks
$ dsq stat -t tasks
normal 2
schedule 0
$ dsq queue -t tasks
{"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
{"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}
$ dsq worker -bt tasks normal
INFO:dsq.worker:Executing task(boo)#qWbsEnu2SRyjwIXga35yqA
boo
INFO:dsq.worker:Executing task(foo)#xVm3OyWjQB2XDiskTsCN4w
foo
Note
In production you need to run N workers and a single scheduler to be able to process delayed tasks.
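The scheduler's job can be pictured as moving tasks from a time-ordered schedule store into their target queues once their eta passes. dsq keeps the schedule in Redis; this sketch uses a heap and plain lists with hypothetical names, purely to illustrate the eta/delay relationship and the hand-off:

```python
import heapq
import time

schedule = []  # heap of (eta, queue, task), ordered by eta
queues = {}    # queue name -> list of due tasks

def push_delayed(queue, task, delay=0, eta=None):
    # `delay` is relative seconds; `eta` is an absolute unix timestamp
    if eta is None:
        eta = time.time() + delay
    heapq.heappush(schedule, (eta, queue, task))

def run_scheduler(now=None):
    """Move every due task from the schedule into its queue."""
    now = time.time() if now is None else now
    while schedule and schedule[0][0] <= now:
        eta, queue, task = heapq.heappop(schedule)
        queues.setdefault(queue, []).append(task)

push_delayed('normal', 'boo', delay=30)
push_delayed('normal', 'foo', delay=32)
run_scheduler()                      # nothing is due yet
print(len(schedule), queues)         # 2 {}
run_scheduler(now=time.time() + 60)  # pretend a minute passed
print(len(schedule), queues)         # 0 {'normal': ['boo', 'foo']}
```

This mirrors the transcript above: stat shows tasks under schedule until the scheduler runs, after which they appear in their queue and workers can pick them up.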
Task result¶
Provide the keep_result parameter to be able to fetch the task result later:
# tasks.py
import sys
import logging

import dsq

logging.basicConfig(level=logging.INFO)

manager = dsq.create_manager()

@manager.task(queue='normal', keep_result=600)
def div(a, b):
    return a/b

if __name__ == '__main__':
    result = div.push(int(sys.argv[1]), int(sys.argv[2]))
    if result.ready(5):
        if result.error:
            print result.error, result.error_message
        else:
            print 'Result is: ', result.value
    else:
        print 'Result is not ready'
Process:
# start worker in background
$ dsq worker -t tasks normal &
[1] 6419
$ python tasks.py 10 2
INFO:dsq.worker:Executing div(10, 2)#6S_UlsECSxSddtluBLB6yQ
Result is: 5
$ python tasks.py 10 0
INFO:dsq.worker:Executing div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
ERROR:dsq.manager:Error during processing task div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
Traceback (most recent call last):
File "/home/bobrov/work/dsq/dsq/manager.py", line 242, in process
result = func(*args, **kwargs)
File "./tasks.py", line 11, in div
return a/b
ZeroDivisionError: integer division or modulo by zero
ZeroDivisionError integer division or modulo by zero
# kill worker
$ kill %1
[1]+ Done dsq worker -t tasks normal
$ python tasks.py 10 1
Result is not ready
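Result keeping amounts to storing either the return value or the caught exception under the task id, which is why the pushing side can branch on result.error as above. A toy sketch of that value/error split (not dsq's storage format, which lives in Redis with a TTL):

```python
def run_and_store(results, task_id, func, *args):
    """Execute func and record success or failure, like a keep_result worker."""
    try:
        results[task_id] = {'error': None, 'value': func(*args)}
    except Exception as exc:
        # mirror the error / error_message fields the tutorial reads back
        results[task_id] = {'error': type(exc).__name__,
                            'error_message': str(exc)}

results = {}
run_and_store(results, 't1', lambda a, b: a // b, 10, 2)
run_and_store(results, 't2', lambda a, b: a // b, 10, 0)
print(results['t1'])           # {'error': None, 'value': 5}
print(results['t2']['error'])  # ZeroDivisionError
```

The last transcript step also shows why ready() takes a timeout: with no worker running, nothing ever writes the result, so the producer gives up after 5 seconds and reports "Result is not ready".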