executors.py

import collections
import itertools
import time
from concurrent.futures import ThreadPoolExecutor


class LazyThreadPoolExecutor(ThreadPoolExecutor):
    def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
        """
        Collects the iterables lazily, rather than immediately.

        Otherwise documented as in the parent class:
        https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor
        Implementation taken from this PR: https://github.com/python/cpython/pull/707
        """
        if timeout is not None:
            end_time = timeout + time.time()
        if prefetch is None:
            prefetch = self._max_workers
        if prefetch < 0:
            raise ValueError("prefetch count may not be negative")

        argsiter = zip(*iterables)
        # Submit only max_workers + prefetch tasks up front; the remaining
        # arguments are pulled one at a time as results are consumed.
        fs = collections.deque(
            self.submit(fn, *args)
            for args in itertools.islice(argsiter, self._max_workers + prefetch)
        )

        # Yield must be hidden in a closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            nonlocal argsiter
            try:
                while fs:
                    if timeout is None:
                        res = fs[0].result()
                    else:
                        res = fs[0].result(end_time - time.time())
                    # Got a result, so this future needn't be cancelled.
                    del fs[0]
                    # Dispatch the next task before yielding to keep the pipeline full.
                    if argsiter:
                        try:
                            args = next(argsiter)
                        except StopIteration:
                            argsiter = None
                        else:
                            fs.append(self.submit(fn, *args))
                    yield res
            finally:
                for future in fs:
                    future.cancel()

        return result_iterator()
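

# Minimal usage sketch (the helper slow_square and the parameter choices below
# are illustrative, not part of the class above). It shows the lazy behaviour:
# the stock ThreadPoolExecutor.map would try to consume itertools.count()
# eagerly and never return, whereas LazyThreadPoolExecutor.map only stays
# max_workers + prefetch items ahead of the consumer.
if __name__ == "__main__":
    def slow_square(x):
        time.sleep(0.01)
        return x * x

    with LazyThreadPoolExecutor(max_workers=4) as executor:
        results = executor.map(slow_square, itertools.count(), prefetch=2)
        # Only ~4 + 2 tasks are in flight at any moment, even though the
        # input iterable is infinite.
        for value in itertools.islice(results, 10):
            print(value)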