API

dsq

dsq.is_main()

Returns True if dsq command is in progress

May be useful for tasks module which imports other tasks to avoid recursive imports:

#tasks.py
import dsq

if dsq.is_main():
    import sms_tasks
    import billing_tasks

dsq.manager

class dsq.manager.Manager(queue, result=None, sync=False, unknown=None, default_queue=None)

DSQ manager

Allows to register task functions, push tasks and get task results

Parameters:
  • queueQueueStore to use for tasks.
  • resultResultStore to use for task results.
  • sync – Synchronous operation. Task will be executed immediately during push() call.
  • unknown – Name of unknown queue for tasks for which there is no registered functions. Default is ‘unknown’.
  • default_queue – Name of default queue. Default is ‘dsq’.
pop(queue_list, timeout=None)

Pop item from the first not empty queue in queue_list

Parameters:
  • queue_list – List of queue names.
  • timeout – Wait item for this amount of seconds (integer). By default blocks forever.
item = manager.pop(['high', 'normal'], 1)
if item:
    manager.process(item)
process(task, now=None, log_exc=True)

Process task item

Parameters:
  • task – Task.
  • now – Unix timestamp to compare with task.expire time and set eta on retry.
  • log_exc – Log any exception during task execution. True by default.
push(queue, name, args=None, kwargs=None, meta=None, ttl=None, eta=None, delay=None, dead=None, retry=None, retry_delay=None, timeout=None, keep_result=None)

Add task into queue

Parameters:
  • queue – Queue name.
  • name – Task name.
  • args – Task args.
  • kwargs – Task kwargs.
  • meta – Task additional info.
  • ttl – Task time to live.
  • eta – Schedule task execution for particular unix timestamp.
  • delay – Postpone task execution for particular amount of seconds.
  • dead – Name of dead-letter queue.
  • retry – Retry task execution after exception. True - forever, number - retry this amount.
  • retry_delay – Delay between retry attempts.
  • timeout – Task execution timeout.
  • keep_result – Keep task return value for this amount of seconds. Result is ignored by default.
register(name, func, with_context=False, init_state=None)

Register task

Parameters:
  • name – Task name.
  • func – Function.
  • with_context – Provide task context as first task argument.
  • init_state – Task state initializer.
def add(a, b):
    return a + b

manager.register('add', add)
manager.push('normal', 'add', (1, 2), keep_result=300)
set_result(task, result=None, exc_info=None, now=None, log_exc=True)

Set result for task item

Parameters:
  • task – Task.
  • result – Result value.
  • exc_info – Set exception info, retrieve it via sys.exc_info() if True.
  • now – Unix timestamp to set eta on retry.
  • log_exc – Log exc_info if any. True by default.
task(name=None, queue=None, with_context=False, init_state=None, **kwargs)

Task decorator

Function wrapper to register task in manager and provide simple interface to calling it.

Parameters:
  • name – Task name, dsq will use func.__name__ if not provided.
  • queue – Queue name to use.
  • with_context – Provide task context as first task argument.
  • init_state – Task state initializer.
  • **kwrags – Rest params as for push().
@manager.task
def task1(arg):
    long_running_func(arg)

@manager.task(name='custom-name', queue='low', with_context=True)
def task2(ctx, arg):
    print ctx.task['id']
    return long_running_func(arg)

task1.push('boo')  # push task to queue
task2.modify(keep_result=300).push('foo')  # push task with keep_result option.
task1('boo')  # direct call of task1.