1 | #
|
---|
2 | # Simple example which uses a pool of workers to carry out some tasks.
|
---|
3 | #
|
---|
4 | # Notice that the results will probably not come out of the output
|
---|
5 | # queue in the same in the same order as the corresponding tasks were
|
---|
6 | # put on the input queue. If it is important to get the results back
|
---|
7 | # in the original order then consider using `Pool.map()` or
|
---|
8 | # `Pool.imap()` (which will save on the amount of code needed anyway).
|
---|
9 | #
|
---|
10 | # Copyright (c) 2006-2008, R Oudkerk
|
---|
11 | # All rights reserved.
|
---|
12 | #
|
---|
13 |
|
---|
14 | import time
|
---|
15 | import random
|
---|
16 |
|
---|
17 | from multiprocessing import Process, Queue, current_process, freeze_support
|
---|
18 |
|
---|
19 | #
|
---|
20 | # Function run by worker processes
|
---|
21 | #
|
---|
22 |
|
---|
23 | def worker(input, output):
|
---|
24 | for func, args in iter(input.get, 'STOP'):
|
---|
25 | result = calculate(func, args)
|
---|
26 | output.put(result)
|
---|
27 |
|
---|
28 | #
|
---|
29 | # Function used to calculate result
|
---|
30 | #
|
---|
31 |
|
---|
32 | def calculate(func, args):
|
---|
33 | result = func(*args)
|
---|
34 | return '%s says that %s%s = %s' % \
|
---|
35 | (current_process().name, func.__name__, args, result)
|
---|
36 |
|
---|
37 | #
|
---|
38 | # Functions referenced by tasks
|
---|
39 | #
|
---|
40 |
|
---|
41 | def mul(a, b):
|
---|
42 | time.sleep(0.5*random.random())
|
---|
43 | return a * b
|
---|
44 |
|
---|
45 | def plus(a, b):
|
---|
46 | time.sleep(0.5*random.random())
|
---|
47 | return a + b
|
---|
48 |
|
---|
49 | #
|
---|
50 | #
|
---|
51 | #
|
---|
52 |
|
---|
53 | def test():
|
---|
54 | NUMBER_OF_PROCESSES = 4
|
---|
55 | TASKS1 = [(mul, (i, 7)) for i in range(20)]
|
---|
56 | TASKS2 = [(plus, (i, 8)) for i in range(10)]
|
---|
57 |
|
---|
58 | # Create queues
|
---|
59 | task_queue = Queue()
|
---|
60 | done_queue = Queue()
|
---|
61 |
|
---|
62 | # Submit tasks
|
---|
63 | for task in TASKS1:
|
---|
64 | task_queue.put(task)
|
---|
65 |
|
---|
66 | # Start worker processes
|
---|
67 | for i in range(NUMBER_OF_PROCESSES):
|
---|
68 | Process(target=worker, args=(task_queue, done_queue)).start()
|
---|
69 |
|
---|
70 | # Get and print results
|
---|
71 | print 'Unordered results:'
|
---|
72 | for i in range(len(TASKS1)):
|
---|
73 | print '\t', done_queue.get()
|
---|
74 |
|
---|
75 | # Add more tasks using `put()`
|
---|
76 | for task in TASKS2:
|
---|
77 | task_queue.put(task)
|
---|
78 |
|
---|
79 | # Get and print some more results
|
---|
80 | for i in range(len(TASKS2)):
|
---|
81 | print '\t', done_queue.get()
|
---|
82 |
|
---|
83 | # Tell child processes to stop
|
---|
84 | for i in range(NUMBER_OF_PROCESSES):
|
---|
85 | task_queue.put('STOP')
|
---|
86 |
|
---|
87 |
|
---|
88 | if __name__ == '__main__':
|
---|
89 | freeze_support()
|
---|
90 | test()
|
---|