Introduction
taskproc (task processor) is a simple Python module designed for running a set of tasks which can have dependencies on each other. The idea is to have a make-like dependency resolution within a Python program. Tasks are queued for execution until their requirements are met. When executed, tasks can return a value which is then passed down to dependent tasks.
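Here is a minimal, self-contained sketch of make-like resolution. It does not use taskproc at all. ToyTask and run_all are hypothetical names. The sketch resolves requirements recursively (depth first), runs each task once, and passes return values down to dependents as a dep_results list. taskproc itself queues tasks until their requirements are met, so its internals may well differ.

```python
class ToyTask(object):
    """Hypothetical task: a name, a function and a list of requirements."""

    def __init__(self, name, func, requires=()):
        self.name = name
        self.func = func                # called as func(dep_results)
        self.requires = list(requires)


def run_all(root):
    """Run root after all of its direct and indirect requirements.

    Each task runs once; its return value is placed in the
    dep_results list of every task that requires it.
    """
    results = {}    # task -> return value
    order = []      # task names in execution order

    def visit(task, active):
        if task in results:             # already run: reuse the result
            return results[task]
        if task in active:              # task (indirectly) requires itself
            raise RuntimeError('dependency cycle at %s' % task.name)
        active.add(task)
        dep_results = [visit(req, active) for req in task.requires]
        active.discard(task)
        results[task] = task.func(dep_results)
        order.append(task.name)
        return results[task]

    return visit(root, set()), order


a = ToyTask('a', lambda deps: 42)
b = ToyTask('b', lambda deps: 12)
c = ToyTask('c', sum, requires=[a, b])     # sum(dep_results)
total, order = run_all(c)
print(total, order)    # 54 ['a', 'b', 'c']
```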
There are two modes of task execution: simple execution of tasks in the main program or multi-threaded execution.
taskproc is licensed under the Apache 2.0 license and is copyright Jeremy Sanders. It should be compatible with Python 2.6+ and Python 3.2+.
Examples
Simple functions
import taskproc


def func1(dep_results):
    num = 42
    print(num)
    return num


def func2(dep_results):
    num = 12
    print(num)
    return num


def func3(dep_results):
    # dep_results holds the return values of the required tasks
    num = sum(dep_results)
    print(num)
    return num


task1 = taskproc.Task(func=func1)
task2 = taskproc.Task(func=func2)
task3 = taskproc.Task(func=func3, requires=[task1, task2])

# single-threaded processing
queue = taskproc.TaskQueue()
# add the task that depends on all the others
queue.add(task3)

# "with" statement optional for single-threaded processing but is
# required (or start()/end() called) for TaskQueueThread
with queue:
    queue.process()
This simple example executes three tasks in the main thread. The results from func1() and func2() are passed to func3() in the list dep_results and then summed. It prints the values 42 and 12 (the order is not guaranteed for independent tasks), then their sum, 54. Note that tasks do not need to return values; they could, for example, run external data processing commands.
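The sketch below shows a task function that runs an external command. It uses the same func(dep_results) signature as above. external_task and total_task are hypothetical names. The command, a Python one-liner run via sys.executable, stands in for a real data processing program. The functions are called by hand here. With taskproc they would be passed as func to taskproc.Task. Note that subprocess.check_output needs Python 2.7 or later.

```python
import subprocess
import sys


def external_task(dep_results):
    """Run an external command and return its output as the task result."""
    cmd = [sys.executable, '-c', 'print(6 * 7)']
    output = subprocess.check_output(cmd)   # raises if the command fails
    return int(output.decode().strip())


def total_task(dep_results):
    # Like func3 above: combine whatever the required tasks returned.
    return sum(dep_results)


# Without a queue, pass dep_results by hand.
value = external_task([])
print(total_task([value, 12]))   # 54
```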
Subclassing and passing extra arguments
from __future__ import print_function
import time
import taskproc


class Task1(taskproc.Task):
    # override run() instead of passing func
    def run(self):
        print('Task 1')
        time.sleep(5)


def taskfunc(dep_results, num):
    # num is supplied by the args tuple given to Task
    print('Task', num)
    time.sleep(5)


task1 = Task1()
task2 = taskproc.Task(func=taskfunc, args=(2,), requires=[task1])
task3 = taskproc.Task(func=taskfunc, args=(3,), requires=[task1])
# task4 requires task2 and task3 (and so, indirectly, task1), so every
# task can be reached from the task passed to add()
task4 = taskproc.Task(func=taskfunc, args=(4,), requires=[task2, task3])

queue = taskproc.TaskQueueThread(2)
queue.add(task4)
with queue:
    queue.process()
This example subclasses Task for task1 instead of supplying a function. Tasks 2 to 4 use the args parameter, which passes extra arguments to the function. The TaskQueueThread uses two threads, so task2 and task3, which both depend only on task1, run simultaneously.
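The ordering in this example can be imitated by hand with the standard library, which helps show what "run simultaneously" means. This Python 3 sketch uses concurrent.futures.ThreadPoolExecutor with two workers instead of taskproc. The task order is hard-coded rather than resolved from requirements. A threading.Barrier for two parties lets task2 and task3 proceed only if both are running at the same time. The sleeps are shortened to 0.1 s.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Only passes when two threads are waiting on it at the same time.
barrier = threading.Barrier(2, timeout=5)
log = []
log_lock = threading.Lock()


def taskfunc(num, wait_for_partner=False):
    with log_lock:
        log.append('start %d' % num)
    if wait_for_partner:
        # Raises BrokenBarrierError after 5 s if the partner task is
        # not running concurrently.
        barrier.wait()
    time.sleep(0.1)             # stands in for time.sleep(5) above
    with log_lock:
        log.append('end %d' % num)
    return num


with ThreadPoolExecutor(max_workers=2) as pool:
    pool.submit(taskfunc, 1).result()       # task1 runs first
    f2 = pool.submit(taskfunc, 2, True)     # task2 and task3 only need
    f3 = pool.submit(taskfunc, 3, True)     # task1, so both run at once
    results = [f2.result(), f3.result()]
    pool.submit(taskfunc, 4).result()       # task4 waits for task2 and task3
```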
API documentation
class taskproc.Task(func=None, requires=[], args=(), kwargs={})
A Task is a unit of work.
Tasks are run by calling the run() method, which by default calls the callable func passed to the constructor. A task can require other tasks, either by passing them in requires to the constructor or by calling add_requirement().
Parameters:
- func (function) – callable to run for the task
- requires (list of Task) – tasks that must run before this task
- args (tuple) – additional positional arguments to pass to func
- kwargs (dict) – additional keyword arguments to pass to func
add_requirement(req)
Add a requirement task to this task.
Parameters: req (Task) – task to run before this Task
Note: do not add requirements to a task that has already been added to a TaskQueue, unless you know that it still has other unmet requirements, because the task may already have been queued for execution.
Changing the requirements of a task after it has been added also means that the TaskQueue will not detect required tasks with unmet dependencies at the end of the run.
args
Additional arguments passed to func (tuple).
func
Called by run() by default when the task is executed (function or callable).
kwargs
Additional keyword arguments passed to func (dict).
pendingon
Task objects waiting on the completion of this task (set).
reqresults
Results of the processed required tasks (list).
run()
Called when the task is run. Subclasses may override this. By default it runs:
    self.func(self.reqresults, *self.args, **self.kwargs)
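A small stand-in class can mimic the default run() behaviour and show the effect of args and kwargs. SketchTask, scale and Constant are hypothetical names. Only the call self.func(self.reqresults, *self.args, **self.kwargs) comes from the documentation above. reqresults is filled in by hand here, where the queue would normally supply it.

```python
class SketchTask(object):
    """Hypothetical stand-in with the documented Task attributes."""

    def __init__(self, func=None, args=(), kwargs=None):
        self.func = func
        self.args = args
        self.kwargs = dict(kwargs or {})
        self.reqresults = []        # normally filled in by the queue

    def run(self):
        # The documented default behaviour of Task.run()
        return self.func(self.reqresults, *self.args, **self.kwargs)


def scale(dep_results, factor, offset=0):
    return sum(dep_results) * factor + offset


t = SketchTask(func=scale, args=(2,), kwargs={'offset': 1})
t.reqresults = [42, 12]             # results of two required tasks
print(t.run())                      # 109, i.e. (42 + 12) * 2 + 1


class Constant(SketchTask):
    # Overriding run() replaces the call to func, like Task1 in the
    # subclassing example.
    def run(self):
        return 7


print(Constant().run())             # 7
```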
class taskproc.TaskQueue
Process Tasks sequentially, one at a time, in the calling thread.
add(task)
Add task to the queue to be processed.
Parameters: task – Task to add; this should be the Task that depends on all the other required tasks.
process(abortpending=True)
Process all items in the queue.
Parameters: abortpending – if True, raise a TaskProcError if tasks with unsatisfied dependencies remain at the end
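A linear queue can run tasks once their requirements are met and notice tasks left pending at the end. One plausible way to do this is Kahn's topological-sort algorithm, which the sketch below applies to plain task names. process_linear, run_task and SketchError are hypothetical. This is not necessarily how TaskQueue is implemented. A dependency cycle is used to produce pending tasks.

```python
from collections import deque


class SketchError(RuntimeError):
    """Hypothetical stand-in for taskproc.TaskProcError."""


def process_linear(tasks, requires, func, abortpending=True):
    """Run each task once all of its requirements have run (Kahn's algorithm).

    tasks:    list of task names (requirements must be in this list too)
    requires: dict mapping a task name to the names it requires
    func:     called as func(name, dep_results)
    """
    unmet = {t: len(requires.get(t, [])) for t in tasks}
    dependents = {t: [] for t in tasks}
    for t in tasks:
        for r in requires.get(t, []):
            dependents[r].append(t)

    ready = deque(t for t in tasks if unmet[t] == 0)
    results = {}
    while ready:
        t = ready.popleft()
        dep_results = [results[r] for r in requires.get(t, [])]
        results[t] = func(t, dep_results)
        for d in dependents[t]:         # one fewer unmet requirement
            unmet[d] -= 1
            if unmet[d] == 0:
                ready.append(d)

    pending = [t for t in tasks if t not in results]
    if pending and abortpending:
        raise SketchError('tasks with unmet dependencies: %s' % pending)
    return results


values = {'a': 42, 'b': 12}


def run_task(name, dep_results):
    # Leaf tasks return a fixed value; 'c' sums its requirements.
    return sum(dep_results) if dep_results else values[name]


print(process_linear(['a', 'b', 'c'], {'c': ['a', 'b']}, run_task))
# {'a': 42, 'b': 12, 'c': 54}

try:
    process_linear(['x', 'y'], {'x': ['y'], 'y': ['x']}, run_task)
except SketchError as exc:
    print(exc)    # tasks with unmet dependencies: ['x', 'y']
```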
class taskproc.TaskQueueThread(nthreads, onstart=None, onend=None)
Process Tasks with multiple threads.
Parameters:
- nthreads (int) – number of threads to use
- onstart (callable) – run this at the start of each thread
- onend (callable) – run this when each thread finishes
When using threads, take care if different tasks access shared objects.
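A common way to take that care is to guard shared objects with a lock. In this sketch, plain threading.Thread objects stand in for TaskQueueThread's workers. summary and count_task are hypothetical names. The lock makes each read-modify-write of the shared dict atomic with respect to the other threads.

```python
import threading

# Hypothetical shared object updated by several task functions.
summary = {'files': 0}
summary_lock = threading.Lock()


def count_task(dep_results, nfiles):
    # Without the lock, two threads could read the same old value and
    # one of the updates would be lost.
    with summary_lock:
        summary['files'] += nfiles
    return nfiles


threads = [threading.Thread(target=count_task, args=([], n))
           for n in range(1, 11)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(summary['files'])   # 55
```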
The recommended way to use this class is to wrap TaskQueueThread.process() in a with statement, which automatically calls start() and end(), e.g.
q = TaskQueueThread(4)
q.add(...)
with q:
    q.process()
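To show what the with statement does, here is a hypothetical worker pool, MiniThreadQueue. It uses the same start()/end() method names and the same nthreads, onstart and onend parameters as TaskQueueThread. It is a Python 3 sketch that runs plain callables rather than Task objects, and it is not taskproc's implementation. __enter__ calls start() and __exit__ calls end(), so the threads are shut down even if process() raises.

```python
import queue
import threading


class MiniThreadQueue(object):
    """Hypothetical pool illustrating start()/end() and the with statement."""

    def __init__(self, nthreads, onstart=None, onend=None):
        self.nthreads = nthreads
        self.onstart = onstart      # called once at the start of each thread
        self.onend = onend          # called once as each thread finishes
        self.jobs = queue.Queue()
        self.threads = []
        self.errors = []            # exceptions raised by jobs

    def _worker(self):
        if self.onstart is not None:
            self.onstart()
        try:
            while True:
                job = self.jobs.get()
                if job is None:             # sentinel from end()
                    self.jobs.task_done()
                    break
                try:
                    job()
                except Exception as exc:    # keep the worker alive
                    self.errors.append(exc)
                finally:
                    self.jobs.task_done()
        finally:
            if self.onend is not None:
                self.onend()

    def start(self):
        for _ in range(self.nthreads):
            t = threading.Thread(target=self._worker)
            t.daemon = True
            t.start()
            self.threads.append(t)

    def add(self, job):
        self.jobs.put(job)

    def process(self):
        self.jobs.join()            # block until every added job is done

    def end(self):
        for _ in self.threads:
            self.jobs.put(None)     # one sentinel per worker
        for t in self.threads:
            t.join()
        self.threads = []

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end()                  # runs even if process() raised
        return False                # do not suppress exceptions


events = []
events_lock = threading.Lock()


def note(msg):
    with events_lock:
        events.append(msg)


with MiniThreadQueue(2, onstart=lambda: note('thread start'),
                     onend=lambda: note('thread end')) as q:
    for i in range(5):
        q.add(lambda i=i: note('job %d' % i))
    q.process()
```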
add(task)
Add task to the queue to be processed.
Parameters: task – Task to add; this should be the Task that depends on all the other required tasks.
end()
End the processing threads. Call this before the program ends, or use a with statement instead.
process(abortpending=True)
Process all items in the queue.
Parameters: abortpending – if True, raise a TaskProcError if tasks with unsatisfied dependencies remain at the end
start()
Start the processing threads. Call this before process(), or use a with statement instead.
class
taskproc.TaskProcError¶ Exception raised if error encountered in this module.
A subclass of RuntimeError.
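Because TaskProcError derives from RuntimeError, code that already handles RuntimeError will also catch it. So that the sketch runs without taskproc, it defines a local stand-in class with the same name. With the real module, catching taskproc.TaskProcError directly is more specific. run_or_report and bad_process are hypothetical helpers.

```python
class TaskProcError(RuntimeError):
    """Local stand-in with the documented base class (hypothetical)."""


def run_or_report(process):
    """Call process(); report a RuntimeError instead of crashing."""
    try:
        process()
    except RuntimeError as exc:     # also catches TaskProcError
        return 'failed: %s' % exc
    return 'ok'


def bad_process():
    raise TaskProcError('tasks with unmet dependencies remain')


print(run_or_report(bad_process))   # failed: tasks with unmet dependencies remain
print(run_or_report(lambda: None))  # ok
```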