Source code for morphocut.core

"""Core components of the MorphoCut processing graph."""

import copy
import inspect
import itertools
import operator
from abc import ABC, abstractmethod
from collections import abc
from contextlib import contextmanager
from functools import wraps
from typing import (

_pipeline_stack = []  # type: List[Pipeline] # pylint: disable=invalid-name

class StreamTransformer(ABC):
    """ABC for stream transformers like Pipeline and Node."""

    def __init__(self): = "{:x}".format(id(self))

    def transform_stream(self, stream: "Stream") -> "Stream":
        while False:

    def __subclasshook__(cls, C):
        if cls is StreamTransformer:
            if any("transform_stream" in B.__dict__ for B in C.__mro__):
                return True
        return NotImplemented

def closing_if_closable(stream):
        yield stream

def resolve_variable(obj, variable_or_value):
    if isinstance(variable_or_value, Variable):
        return obj[variable_or_value]

    if isinstance(variable_or_value, tuple):
        return tuple(resolve_variable(obj, v) for v in variable_or_value)

    if isinstance(variable_or_value, list):
        return list(resolve_variable(obj, v) for v in variable_or_value)

    if isinstance(variable_or_value, dict):
        return {k: resolve_variable(obj, v) for k, v in variable_or_value.items()}

    return variable_or_value

T = TypeVar("T")

[docs]class Variable(Generic[T]): """ A Variable identifies a value in a stream object. Variables are (almost) never instanciated manually, they are created when calling a Node. Attributes: name: The name of the Variable. parent: The parent that created the Variable. This is just used in the string representation. Operations: Variables support the following operations. Each operation is realized as a new Node in the Pipeline, so use them sparingly. Operator and method can be used interchangeably (if both present). +-----------------------+-----------------------+---------------------------------+ | Operation | Operator | Method | +=======================+=======================+=================================+ | Addition | ``a + b`` | ``a.add(b)`` | +-----------------------+-----------------------+---------------------------------+ | Containment Test | | ``a.contains(b)``, ``b.in_(a)`` | +-----------------------+-----------------------+---------------------------------+ | True Division | ``a / b`` | ``a.truediv(b)`` | +-----------------------+-----------------------+---------------------------------+ | Integer Division | ``a // b`` | ``a.floordiv(b)`` | +-----------------------+-----------------------+---------------------------------+ | Bitwise And | ``a & b`` | ``a.and_(b)`` | +-----------------------+-----------------------+---------------------------------+ | Bitwise Exclusive Or | ``a ^ b`` | ``a.xor(b)`` | +-----------------------+-----------------------+---------------------------------+ | Bitwise Inversion | ``~ a`` | ``a.invert(b)`` | +-----------------------+-----------------------+---------------------------------+ | Bitwise Or | ``a | b`` | ``a.or_(b)`` | +-----------------------+-----------------------+---------------------------------+ | Exponentiation | ``a ** b`` | ``a.pow(b)`` | +-----------------------+-----------------------+---------------------------------+ | Identity | | ``a.is_(b)`` | +-----------------------+-----------------------+---------------------------------+ | Identity | | ``a.is_not(b)`` | +-----------------------+-----------------------+---------------------------------+ | Indexed Assignment | ``obj[k] = v`` | | +-----------------------+-----------------------+---------------------------------+ | Indexed Deletion | ``del obj[k]`` | | +-----------------------+-----------------------+---------------------------------+ | Indexing | ``obj[k]`` | | +-----------------------+-----------------------+---------------------------------+ | Left Shift | ``a << b`` | ``a.lshift(b)`` | +-----------------------+-----------------------+---------------------------------+ | Modulo | ``a % b`` | ``a.mod(b)`` | +-----------------------+-----------------------+---------------------------------+ | Multiplication | ``a * b`` | ``a.mul(b)`` | +-----------------------+-----------------------+---------------------------------+ | Matrix Multiplication | ``a @ b`` | ``a.matmul(b)`` | +-----------------------+-----------------------+---------------------------------+ | Negation (Arithmetic) | ``- a`` | ``a.neg(b)`` | +-----------------------+-----------------------+---------------------------------+ | Negation (Logical) | | ``a.not_()`` | +-----------------------+-----------------------+---------------------------------+ | Positive | ``+ a`` | ``a.pos(b)`` | +-----------------------+-----------------------+---------------------------------+ | Right Shift | ``a >> b`` | ``a.rshift(b)`` | +-----------------------+-----------------------+---------------------------------+ | Slice Assignment | ``seq[i:j] = values`` | | +-----------------------+-----------------------+---------------------------------+ | Slice Deletion | ``del seq[i:j]`` | | +-----------------------+-----------------------+---------------------------------+ | Slicing | ``seq[i:j]`` | | +-----------------------+-----------------------+---------------------------------+ | Subtraction | ``a - b`` | ``a.sub(b)`` | +-----------------------+-----------------------+---------------------------------+ | Ordering | ``a < b`` | ```` | +-----------------------+-----------------------+---------------------------------+ | Ordering | ``a <= b`` | ``a.leq(b)`` | +-----------------------+-----------------------+---------------------------------+ | Equality | ``a == b`` | ``a.eq(b)`` | +-----------------------+-----------------------+---------------------------------+ | Difference | ``a != b`` | ```` | +-----------------------+-----------------------+---------------------------------+ | Ordering | ``a >= b`` | ```` | +-----------------------+-----------------------+---------------------------------+ | Ordering | ``a > b`` | ```` | +-----------------------+-----------------------+---------------------------------+ ``a``, ``b``, ``i``, ``j`` and ``k`` can be either :py:class:`Variable` instances or raw values. """ __slots__ = ["name", "parent", "hash"] def __init__(self, name: str, parent: Any): = name self.parent = parent self.hash = hash((, name)) def __str__(self): return "<Variable {}.{}>".format(self.parent, def __repr__(self): return self.__str__() # Attribute access def __getattr__(self, name): return Call(getattr, self, name) # Item access def __getitem__(self, key): return Call(operator.getitem, self, key) def __setitem__(self, key, value): return Call(operator.setitem, self, key, value) def __delitem__(self, key): return Call(operator.delitem, self, key) # Rich comparison methods def __lt__(self, other): return Call(, self, other) def __le__(self, other): return Call(operator.le, self, other) def __eq__(self, other): return Call(operator.eq, self, other) def __ne__(self, other): return Call(, self, other) def __gt__(self, other): return Call(, self, other) def __ge__(self, other): return Call(, self, other) # Binary arithmetic operations def __add__(self, other): return Call(operator.add, self, other) def __sub__(self, other): return Call(operator.sub, self, other) def __mul__(self, other): return Call(operator.mul, self, other) def __matmul__(self, other): return Call(operator.matmul, self, other) def __truediv__(self, other): return Call(operator.truediv, self, other) def __floordiv__(self, other): return Call(operator.floordiv, self, other) def __mod__(self, other): return Call(operator.mod, self, other) def __pow__(self, other): return Call(operator.pow, self, other) def __lshift__(self, other): return Call(operator.lshift, self, other) def __rshift__(self, other): return Call(operator.rshift, self, other) def __and__(self, other): return Call(operator.and_, self, other) def __xor__(self, other): return Call(operator.xor, self, other) def __or__(self, other): return Call(operator.or_, self, other) # Binary arithmetic operations with reflected (swapped) operands def __radd__(self, other): return Call(operator.add, other, self) def __rsub__(self, other): return Call(operator.sub, other, self) def __rmul__(self, other): return Call(operator.mul, other, self) def __rmatmul__(self, other): return Call(operator.matmul, other, self) def __rtruediv__(self, other): return Call(operator.truediv, other, self) def __rfloordiv__(self, other): return Call(operator.floordiv, other, self) def __rmod__(self, other): return Call(operator.mod, other, self) def __rpow__(self, other): return Call(operator.pow, other, self) def __rlshift__(self, other): return Call(operator.lshift, other, self) def __rrshift__(self, other): return Call(operator.rshift, other, self) def __rand__(self, other): return Call(operator.and_, other, self) def __rxor__(self, other): return Call(operator.xor, other, self) def __ror__(self, other): return Call(operator.or_, other, self) # Unary arithmetic operations def __neg__(self): return Call(operator.neg, self) def __pos__(self): return Call(operator.pos, self) def __abs__(self): return Call(operator.abs, self) def __invert__(self): return Call(operator.invert, self) # Above operators without underscores getattr = __getattr__ lt = __lt__ le = __le__ eq = __eq__ ne = __ne__ gt = __gt__ ge = __ge__ add = __add__ sub = __sub__ mul = __mul__ matmul = __matmul__ truediv = __truediv__ floordiv = __floordiv__ mod = __mod__ pow = __pow__ lshift = __lshift__ rshift = __rshift__ and_ = __and__ xor = __xor__ or_ = __or__ neg = __neg__ pos = __pos__ abs = __abs__ invert = __invert__ # Special operators def not_(self): """Return the outcome of not obj.""" return Call(operator.not_, self) def truth(self): """Return True if obj is true, and False otherwise.""" return Call(operator.truth, self) def is_(self, other): """Return ``self is other``. Tests object identity.""" return Call(operator.is_, self, other) def is_not(self, other): """Return ``self is not other``. Tests object identity.""" return Call(operator.is_not, self, other) def in_(self, other): """Return the outcome of the test ``self in other``. Tests containment.""" return Call(operator.contains, other, self) def contains(self, other): """Return the outcome of the test ``other in self``. Tests containment.""" return Call(operator.contains, self, other) def unpack(self, size): """Unpack the variable into a tuple of variables.""" return _Unpack(size, self) def delete(self): """ Delete objects associated with this Variable from the stream. They can not be accessed afterwards. See Also: :py:class:`` """ DelVariable(self) def copy(self): """Return A new `Variable` object pointing to a copy of the value.""" return Call(copy.copy, self)
# Types RawOrVariable = Union[T, Variable[T]] NodeCallReturnType = Union[None, Variable, Tuple[Variable]] Stream = Iterator["StreamObject"] r"""A stream is an Iterator of :py:class:`StreamObject`\ s.""" class EmptyPipelineStackError(Exception): """Raised when a node is created outside of a Pipeline context."""
[docs]class Node(StreamTransformer): """Base class for all stream processing nodes.""" def __init__(self): super().__init__() # Bind outputs to self outputs = getattr(self.__class__, "outputs", []) self.outputs = [self.__bind_output(o) for o in outputs] # Register with pipeline try: pipeline_top = _pipeline_stack[-1] except IndexError: raise EmptyPipelineStackError( "Empty pipeline stack. {} has to be called in a pipeline context.".format( self.__class__.__name__ ) ) from None self.rank = pipeline_top.add_child(self) def __bind_output(self, port: "Output"): """Bind self to port and return a variable.""" variable = port._create_variable(self) # pylint: disable=protected-access return variable def __call__(self) -> NodeCallReturnType: """Return outputs.""" try: outputs = self.__dict__["outputs"] except KeyError: raise RuntimeError( "'{type}' is not initialized properly. Did you forget a super().__init__() in the constructor?".format( type=type(self).__name__ ) ) # Return outputs if not outputs: return None if len(outputs) == 1: # If one output, return exactly this return outputs[0] # Otherwise, return list of outputs return outputs
[docs] def prepare_input(self, obj, names): """Return a tuple corresponding to the input ports.""" if isinstance(names, str): return resolve_variable(obj, getattr(self, names)) return tuple( resolve_variable(obj, v) for v in (getattr(self, n) for n in names) )
[docs] def prepare_output(self, obj: "StreamObject", *values, n_remaining_hint=None): """Update obj using the values corresponding to the output ports.""" if n_remaining_hint is not None: obj.n_remaining_hint = n_remaining_hint if not self.outputs: if any(v is not None for v in values): raise ValueError( "No output port specified but transform returned a value." ) return obj while True: n_values = len(values) n_outputs = len(self.outputs) if n_values != n_outputs: # If values is a nested tuple, unnest and retry if n_values == 1 and isinstance(values[0], tuple): values = values[0] continue raise ValueError( "Length of values does not match number of output ports: {} vs. {}".format( n_values, n_outputs ) ) break for variable, r in zip(self.outputs, values): obj[variable] = r return obj
[docs] def after_stream(self): """ Do something after the stream was processed. Called by transform_stream after stream processing is done. *Override this in your own subclass.* """
def _get_parameter_names(self): """Inspect self.transform to get the parameter names.""" return [ for p in inspect.signature( self.transform # pylint: disable=no-member ).parameters.values() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD) ]
[docs] def transform_stream(self, stream: Stream) -> Stream: """ Transform a stream. By default, this calls ``self.transform`` with appropriate parameters. ``transform`` has to be implemented by a subclass if ``transform_stream`` is not overridden. Override if the stream has to be altered in some way, i.e. objects are created, deleted or re-arranged. """ names = self._get_parameter_names() with closing_if_closable(stream): for obj in stream: parameters = self.prepare_input(obj, names) result = self.transform(*parameters) # pylint: disable=no-member self.prepare_output(obj, result) yield obj self.after_stream()
def __str__(self): return "{}()".format(self.__class__.__name__)
[docs]class Output: """ Define an Output of a Node. Args: name (str): Name of the output. type (type, optional): Type of the output. doc (str, optional): Description of the output. """ def __init__( self, name: str, type: Optional[Type] = None, doc: Optional[str] = None ): = name self.type = type self.doc = doc self.node_cls = None def _create_variable(self, node: Node): """Return a _Variable with a reference to the node.""" return Variable(, node) def __repr__(self): return '{}("{}", {})'.format(self.__class__.__name__,, self.node_cls) def __call__(self, cls): """Add this output to the list of a nodes outputs.""" if not issubclass(cls, Node): raise ValueError( "This decorator is meant to be applied to a subclass of Node." ) try: outputs = cls.outputs except AttributeError: outputs = cls.outputs = [] outputs.insert(0, self) self.node_cls = cls return cls
[docs]def ReturnOutputs(node_cls): """Turn Node into a function returning Output variables.""" if not issubclass(node_cls, Node): raise ValueError("This decorator is meant to be applied to a subclass of Node.") @wraps(node_cls) def wrapper(*args, **kwargs) -> NodeCallReturnType: return node_cls(*args, **kwargs)() wrapper._node_cls = node_cls wrapper.__mro__ = node_cls.__mro__ return wrapper
[docs]@ReturnOutputs @Output("result") class Call(Node): """ Call a function with the supplied parameters. For every object in the stream, apply ``clbl`` to the corresponding stream variables. Args: clbl: A callable. *args: Positional arguments to ``clbl``. **kwargs: Keyword-arguments to ``clbl``. Returns: Variable: The result of the function invocation. Example: .. code-block:: python def foo(bar): return bar baz = ... # baz is a stream variable. result = Call(foo, baz) """ def __init__(self, clbl: Callable, *args, **kwargs): super().__init__() self.clbl = clbl self.args = args self.kwargs = kwargs
[docs] def transform(self, clbl, args, kwargs): """Apply clbl to the supplied arguments.""" return clbl(*args, **kwargs)
def __str__(self): try: name = self.clbl.__name__ except AttributeError: try: name = self.clbl.__class__.__name__ except AttributeError: name = "<unnamed>" args = [name] args.extend("..." if isinstance(a, Variable) else repr(a) for a in self.args) args.extend( "{}={}".format(k, "..." if isinstance(v, Variable) else v) for k, v in self.kwargs.items() ) return "{}({})".format(self.__class__.__name__, ", ".join(args))
class DelVariable(Node): """Delete a Variable from the stream.""" def __init__(self, variable): super().__init__() self.key = StreamObject._as_key(variable) # pylint: disable=protected-access def transform_stream(self, stream): with closing_if_closable(stream): for obj in stream: yield StreamObject({k: v for k, v in obj.items() if k != self.key}) class StreamObjectKeyError(KeyError): """Raised if a :py:class:`Variable` is not found in a :py:class:`StreamObject`.""" def __str__(self): return "{}\nYou probably removed this key from the stream.".format( super().__str__() )
[docs]class StreamObject(abc.MutableMapping): """ An object in the :py:obj:`Stream` that wraps all values. Attributes: data (dict): The data associated with this stream object. n_remaining_hint (int, optional): Approximate number of remaining objects in the stream including the current object. A value can be retrieved by indexing: ``obj[var]`` """ __slots__ = ["data", "n_remaining_hint"] def __init__( self, data: Optional[Dict] = None, n_remaining_hint: Optional[int] = None ): if data is None: data = {} = data self.n_remaining_hint = n_remaining_hint
[docs] def copy(self) -> "StreamObject": """Create a shallow copy.""" return StreamObject(, n_remaining_hint=self.n_remaining_hint)
@staticmethod def _as_key(obj): if isinstance(obj, Variable): return obj.hash return obj def __setitem__(self, key, value):[self._as_key(key)] = value def __delitem__(self, key): del[self._as_key(key)] def __getitem__(self, key): try: return[self._as_key(key)] except KeyError: raise StreamObjectKeyError(key) from None def __iter__(self): return iter( def __len__(self): return len(
[docs] def to_dict(self, **kwargs): """Turn the StreamObject into a regular dictionary.""" if not kwargs: raise ValueError("No names were supplied") return {k: self[v] for k, v in kwargs.items()}
def check_stream(stream: Optional[Stream]) -> Stream: """Ensure that `stream` is a valid stream.""" if stream is None: return iter([StreamObject(n_remaining_hint=1)]) return stream
[docs]class Pipeline(StreamTransformer): """ A Pipeline manages the execution of nodes. Nodes defined inside the pipeline context will be added to the pipeline. When the pipeline is executed, stream objects are passed from one node to the next in the same order. Args: parent (Pipeline, optional): A parent pipeline to attach to. If None and nested in an existing Pipeline, attach to this one. Example: .. code-block:: python with Pipeline() as pipeline: ... """ children: List[StreamTransformer] def __init__(self, parent: Optional["Pipeline"] = None): super().__init__() # Direct children of this pipeline self.children = [] if parent is None: try: parent = _pipeline_stack[-1] except IndexError: pass if parent is not None: parent.add_child(self) self.counter = parent.counter else: self.counter = itertools.count() def __enter__(self): # Push self to pipeline stack _pipeline_stack.append(self) return self def __exit__(self, *_): # Pop self from pipeline stack item = _pipeline_stack.pop() assert item is self
[docs] def transform_stream(self, stream: Optional[Stream] = None) -> Stream: """ Run the stream through all nodes and return it. Args: stream: A stream to transform. *This argument is solely to be used internally.* Returns: Stream: An iterable of stream objects. """ stream = check_stream(stream) # Here, the stream is not automatically closed, # as this would happen instantaneously. for child in self.children: # type: StreamTransformer stream = child.transform_stream(stream) return stream
[docs] def run(self): """ Run the complete pipeline. This is a convenience method to be used in place of: .. code-block:: python for _ in pipeline.transform_stream(): pass """ for _ in self.transform_stream(): pass
def add_child(self, child: StreamTransformer): self.children.append(child) return next(self.counter) def __str__(self): return "Pipeline([{}])".format(", ".join(str(n) for n in self.children))
[docs] def locals(self) -> Tuple[Variable]: """Variables created in the scope of this Pipeline, including child Pipelines.""" _locals: List[Variable] = [] for child in self.children: child: StreamTransformer if isinstance(child, Node): _locals.extend(child.outputs) if isinstance(child, Pipeline): _locals.extend(child.locals()) return tuple(_locals)
@ReturnOutputs class _Unpack(Node): def __init__(self, size, value): super().__init__() self.size = size self.value = value self.outputs = [Variable(str(i), self) for i in range(self.size)] def transform_stream(self, stream: Stream) -> Stream: with closing_if_closable(stream): for obj in stream: value = self.prepare_input(obj, "value") yield self.prepare_output(obj, *value)