Source code for PermutationImportance.multiprocessing_utils
"""These are utilities designed for carefully handling communication between
processes while multithreading.
The code for ``pool_imap_unordered`` is copied nearly wholesale from GrantJ's
`Stack Overflow answer here
<https://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration?noredirect=1&lq=1>`_.
It allows for a lazy imap over an iterable and the return of very large objects
"""
from multiprocessing import Process, Queue, cpu_count
try:
from Queue import Full as QueueFull
from Queue import Empty as QueueEmpty
except ImportError: # python3
from queue import Full as QueueFull
from queue import Empty as QueueEmpty
__all__ = ["pool_imap_unordered"]
def worker(func, recvq, sendq):
for args in iter(recvq.get, None):
result = (args[0], func(*args[1:]))
sendq.put(result)
[docs]def pool_imap_unordered(func, iterable, procs=cpu_count()):
"""Lazily imaps in an unordered manner over an iterable in parallel as a
generator
:Author: Grant Jenks <https://stackoverflow.com/users/232571/grantj>
:param func: function to perform on each iterable
:param iterable: iterable which has items to map over
:param procs: number of workers in the pool. Defaults to the cpu count
:yields: the results of the mapping
"""
# Create queues for sending/receiving items from iterable.
sendq = Queue(procs)
recvq = Queue()
# Start worker processes.
for rpt in range(procs):
Process(target=worker, args=(func, sendq, recvq)).start()
# Iterate iterable and communicate with worker processes.
send_len = 0
recv_len = 0
itr = iter(iterable)
try:
value = next(itr)
while True:
try:
sendq.put(value, True, 0.1)
send_len += 1
value = next(itr)
except QueueFull:
while True:
try:
result = recvq.get(False)
recv_len += 1
yield result
except QueueEmpty:
break
except StopIteration:
pass
# Collect all remaining results.
while recv_len < send_len:
result = recvq.get()
recv_len += 1
yield result
# Terminate worker processes.
for rpt in range(procs):
sendq.put(None)