Source code for morphocut.parallel

import enum
import multiprocessing
import multiprocessing.synchronize
import os
import queue
import signal
import sys
import threading
import traceback
from typing import List

from morphocut.core import Pipeline, closing_if_closable

class _Signal(enum.Enum):
    END = 0
    YIELD = 1


# TODO: Look at pytorch/torch/utils/data/_utils/ for determining if the parent is dead

class StrRepr(str):
    """A ``str`` whose ``repr()`` is the text itself, unquoted and unescaped.

    Useful for KeyError messages: ``str(KeyError(x))`` shows ``repr(x)``,
    which would otherwise quote the text and escape its newlines.
    """

    def __repr__(self):
        # Return this very instance instead of a quoted/escaped representation.
        return self

class ExceptionWrapper:
    """Wraps an exception plus traceback.

    Captures the exception currently being handled (via :py:func:`sys.exc_info`)
    as its type and formatted traceback text. This lets it be passed through a
    queue and re-raised elsewhere. Must be constructed inside an ``except``
    block.

    Args:
        where (str): Human-readable location of the failure
            (e.g. ``"worker process PID 1234"``). Included in the re-raised message.
    """

    def __init__(self, where):
        exc_info = sys.exc_info()
        self.exc_type = exc_info[0]
        self.exc_msg = "".join(traceback.format_exception(*exc_info))
        self.where = where

    def reraise(self):
        """Reraise the wrapped exception in the current thread.

        Raises:
            exc_type: A new instance of the wrapped exception type whose message
                contains ``where`` and the original formatted traceback.
            RuntimeError: If the wrapped type cannot be constructed from a single
                message argument (e.g. ``UnicodeDecodeError``). Carries the same message.
        """

        msg = "{} in {}:\n{}".format(self.exc_type.__name__, self.where, self.exc_msg)

        if issubclass(self.exc_type, KeyError):
            # KeyError.__str__ shows repr() of its argument, which would quote the
            # text and escape the traceback's newlines; StrRepr keeps it readable.
            msg = StrRepr(msg)

        try:
            exception = self.exc_type(msg)
        except TypeError:
            # Some exception types require several constructor arguments; do not
            # lose the original error because we cannot rebuild its exact type.
            raise RuntimeError(msg) from None
        raise exception

class _Stop(Exception):
    """Raised by _get_until_stop."""

def _put_until_stop(queue_, stop_event, obj):
    """Put ``obj`` onto ``queue_``, giving up once ``stop_event`` is set.

    Each attempt waits at most ``QUEUE_POLL_INTERVAL`` seconds (a module-level
    constant). This ensures a full queue cannot block forever after a stop has
    been requested.

    Args:
        queue_: Destination queue.
        stop_event: Event that, once set, aborts the attempt.
        obj: Object to enqueue.

    Returns:
        bool: True if ``obj`` was enqueued, False if ``stop_event`` was set first.
    """
    while not stop_event.is_set():
        try:
            queue_.put(obj, True, QUEUE_POLL_INTERVAL)
        except queue.Full:
            # Still full after the timeout: re-check the stop event and retry.
            continue
        return True
    return False

def _get_until_stop(queue_: multiprocessing.Queue, stop_event, block=True):
    """Get the next object from ``queue_``, aborting once ``stop_event`` is set.

    Each attempt waits at most ``QUEUE_POLL_INTERVAL`` seconds (a module-level
    constant), so the stop event is checked regularly.

    Args:
        queue_: Source queue.
        stop_event: Event that, once set, aborts waiting.
        block: If True, retry until an object arrives. If False, make a single
            timed attempt and let :py:class:`queue.Empty` propagate.

    Returns:
        The object taken from the queue.

    Raises:
        _Stop: If ``stop_event`` is set before an object is obtained.
        queue.Empty: If ``block`` is False and nothing arrived in time.
    """
    while True:
        if stop_event.is_set():
            raise _Stop()
        try:
            return queue_.get(True, QUEUE_POLL_INTERVAL)
        except queue.Empty:
            if not block:
                raise
            # Blocking mode: nothing yet, loop to re-check the stop event.

def _worker_loop(
    input_queue: multiprocessing.Queue,
    output_queue: multiprocessing.Queue,
    transform_stream,
    stop_event: multiprocessing.synchronize.Event,
):
    """
    Do the actual work.

    1. Get an object from the input queue.
    2. Transform this object, and
    3. Put all resulting objects onto the result queue.
    4. Put an additional _Signal.YIELD onto the result queue to signalize
       to the reader that it may switch to another worker.

    A _Signal.END read from the input queue is forwarded to the output queue
    and ends the loop. If the transformation raises, an ExceptionWrapper is put
    onto the output queue and the loop ends.

    Args:
        input_queue: Queue of input objects, terminated by _Signal.END.
        output_queue: Receives results, _Signal.YIELD / _Signal.END markers
            and ExceptionWrapper instances.
        transform_stream: Callable taking an iterable of input objects and
            returning an iterable of output objects.
        stop_event: Shared event; once set, the worker stops as soon as possible.
    """
    try:
        while True:
            input_obj = _get_until_stop(input_queue, stop_event)

            if input_obj is _Signal.END:
                # Nothing is left to be done
                output_queue.put(_Signal.END)
                break

            try:
                # Transform object
                for output_obj in transform_stream([input_obj]):
                    if stop_event.is_set():
                        # If stop event is set, quit processing this object
                        break

                    # Put results onto the output_queue
                    output_queue.put(output_obj)
            except BaseException:  # deliberately broad: forward any failure
                # Put exception to queue and quit processing
                output_queue.put(
                    ExceptionWrapper("worker process PID {}".format(os.getpid()))
                )
                break
            finally:
                # Signalize the collector to switch to the next worker
                output_queue.put(_Signal.YIELD)

    except (KeyboardInterrupt, _Stop):
        # Stopping early: nobody will read our remaining results. Without this,
        # process exit waits until all buffered items are flushed into the pipe,
        # which can deadlock the parent's join() once it stopped reading.
        output_queue.cancel_join_thread()

class ParallelPipeline(Pipeline):
    """
    Parallel processing of the stream in multiple processes.

    Args:
        num_workers (int, optional): Number of worker processes.
            Default: Number of CPUs in the system.
        queue_size (int, optional): Upperbound limit on the number of items that can be
            placed in the input queue of each worker. If queue_size is less than or
            equal to zero, the queue size is infinite.
        multiprocessing_context (optional): Result of :py:func:`multiprocessing.get_context`.
        parent (:py:class:`~morphocut.core.Pipeline`): The parent pipeline.

    Note:
        :py:class:`~morphocut.parallel.ParallelPipeline` creates distinct copies of its
        nodes in each worker process that are not accessible from the main process.

    Example:
        .. code-block:: python

            with Pipeline() as pipeline:
                # Regular sequential processing
                ...
                with ParallelPipeline():
                    # Parallelized processing in this block,
                    # work is distributed between all cores.
                    ...
    """

    def __init__(
        self, num_workers=None, queue_size=2, multiprocessing_context=None, parent=None
    ):
        super().__init__(parent=parent)

        if num_workers is None:
            num_workers = multiprocessing.cpu_count()

        assert num_workers >= 1

        self.num_workers = num_workers
        self.queue_size = queue_size

        if multiprocessing_context is None:
            multiprocessing_context = multiprocessing
        self.multiprocessing_context = multiprocessing_context

    def transform_stream(self, stream):
        """Distribute ``stream`` over worker processes and yield their results.

        A background thread deals input objects to the workers round-robin. The
        reader visits the workers in the same order and switches on each
        ``_Signal.YIELD``, so results come out in input order.

        Raises:
            RuntimeError: If a worker process dies unexpectedly.
            Exception: The re-raised exception of a failing worker or of the
                upstream stream.
        """
        # Create queues and workers
        input_queues = []  # type: List[multiprocessing.Queue]
        output_queues = []  # type: List[multiprocessing.Queue]
        workers = []  # type: List[multiprocessing.Process]
        workers_running = []  # type: List[bool]
        stop_event = self.multiprocessing_context.Event()
        upstream_exception = []  # type: List[ExceptionWrapper]

        for _ in range(self.num_workers):
            iqu = self.multiprocessing_context.Queue(self.queue_size)
            oqu = self.multiprocessing_context.Queue()

            w = self.multiprocessing_context.Process(
                target=_worker_loop,
                args=(iqu, oqu, super().transform_stream, stop_event),
            )
            w.daemon = True
            w.start()

            input_queues.append(iqu)
            output_queues.append(oqu)
            workers.append(w)
            workers_running.append(True)

        # Fill input queues in a thread
        def _queue_filler():
            try:
                with closing_if_closable(stream):
                    for i, obj in enumerate(stream):
                        # Send objects to workers in a round-robin fashion
                        worker_idx = i % self.num_workers

                        if not _put_until_stop(input_queues[worker_idx], stop_event, obj):
                            return

                # Tell all workers to stop working
                for input_queue in input_queues:
                    if not _put_until_stop(input_queue, stop_event, _Signal.END):
                        return
            except Exception as exc:
                # `where` is a location description, not the exception itself.
                upstream_exception.append(
                    ExceptionWrapper("ParallelPipeline._queue_filler")
                )
                # Stop everything immediately
                stop_event.set()
                print("ParallelPipeline._queue_filler", exc)

        qf = threading.Thread(target=_queue_filler, name="ParallelPipeline._queue_filler")
        qf.start()

        # Read output queues in the main thread
        try:
            while any(workers_running):
                for i, oqu in enumerate(output_queues):
                    if not workers_running[i]:
                        continue

                    while True:
                        # Sample liveness *before* polling. An exited worker has
                        # already flushed everything it sent, so an empty poll
                        # afterwards really means it died before finishing.
                        worker_alive = workers[i].is_alive()
                        try:
                            output_object = _get_until_stop(oqu, stop_event, block=False)
                        except queue.Empty:
                            if not worker_alive:
                                exitcode = workers[i].exitcode
                                signame = _exitcode_to_signame.get(exitcode, exitcode)
                                raise RuntimeError(
                                    f"Worker {i+1} died unexpectedly. Exit code: {signame}"
                                ) from None
                            continue

                        if output_object is _Signal.END:
                            workers_running[i] = False
                            break

                        if output_object is _Signal.YIELD:
                            # Switch to the next worker
                            break

                        # Re-raise exceptions from workers and stop
                        if isinstance(output_object, ExceptionWrapper):
                            stop_event.set()
                            output_object.reraise()

                        yield output_object
        except (SystemExit, KeyboardInterrupt, GeneratorExit, Exception) as exc:
            print("Stopping workers due to {}...".format(type(exc).__name__))
            stop_event.set()
            raise
        finally:
            qf.join()
            for w in workers:
                w.join()

            # An upstream failure sets stop_event, which aborts the read loop with
            # the internal _Stop exception; surface the real upstream error instead.
            if upstream_exception:
                upstream_exception[0].reraise()
# Store names for exit codes _exitcode_to_signame = {} for name, signum in list(signal.__dict__.items()): if name[:3] == "SIG" and "_" not in name: _exitcode_to_signame[-signum] = f"-{name}"