taskproc’s documentation

Introduction

taskproc (task processor) is a simple Python module designed for running a set of tasks which can have dependencies on each other. The idea is to have a make-like dependency resolution within a Python program. Tasks are queued for execution until their requirements are met. When executed, tasks can return a value which is then passed down to dependent tasks.
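The following is a minimal single-threaded sketch of make-like dependency resolution. It assumes only the behaviour described above. MiniTask, collect() and run_tasks() are hypothetical names written for illustration; they are not taskproc's API or implementation. Each task waits in a queue until its requirements have finished, then receives their results as a list.

```python
from collections import deque

class MiniTask(object):
    """Hypothetical stand-in for a task: a callable plus its requirements."""

    def __init__(self, func, requires=()):
        self.func = func
        self.requires = list(requires)
        self.result = None

def collect(task, seen):
    """Add task and all of its transitive requirements to the set seen."""
    if task not in seen:
        seen.add(task)
        for req in task.requires:
            collect(req, seen)
    return seen

def run_tasks(final_task):
    """Run final_task after everything it depends on; return its result."""
    tasks = collect(final_task, set())
    # count of unfinished requirements for each task
    waiting = dict((t, len(t.requires)) for t in tasks)
    # reverse links: which tasks are pending on each task
    pendingon = dict((t, []) for t in tasks)
    for t in tasks:
        for req in t.requires:
            pendingon[req].append(t)
    # tasks with no outstanding requirements can run immediately
    ready = deque(t for t in tasks if waiting[t] == 0)
    while ready:
        task = ready.popleft()
        # results of the required tasks are passed down as a list
        task.result = task.func([r.result for r in task.requires])
        for dependent in pendingon[task]:
            waiting[dependent] -= 1
            if waiting[dependent] == 0:
                ready.append(dependent)
    return final_task.result

task1 = MiniTask(lambda deps: 42)
task2 = MiniTask(lambda deps: 12)
task3 = MiniTask(lambda deps: sum(deps), requires=[task1, task2])
print(run_tasks(task3))  # 54
```

In this sketch, tasks caught in a dependency cycle never become ready and are silently skipped. The documented abortpending option of process() exists to report such leftover tasks instead.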

There are two modes of task execution: simple execution of tasks in the main program or multi-threaded execution.

taskproc is licensed under the Apache 2.0 license and is copyright Jeremy Sanders. It should be compatible with Python 2.6+ and Python 3.2+.

Examples

Simple functions

import taskproc

def func1(dep_results):
    num = 42
    print(num)
    return num

def func2(dep_results):
    num = 12
    print(num)
    return num

def func3(dep_results):
    num = sum(dep_results)
    print(num)
    return num

task1 = taskproc.Task(func=func1)
task2 = taskproc.Task(func=func2)
task3 = taskproc.Task(func=func3, requires=[task1, task2])

# single-threaded processing
queue = taskproc.TaskQueue()
queue.add(task3)

# the "with" statement is optional for single-threaded processing, but is
# required (or start()/end() used) for TaskQueueThread
with queue:
    queue.process()

This simple example executes three tasks in the main thread. The results of func1() and func2() are passed to func3() in the list dep_results, and func3() sums them. The program prints 42 and 12 (the order of independent tasks is not guaranteed), then their sum, 54. Tasks do not need to return values; they could, for example, run external data processing commands.

Subclassing and passing extra arguments

from __future__ import print_function
import time
import taskproc

class Task1(taskproc.Task):
    def run(self):
        print('Task 1')
        time.sleep(5)

def taskfunc(dep_results, num):
    print('Task', num)
    time.sleep(5)

task1 = Task1()
task2 = taskproc.Task(func=taskfunc, args=(2,), requires=[task1])
task3 = taskproc.Task(func=taskfunc, args=(3,), requires=[task1])
task4 = taskproc.Task(func=taskfunc, args=(4,), requires=[task2, task3])

queue = taskproc.TaskQueueThread(2)
queue.add(task4)
with queue:
    queue.process()

This example subclasses Task for task1 instead of supplying a function. Tasks 2-4 use the args parameter to pass extra arguments to the function. The TaskQueueThread uses two threads, so task2 and task3, which depend only on task1, can run simultaneously.
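To see why two threads help here, the sketch below hand-codes the same four-task schedule using the standard library's concurrent.futures.ThreadPoolExecutor (Python 3.2+ only, whereas taskproc also supports Python 2.6+). It is not how TaskQueueThread works internally. The function names and the 0.2 s delay are illustrative assumptions. task1 runs alone, task2 and task3 may overlap, then task4 runs.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def taskfunc(num, delay=0.2):
    """Stand-in for the example's taskfunc: sleep, then report its number."""
    time.sleep(delay)
    return num

def run_example(nthreads):
    """Hand-coded schedule for the task1..task4 graph; returns elapsed time."""
    start = time.time()
    taskfunc(1)  # task1 has no requirements, so it runs first on its own
    with ThreadPoolExecutor(max_workers=nthreads) as pool:
        # task2 and task3 depend only on task1, so they can overlap
        futures = [pool.submit(taskfunc, n) for n in (2, 3)]
        results = [f.result() for f in futures]
    taskfunc(4)  # task4 waits for both task2 and task3
    return results, time.time() - start

for n in (1, 2):
    results, elapsed = run_example(n)
    print(n, 'thread(s):', results, round(elapsed, 1), 's')
```

With one worker, task2 and task3 run back to back. With two workers they overlap, so the total time drops by roughly one task's delay.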

API documentation

class taskproc.Task(func=None, requires=[], args=(), kwargs={})

A Task is a unit of work.

Tasks are run by calling the run() method, which by default calls the callable func passed to the constructor. Tasks can require other tasks, either by passing them in requires when constructing or by calling add_requirement() afterwards.

Parameters:
  • func (function) – callable to run for this task
  • requires (list of Task) – tasks required before this task
  • args (tuple) – additional arguments to pass to func
  • kwargs (dict) – additional keyword arguments to pass to func
add_requirement(req)

Add a requirement task to this task.

Parameters: req (Task) – add as requirement to run before this Task

Note: do not add to the requirements if this task has already been added to a TaskQueue, unless you know that it has other requirements which have not been met. The task may have already been queued for execution.

Changing the requirements of a task after it has been added means that the TaskQueue will not detect required tasks with unmet dependencies at the end of processing.

args = None

additional arguments passed to func (tuple)

func = None

called by run() by default when the task is executed (function or callable)

kwargs = None

additional keyword arguments passed to func (dict)

pendingon = None

Task objects pending on completion of this task (set)

reqresults = None

results of processed required tasks (list)

run()

Called when task is run. Optionally override this.

By default runs self.func(self.reqresults, *self.args, **self.kwargs)
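The default run() behaviour and the effect of overriding it can be sketched with a hypothetical stand-in class, SketchTask. It only mirrors the documented call self.func(self.reqresults, *self.args, **self.kwargs); it is not taskproc's source. In taskproc, reqresults is filled in by the queue; here it is set by hand.

```python
class SketchTask(object):
    """Hypothetical stand-in mirroring the documented default run()."""

    def __init__(self, func=None, requires=None, args=(), kwargs=None):
        self.func = func
        self.requires = list(requires or [])
        self.args = args
        self.kwargs = dict(kwargs or {})
        self.reqresults = []  # results of required tasks, filled in later

    def run(self):
        # default: forward required results, then the extra arguments
        return self.func(self.reqresults, *self.args, **self.kwargs)

class Doubler(SketchTask):
    # overriding run() replaces the call to func entirely
    def run(self):
        return [2 * r for r in self.reqresults]

def scaled_sum(dep_results, factor, offset=0):
    return factor * sum(dep_results) + offset

t = SketchTask(func=scaled_sum, args=(10,), kwargs={'offset': 1})
t.reqresults = [42, 12]
print(t.run())  # 541, i.e. 10 * (42 + 12) + 1

d = Doubler()
d.reqresults = [1, 2]
print(d.run())  # [2, 4]
```

The sketch uses None defaults rather than [] and {} so that separate instances never share one mutable default object.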

class taskproc.TaskQueue

Process Tasks in a simple linear way.

add(task)

Add task to queue to be processed.

Parameters: task – Task to add; this should be the Task that depends on all other required tasks.
process(abortpending=True)

Process all items in queue.

Parameters: abortpending – if True and tasks with unsatisfied dependencies remain at the end of processing, raise a TaskProcError
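One way to detect tasks with unsatisfied dependencies is to process everything that becomes ready and then check what is still waiting. The sketch below illustrates that idea with a hypothetical process() over a dictionary of task names. SketchProcError is a hypothetical analogue of TaskProcError that, like it, subclasses RuntimeError. This is an illustration under those assumptions, not taskproc's implementation.

```python
from collections import deque

class SketchProcError(RuntimeError):
    """Hypothetical analogue of TaskProcError (also a RuntimeError)."""

def process(requires, abortpending=True):
    """requires maps each task name to the names of the tasks it needs.

    Runs tasks in dependency order and returns that order. Tasks still
    waiting when the ready queue empties have unsatisfied dependencies.
    """
    waiting = dict((name, set(reqs)) for name, reqs in requires.items())
    ready = deque(sorted(n for n, reqs in waiting.items() if not reqs))
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)  # a real queue would run the task here
        del waiting[name]
        for other in sorted(waiting):
            if name in waiting[other]:
                waiting[other].discard(name)
                if not waiting[other]:
                    ready.append(other)
    if waiting and abortpending:
        raise SketchProcError('tasks with unsatisfied dependencies: %s'
                              % ', '.join(sorted(waiting)))
    return order

print(process({'a': [], 'b': [], 'c': ['a', 'b']}))  # ['a', 'b', 'c']
try:
    process({'x': ['y'], 'y': ['x'], 'z': []})  # x and y form a cycle
except RuntimeError as exc:  # SketchProcError is a RuntimeError
    print('error:', exc)
```

A requirement naming a task that was never supplied also stays unsatisfied, so it is reported the same way as a cycle.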
class taskproc.TaskQueueThread(nthreads, onstart=None, onend=None)

Process Tasks with multiple threads.

Parameters:
  • nthreads (int) – number of threads to use
  • onstart (callable) – run this at the start of each thread
  • onend (callable) – run this when finishing each thread

When using threads care must be taken if common objects are accessed in the different tasks.
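A common precaution is to guard shared objects with a lock. The sketch below uses only the standard threading module. task_body and the totals dictionary are hypothetical and unrelated to taskproc itself. Each of four threads increments a shared counter under threading.Lock.

```python
import threading

totals = {'count': 0}
totals_lock = threading.Lock()

def task_body(n):
    """Hypothetical task that updates a shared dict n times."""
    for _ in range(n):
        # += on a shared object is a read-modify-write; without the lock,
        # two threads could read the same old value and lose an update
        with totals_lock:
            totals['count'] += 1

threads = [threading.Thread(target=task_body, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(totals['count'])  # 40000
```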

The recommended way to use this class is to wrap TaskQueueThread.process() in a with statement, which automatically calls start() and end(), e.g.

q = TaskQueueThread(2)
q.add(...)
with q:
    q.process()
add(task)

Add task to queue to be processed.

Parameters: task – Task to add; this should be the Task that depends on all other required tasks.
end()

End processing threads. This should be called before the program ends or a with statement should be used.

process(abortpending=True)

Process all items in queue.

Parameters: abortpending – if True and tasks with unsatisfied dependencies remain at the end of processing, raise a TaskProcError
start()

Start processing threads. This must be called before process() or a with statement should be used.
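The start()/end() lifecycle, the per-thread onstart/onend hooks and the with statement can be sketched with a hypothetical SketchThreadQueue class. Its submit() method stands in for add()/process(). This illustrates the documented behaviour under those assumptions and is not the library's internals. end() shuts workers down by queueing one sentinel per thread.

```python
import threading
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

class SketchThreadQueue(object):
    """Hypothetical worker pool: start()/end() plus the with statement."""

    def __init__(self, nthreads, onstart=None, onend=None):
        self.nthreads = nthreads
        self.onstart = onstart
        self.onend = onend
        self.jobs = queue.Queue()
        self.threads = []

    def _worker(self):
        if self.onstart is not None:
            self.onstart()  # once per thread, before any job
        try:
            while True:
                job = self.jobs.get()
                if job is None:  # sentinel: this worker should exit
                    break
                job()
        finally:
            if self.onend is not None:
                self.onend()  # once per thread, on the way out

    def submit(self, job):
        """Queue a zero-argument callable for a worker thread."""
        self.jobs.put(job)

    def start(self):
        for _ in range(self.nthreads):
            thread = threading.Thread(target=self._worker)
            thread.start()
            self.threads.append(thread)

    def end(self):
        # FIFO queue: sentinels are dequeued only after all earlier jobs
        for _ in self.threads:
            self.jobs.put(None)
        for thread in self.threads:
            thread.join()
        self.threads = []

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end()
        return False  # let exceptions from the with body propagate

events = []
events_lock = threading.Lock()

def record(item):
    with events_lock:
        events.append(item)

with SketchThreadQueue(2, onstart=lambda: record('start'),
                       onend=lambda: record('end')) as q:
    for i in range(5):
        q.submit(lambda i=i: record(i))

print(events.count('start'), events.count('end'))  # 2 2
```

Because __exit__ always calls end(), worker threads are joined even if the body of the with statement raises.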

class taskproc.TaskProcError

Exception raised if an error is encountered in this module.

A subclass of RuntimeError.
