The Multiprocessing and Threading Modules
The map(function, iterable) method allocates items from the iterable to each
worker in the pool. The finished results are collected in the order they were allocated
to the Pool object so that order is preserved.
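As a minimal sketch of this order-preserving behavior, assuming a hypothetical `square()` worker function:

```python
import multiprocessing

def square(n: int) -> int:
    # A hypothetical, picklable worker function standing in for
    # real CPU-bound work.
    return n * n

if __name__ == "__main__":
    with multiprocessing.Pool() as workers:
        # Results come back in input order, regardless of which
        # worker process happens to finish first.
        results = workers.map(square, range(5))
    print(results)  # [0, 1, 4, 9, 16]
```

Even if a later item finishes first, `map()` holds the results until they can be returned in the original order.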
The imap(function, iterable) method is described as "lazier" than map.
By default, it sends each individual item from the iterable to the next available
worker. This might involve more communication overhead. For this reason,
a chunk size larger than 1 is suggested.
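The chunk size is controlled with the `chunksize` keyword parameter. A small sketch, using a hypothetical `double()` worker:

```python
import multiprocessing

def double(n: int) -> int:
    # Hypothetical worker; a real workload would be CPU-bound
    # enough to justify the interprocess communication.
    return 2 * n

if __name__ == "__main__":
    with multiprocessing.Pool() as workers:
        # chunksize=100 sends items to workers in batches of 100,
        # reducing the per-item communication overhead of imap().
        for value in workers.imap(double, range(1_000), chunksize=100):
            pass  # values arrive lazily, in order
```

A larger `chunksize` trades responsiveness for throughput: each worker receives fewer, bigger messages.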
The imap_unordered(function, iterable) method is similar to the imap()
method, but the order of the results is not preserved. Allowing the mapping to be
processed out of order means that results are collected as each worker finishes,
rather than being held back until they can be returned in the original order.
The starmap(function, iterable) method is similar to the itertools.
starmap() function. Each item in the iterable must be a tuple; the tuple is passed
to the function using the * modifier so that each value of the tuple becomes a
positional argument value. In effect, it's performing function(*iterable[0]),
function(*iterable[1]), and so on.
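This tuple-unpacking behavior can be sketched with a hypothetical three-argument `volume()` function:

```python
import multiprocessing

def volume(length: float, width: float, height: float) -> float:
    # Each tuple from the iterable is unpacked into these
    # positional parameters, as if calling volume(*item).
    return length * width * height

if __name__ == "__main__":
    dimensions = [(2, 3, 4), (1, 5, 9)]
    with multiprocessing.Pool() as workers:
        print(workers.starmap(volume, dimensions))  # [24, 45]
```

Where map() suits single-argument functions, starmap() suits functions that take several positional arguments.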
Here is one of the variations on the preceding mapping theme:
import multiprocessing
import glob
from collections import Counter

pattern = "*.gz"
combined = Counter()
with multiprocessing.Pool() as workers:
    # analysis() performs a map-reduce on a single logfile,
    # returning a Counter object.
    for result in workers.imap_unordered(
            analysis, glob.glob(pattern)):
        combined.update(result)
We've created a Counter object that we'll use to consolidate the results from
each worker in the pool. We created a pool of subprocesses based on the number of
available CPUs and used the Pool object as a context manager. We then mapped our
analysis() function to each file in our file-matching pattern. The resulting Counter
objects from the analysis() function are combined into a single resulting counter.
This takes about 68 seconds. The time to analyze the logs was cut in half using
several concurrent processes.
We've created a two-tiered map-reduce process with the multiprocessing module's
Pool mapping methods. The first tier was the analysis() function, which performed
a map-reduce on a single logfile. We then consolidated these reductions in a
higher-level reduce operation.