"""Core components of the MorphoCut processing graph."""
import copy
import inspect
import itertools
import operator
from abc import ABC, abstractmethod
from collections import abc
from contextlib import contextmanager
from functools import wraps
from typing import (
Any,
Callable,
Dict,
Generic,
Iterator,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
)
# Stack of the Pipeline contexts that are currently entered (innermost last).
# Pipeline.__enter__ pushes and Pipeline.__exit__ pops; Node.__init__ and
# Pipeline.__init__ read the top entry to find the pipeline to register with.
_pipeline_stack = list()  # type: List[Pipeline] # pylint: disable=invalid-name
class StreamTransformer(ABC):
    """
    ABC for stream transformers like Pipeline and Node.

    Any class that defines ``transform_stream`` somewhere in its MRO is
    treated as a (virtual) subclass, even without inheriting from this ABC.
    """

    def __init__(self):
        # Hexadecimal object identity; Variables combine their parent's ``id``
        # with their name to build their hash.
        self.id = "%x" % id(self)

    @abstractmethod
    def transform_stream(self, stream: "Stream") -> "Stream":
        """Transform ``stream`` and return the resulting stream (abstract)."""
        # Empty generator body: keeps this method a generator function.
        yield from ()

    @classmethod
    def __subclasshook__(cls, C):
        if cls is not StreamTransformer:
            return NotImplemented
        for klass in C.__mro__:
            if "transform_stream" in klass.__dict__:
                return True
        return NotImplemented
@contextmanager
def closing_if_closable(stream):
    """
    Yield `stream` and close it on exit, if it can be closed.

    Like :py:func:`contextlib.closing`, but tolerates objects without a
    callable ``close`` attribute (e.g. plain list iterators). For those,
    nothing happens on exit.

    Errors raised by ``close()`` itself are propagated instead of being
    silently discarded. The previous bare ``except:`` also swallowed
    ``KeyboardInterrupt`` and ``SystemExit``.
    """
    try:
        yield stream
    finally:
        close = getattr(stream, "close", None)
        if callable(close):
            close()
def resolve_variable(obj, variable_or_value):
    """
    Replace Variables in `variable_or_value` by their values in `obj`.

    Tuples, lists and dicts (dict values only) are processed recursively and
    rebuilt as plain ``tuple``/``list``/``dict``. Any other value is returned
    unchanged.
    """
    value = variable_or_value

    if isinstance(value, Variable):
        return obj[value]

    if isinstance(value, tuple):
        return tuple(resolve_variable(obj, item) for item in value)

    if isinstance(value, list):
        return [resolve_variable(obj, item) for item in value]

    if isinstance(value, dict):
        return {key: resolve_variable(obj, item) for key, item in value.items()}

    return value
T = TypeVar("T")


class Variable(Generic[T]):
    """
    A Variable identifies a value in a stream object.

    Variables are (almost) never instantiated manually, they are created when calling a Node.

    Attributes:
        name: The name of the Variable.
        parent: The parent that created the Variable.
            It is used in the string representation, and its ``id`` is
            combined with ``name`` to compute ``hash``.
        hash: The key under which the value of this Variable is stored
            in a :py:class:`StreamObject`.

    Operations:
        Variables support the following operations.
        Each operation is realized as a new Node in the Pipeline,
        so use them sparingly.
        Operator and method can be used interchangeably (if both present).

        +-----------------------+-----------------------+---------------------------------+
        | Operation             | Operator              | Method                          |
        +=======================+=======================+=================================+
        | Absolute Value        | ``abs(a)``            | ``a.abs()``                     |
        +-----------------------+-----------------------+---------------------------------+
        | Addition              | ``a + b``             | ``a.add(b)``                    |
        +-----------------------+-----------------------+---------------------------------+
        | Attribute Access      | ``a.attr``            | ``a.getattr(name)``             |
        +-----------------------+-----------------------+---------------------------------+
        | Containment Test      |                       | ``a.contains(b)``, ``b.in_(a)`` |
        +-----------------------+-----------------------+---------------------------------+
        | True Division         | ``a / b``             | ``a.truediv(b)``                |
        +-----------------------+-----------------------+---------------------------------+
        | Integer Division      | ``a // b``            | ``a.floordiv(b)``               |
        +-----------------------+-----------------------+---------------------------------+
        | Bitwise And           | ``a & b``             | ``a.and_(b)``                   |
        +-----------------------+-----------------------+---------------------------------+
        | Bitwise Exclusive Or  | ``a ^ b``             | ``a.xor(b)``                    |
        +-----------------------+-----------------------+---------------------------------+
        | Bitwise Inversion     | ``~ a``               | ``a.invert()``                  |
        +-----------------------+-----------------------+---------------------------------+
        | Bitwise Or            | ``a | b``             | ``a.or_(b)``                    |
        +-----------------------+-----------------------+---------------------------------+
        | Exponentiation        | ``a ** b``            | ``a.pow(b)``                    |
        +-----------------------+-----------------------+---------------------------------+
        | Identity              |                       | ``a.is_(b)``                    |
        +-----------------------+-----------------------+---------------------------------+
        | Non-Identity          |                       | ``a.is_not(b)``                 |
        +-----------------------+-----------------------+---------------------------------+
        | Indexed Assignment    | ``obj[k] = v``        |                                 |
        +-----------------------+-----------------------+---------------------------------+
        | Indexed Deletion      | ``del obj[k]``        |                                 |
        +-----------------------+-----------------------+---------------------------------+
        | Indexing              | ``obj[k]``            |                                 |
        +-----------------------+-----------------------+---------------------------------+
        | Left Shift            | ``a << b``            | ``a.lshift(b)``                 |
        +-----------------------+-----------------------+---------------------------------+
        | Modulo                | ``a % b``             | ``a.mod(b)``                    |
        +-----------------------+-----------------------+---------------------------------+
        | Multiplication        | ``a * b``             | ``a.mul(b)``                    |
        +-----------------------+-----------------------+---------------------------------+
        | Matrix Multiplication | ``a @ b``             | ``a.matmul(b)``                 |
        +-----------------------+-----------------------+---------------------------------+
        | Negation (Arithmetic) | ``- a``               | ``a.neg()``                     |
        +-----------------------+-----------------------+---------------------------------+
        | Negation (Logical)    |                       | ``a.not_()``                    |
        +-----------------------+-----------------------+---------------------------------+
        | Positive              | ``+ a``               | ``a.pos()``                     |
        +-----------------------+-----------------------+---------------------------------+
        | Right Shift           | ``a >> b``            | ``a.rshift(b)``                 |
        +-----------------------+-----------------------+---------------------------------+
        | Slice Assignment      | ``seq[i:j] = values`` |                                 |
        +-----------------------+-----------------------+---------------------------------+
        | Slice Deletion        | ``del seq[i:j]``      |                                 |
        +-----------------------+-----------------------+---------------------------------+
        | Slicing               | ``seq[i:j]``          |                                 |
        +-----------------------+-----------------------+---------------------------------+
        | Subtraction           | ``a - b``             | ``a.sub(b)``                    |
        +-----------------------+-----------------------+---------------------------------+
        | Truth Test            |                       | ``a.truth()``                   |
        +-----------------------+-----------------------+---------------------------------+
        | Ordering              | ``a < b``             | ``a.lt(b)``                     |
        +-----------------------+-----------------------+---------------------------------+
        | Ordering              | ``a <= b``            | ``a.le(b)``                     |
        +-----------------------+-----------------------+---------------------------------+
        | Equality              | ``a == b``            | ``a.eq(b)``                     |
        +-----------------------+-----------------------+---------------------------------+
        | Inequality            | ``a != b``            | ``a.ne(b)``                     |
        +-----------------------+-----------------------+---------------------------------+
        | Ordering              | ``a >= b``            | ``a.ge(b)``                     |
        +-----------------------+-----------------------+---------------------------------+
        | Ordering              | ``a > b``             | ``a.gt(b)``                     |
        +-----------------------+-----------------------+---------------------------------+

        ``a``, ``b``, ``i``, ``j`` and ``k`` can be either
        :py:class:`Variable` instances or raw values.

        Attribute access ``a.attr`` only creates a node for names that are
        neither defined on Variable itself (e.g. ``name``, ``parent``,
        ``copy``) nor dunder names (``__x__``). Use ``a.getattr("attr")``
        in those cases.

        Variables are not iterable. Use :py:meth:`unpack` to split a
        Variable into several Variables.
    """

    __slots__ = ["name", "parent", "hash"]

    def __init__(self, name: str, parent: Any):
        self.name = name
        self.parent = parent
        # Identifies the value of this Variable inside a stream object.
        self.hash = hash((parent.id, name))

    def __str__(self):
        return "<Variable {}.{}>".format(self.parent, self.name)

    def __repr__(self):
        return self.__str__()

    # Attribute access
    def __getattr__(self, name):
        # __getattr__ is only invoked for names not found by regular lookup.
        # Dunder names are protocol probes (e.g. copy.deepcopy looks up
        # __deepcopy__ on the instance). Turning those into Call nodes would
        # silently add nodes to the pipeline, or fail outside of one, so
        # report them as missing instead.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(
                "{!r} object has no attribute {!r}".format(type(self).__name__, name)
            )
        return Call(getattr, self, name)

    # Item access
    def __getitem__(self, key):
        return Call(operator.getitem, self, key)

    def __setitem__(self, key, value):
        return Call(operator.setitem, self, key, value)

    def __delitem__(self, key):
        return Call(operator.delitem, self, key)

    def __iter__(self):
        # Without __iter__, Python falls back to the sequence protocol via
        # __getitem__, which never raises IndexError here. Iteration (and
        # therefore tuple unpacking and ``in``) would loop forever, adding a
        # new node on every step.
        raise TypeError(
            "{!r} is not iterable. Use .unpack(size) to split it into several Variables.".format(
                self
            )
        )

    # Rich comparison methods
    def __lt__(self, other):
        return Call(operator.lt, self, other)

    def __le__(self, other):
        return Call(operator.le, self, other)

    def __eq__(self, other):
        return Call(operator.eq, self, other)

    def __ne__(self, other):
        return Call(operator.ne, self, other)

    def __gt__(self, other):
        return Call(operator.gt, self, other)

    def __ge__(self, other):
        return Call(operator.ge, self, other)

    # Binary arithmetic operations
    def __add__(self, other):
        return Call(operator.add, self, other)

    def __sub__(self, other):
        return Call(operator.sub, self, other)

    def __mul__(self, other):
        return Call(operator.mul, self, other)

    def __matmul__(self, other):
        return Call(operator.matmul, self, other)

    def __truediv__(self, other):
        return Call(operator.truediv, self, other)

    def __floordiv__(self, other):
        return Call(operator.floordiv, self, other)

    def __mod__(self, other):
        return Call(operator.mod, self, other)

    def __pow__(self, other):
        return Call(operator.pow, self, other)

    def __lshift__(self, other):
        return Call(operator.lshift, self, other)

    def __rshift__(self, other):
        return Call(operator.rshift, self, other)

    def __and__(self, other):
        return Call(operator.and_, self, other)

    def __xor__(self, other):
        return Call(operator.xor, self, other)

    def __or__(self, other):
        return Call(operator.or_, self, other)

    # Binary arithmetic operations with reflected (swapped) operands
    def __radd__(self, other):
        return Call(operator.add, other, self)

    def __rsub__(self, other):
        return Call(operator.sub, other, self)

    def __rmul__(self, other):
        return Call(operator.mul, other, self)

    def __rmatmul__(self, other):
        return Call(operator.matmul, other, self)

    def __rtruediv__(self, other):
        return Call(operator.truediv, other, self)

    def __rfloordiv__(self, other):
        return Call(operator.floordiv, other, self)

    def __rmod__(self, other):
        return Call(operator.mod, other, self)

    def __rpow__(self, other):
        return Call(operator.pow, other, self)

    def __rlshift__(self, other):
        return Call(operator.lshift, other, self)

    def __rrshift__(self, other):
        return Call(operator.rshift, other, self)

    def __rand__(self, other):
        return Call(operator.and_, other, self)

    def __rxor__(self, other):
        return Call(operator.xor, other, self)

    def __ror__(self, other):
        return Call(operator.or_, other, self)

    # Unary arithmetic operations
    def __neg__(self):
        return Call(operator.neg, self)

    def __pos__(self):
        return Call(operator.pos, self)

    def __abs__(self):
        return Call(operator.abs, self)

    def __invert__(self):
        return Call(operator.invert, self)

    # Explicit attribute access. Unlike ``a.attr``, this works for every name,
    # including dunder names and names that are attributes of Variable itself.
    def getattr(self, name):
        """Return ``getattr(self, name)`` as a new Variable."""
        # Inside a method body, ``getattr`` refers to the builtin (class-level
        # names are not in scope here).
        return Call(getattr, self, name)

    # Above operators without underscores
    lt = __lt__
    le = __le__
    eq = __eq__
    ne = __ne__
    gt = __gt__
    ge = __ge__
    add = __add__
    sub = __sub__
    mul = __mul__
    matmul = __matmul__
    truediv = __truediv__
    floordiv = __floordiv__
    mod = __mod__
    pow = __pow__
    lshift = __lshift__
    rshift = __rshift__
    and_ = __and__
    xor = __xor__
    or_ = __or__
    neg = __neg__
    pos = __pos__
    abs = __abs__
    invert = __invert__

    # Special operators
    def not_(self):
        """Return the outcome of not obj."""
        return Call(operator.not_, self)

    def truth(self):
        """Return True if obj is true, and False otherwise."""
        return Call(operator.truth, self)

    def is_(self, other):
        """Return ``self is other``. Tests object identity."""
        return Call(operator.is_, self, other)

    def is_not(self, other):
        """Return ``self is not other``. Tests object identity."""
        return Call(operator.is_not, self, other)

    def in_(self, other):
        """Return the outcome of the test ``self in other``. Tests containment."""
        return Call(operator.contains, other, self)

    def contains(self, other):
        """Return the outcome of the test ``other in self``. Tests containment."""
        return Call(operator.contains, self, other)

    def unpack(self, size):
        """
        Unpack the variable into ``size`` Variables.

        Returns:
            The outputs of the unpacking node as returned by calling a Node:
            None for ``size == 0``, a single Variable for ``size == 1``,
            otherwise a list of Variables.
        """
        return _Unpack(size, self)

    def delete(self):
        """
        Delete objects associated with this Variable from the stream.

        They can not be accessed afterwards.

        See Also: :py:class:`morphocut.stream.FilterVariables`
        """
        DelVariable(self)

    def copy(self):
        """Return A new `Variable` object pointing to a copy of the value."""
        # ``copy`` here is the module (class-level names are not in scope).
        return Call(copy.copy, self)
# Types

# Either a raw value of type T or a Variable holding values of type T.
RawOrVariable = Union[T, Variable[T]]

# Return type of calling a Node: None (no outputs), the single output
# Variable, or the list of output Variables (Node stores its outputs in a list).
NodeCallReturnType = Union[None, Variable, List[Variable]]

Stream = Iterator["StreamObject"]
r"""A stream is an Iterator of :py:class:`StreamObject`\ s."""
class EmptyPipelineStackError(Exception):
    """
    Raised when a node is created outside of a Pipeline context.

    ``Node.__init__`` raises it when there is no active Pipeline to register with.
    """
class Node(StreamTransformer):
    """
    Base class for all stream processing nodes.

    On construction, every :py:class:`Output` port declared on the class
    (the class attribute ``outputs``, filled by the :py:class:`Output`
    decorator) is bound to a new :py:class:`Variable` owned by this node.
    The node then registers itself with the innermost active
    :py:class:`Pipeline`.

    Raises:
        EmptyPipelineStackError: If the node is created outside of a Pipeline context.
    """

    def __init__(self):
        super().__init__()

        # Bind outputs to self
        outputs = getattr(self.__class__, "outputs", [])
        self.outputs = [self.__bind_output(o) for o in outputs]

        # Register with the innermost pipeline. add_child returns the next
        # value of the pipeline's counter, which becomes this node's rank.
        try:
            pipeline_top = _pipeline_stack[-1]
        except IndexError:
            raise EmptyPipelineStackError(
                "Empty pipeline stack. {} has to be called in a pipeline context.".format(
                    self.__class__.__name__
                )
            ) from None

        self.rank = pipeline_top.add_child(self)

    def __bind_output(self, port: "Output"):
        """Bind self to port and return a variable."""
        return port._create_variable(self)  # pylint: disable=protected-access

    def __call__(self) -> NodeCallReturnType:
        """
        Return the output Variables of this node.

        Returns:
            None if the node has no outputs, the single output Variable if it
            has exactly one, otherwise the list of output Variables.

        Raises:
            RuntimeError: If ``Node.__init__`` did not run (e.g. a subclass
                forgot to call ``super().__init__()``).
        """
        try:
            outputs = self.__dict__["outputs"]
        except KeyError:
            # ``from None``: the KeyError is an implementation detail, not the cause.
            raise RuntimeError(
                "'{type}' is not initialized properly. Did you forget a super().__init__() in the constructor?".format(
                    type=type(self).__name__
                )
            ) from None

        # Return outputs
        if not outputs:
            return None

        if len(outputs) == 1:
            # If one output, return exactly this
            return outputs[0]

        # Otherwise, return list of outputs
        return outputs

    def prepare_output(self, obj: "StreamObject", *values, n_remaining_hint=None):
        """
        Update obj using the values corresponding to the output ports.

        Args:
            obj: The stream object to update. It is modified in place.
            *values: One value per output port. If their number does not
                match, a single (possibly nested) tuple is unpacked until it does.
            n_remaining_hint (int, optional): If given, stored as ``obj.n_remaining_hint``.

        Returns:
            The updated stream object ``obj``.

        Raises:
            ValueError: If the node has no outputs but a non-None value was
                given, or if the number of values does not match the number
                of output ports.
        """
        if n_remaining_hint is not None:
            obj.n_remaining_hint = n_remaining_hint

        if not self.outputs:
            if any(v is not None for v in values):
                raise ValueError(
                    "No output port specified but transform returned a value."
                )
            return obj

        n_outputs = len(self.outputs)

        # If values is a nested tuple, unnest and retry until the lengths match.
        while len(values) != n_outputs:
            if len(values) == 1 and isinstance(values[0], tuple):
                values = values[0]
            else:
                raise ValueError(
                    "Length of values does not match number of output ports: {} vs. {}".format(
                        len(values), n_outputs
                    )
                )

        for variable, r in zip(self.outputs, values):
            obj[variable] = r

        return obj

    def after_stream(self):
        """
        Do something after the stream was processed.

        Called by transform_stream after stream processing is done.

        *Override this in your own subclass.*
        """

    def _get_parameter_names(self):
        """
        Inspect self.transform to get the parameter names.

        ``*args``/``**kwargs`` parameters are excluded. ``self.transform`` is
        expected to be provided by the subclass.
        """
        return [
            p.name
            for p in inspect.signature(
                self.transform  # pylint: disable=no-member
            ).parameters.values()
            if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
        ]

    def __str__(self):
        return "{}()".format(self.__class__.__name__)
class Output:
    """
    Define an Output of a Node.

    Instances are applied as class decorators to :py:class:`Node`
    subclasses. Each application prepends the port to the class's
    ``outputs`` list, so stacked decorators end up in top-to-bottom order.

    Args:
        name (str): Name of the output.
        type (type, optional): Type of the output.
        doc (str, optional): Description of the output.
    """

    def __init__(
        self, name: str, type: Optional[Type] = None, doc: Optional[str] = None
    ):
        self.name = name
        self.type = type
        self.doc = doc
        # Set when the decorator is applied to a Node subclass.
        self.node_cls = None

    def _create_variable(self, node: Node):
        """Return a Variable with a reference to the node."""
        return Variable(self.name, node)

    def __repr__(self):
        return '{}("{}", {})'.format(self.__class__.__name__, self.name, self.node_cls)

    def __call__(self, cls):
        """
        Add this output to the list of a node's outputs.

        Raises:
            ValueError: If ``cls`` is not a subclass of :py:class:`Node`.
        """
        # Check isinstance first: issubclass() raises TypeError for non-classes.
        if not (isinstance(cls, type) and issubclass(cls, Node)):
            raise ValueError(
                "This decorator is meant to be applied to a subclass of Node."
            )

        if "outputs" in cls.__dict__:
            outputs = cls.outputs
        else:
            # Copy an inherited list instead of mutating it in place, so that
            # decorating a subclass does not add the port to its base class.
            outputs = cls.outputs = list(getattr(cls, "outputs", []))

        outputs.insert(0, self)
        self.node_cls = cls
        return cls
def ReturnOutputs(node_cls):
    """
    Turn Node into a function returning Output variables.

    The returned wrapper instantiates ``node_cls`` with the given arguments
    and immediately calls the instance. It therefore returns the node's
    output Variables instead of the node itself.

    Raises:
        ValueError: If ``node_cls`` is not a subclass of :py:class:`Node`.
    """
    # Check isinstance first: issubclass() raises TypeError for non-classes.
    if not (isinstance(node_cls, type) and issubclass(node_cls, Node)):
        raise ValueError("This decorator is meant to be applied to a subclass of Node.")

    @wraps(node_cls)
    def wrapper(*args, **kwargs) -> NodeCallReturnType:
        return node_cls(*args, **kwargs)()

    # Keep a reference to the wrapped class.
    wrapper._node_cls = node_cls
    # NOTE(review): __mro__ is copied presumably so that class-like
    # introspection keeps working on the wrapper; confirm with its users.
    wrapper.__mro__ = node_cls.__mro__

    return wrapper
@ReturnOutputs
@Output("result")
class Call(Node):
    """
    Call a function with the supplied parameters.

    For every object in the stream, apply ``clbl`` to the corresponding stream variables.

    Args:
        clbl: A callable.
        *args: Positional arguments to ``clbl``.
        **kwargs: Keyword-arguments to ``clbl``.

    Returns:
        Variable: The result of the function invocation.

    Example:
        .. code-block:: python

            def foo(bar):
                return bar

            baz = ... # baz is a stream variable.
            result = Call(foo, baz)
    """

    def __init__(self, clbl: Callable, *args, **kwargs):
        super().__init__()
        self.clbl = clbl
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        try:
            name = self.clbl.__name__
        except AttributeError:
            try:
                name = self.clbl.__class__.__name__
            except AttributeError:
                name = "<unnamed>"

        # Variables are abbreviated as "...". All other arguments, positional
        # and keyword alike, are shown via repr() so that e.g. strings are quoted.
        args = [name]
        args.extend("..." if isinstance(a, Variable) else repr(a) for a in self.args)
        args.extend(
            "{}={}".format(k, "..." if isinstance(v, Variable) else repr(v))
            for k, v in self.kwargs.items()
        )

        return "{}({})".format(self.__class__.__name__, ", ".join(args))
class DelVariable(Node):
    """Delete a Variable from the stream."""

    def __init__(self, variable):
        super().__init__()
        # Key under which the variable's value is stored in a StreamObject.
        self.key = StreamObject._as_key(variable)  # pylint: disable=protected-access

    def transform_stream(self, stream):
        """
        Yield a new StreamObject per input object, without the deleted key.

        The object's ``n_remaining_hint`` is carried over. Previously it was
        dropped, so every object after this node lost the hint.
        """
        with closing_if_closable(stream):
            for obj in stream:
                yield StreamObject(
                    {k: v for k, v in obj.items() if k != self.key},
                    n_remaining_hint=obj.n_remaining_hint,
                )
class StreamObjectKeyError(KeyError):
    """
    Raised if a :py:class:`Variable` is not found in a :py:class:`StreamObject`.

    Its message extends the usual KeyError text with a hint about the likely cause.
    """

    def __str__(self):
        base_message = super().__str__()
        return base_message + "\nYou probably removed this key from the stream."
class StreamObject(abc.MutableMapping):
    """
    An object in the :py:obj:`Stream` that wraps all values.

    Attributes:
        data (dict): The data associated with this stream object.
        n_remaining_hint (int, optional): Approximate number of remaining objects in the stream including the current object.

    A value can be retrieved by indexing: ``obj[var]``.
    :py:class:`Variable` keys are translated to their ``hash``. Any other
    key is used as-is.
    """

    __slots__ = ["data", "n_remaining_hint"]

    def __init__(
        self, data: Optional[Dict] = None, n_remaining_hint: Optional[int] = None
    ):
        if data is None:
            data = {}
        self.data = data
        self.n_remaining_hint = n_remaining_hint

    def copy(self) -> "StreamObject":
        """Create a shallow copy (the data dict is copied, its values are shared)."""
        return StreamObject(self.data.copy(), n_remaining_hint=self.n_remaining_hint)

    @staticmethod
    def _as_key(obj):
        """Translate a Variable into its hash. Return any other key unchanged."""
        if isinstance(obj, Variable):
            return obj.hash
        return obj

    def __setitem__(self, key, value):
        self.data[self._as_key(key)] = value

    def __delitem__(self, key):
        # Raise the same (KeyError-derived) error as __getitem__. It reports
        # the original key rather than the internal hash.
        try:
            del self.data[self._as_key(key)]
        except KeyError:
            raise StreamObjectKeyError(key) from None

    def __getitem__(self, key):
        try:
            return self.data[self._as_key(key)]
        except KeyError:
            raise StreamObjectKeyError(key) from None

    def __contains__(self, key):
        # Direct dict lookup instead of the Mapping default, which would
        # construct and catch a StreamObjectKeyError for every missing key.
        return self._as_key(key) in self.data

    def __iter__(self):
        return iter(self.data)

    def __len__(self):
        return len(self.data)

    def to_dict(self, **kwargs):
        """
        Turn the StreamObject into a regular dictionary.

        Args:
            **kwargs: Maps each name of the resulting dict to a key
                (usually a Variable) of this object.

        Returns:
            dict: ``{name: self[key]}`` for every keyword argument.

        Raises:
            ValueError: If no keyword arguments were supplied.
            StreamObjectKeyError: If a key is not present.
        """
        if not kwargs:
            raise ValueError("No names were supplied")
        return {k: self[v] for k, v in kwargs.items()}
def check_stream(stream: Optional[Stream]) -> Stream:
    """
    Ensure that `stream` is a valid stream.

    Returns:
        `stream` itself, or, if `stream` is None, a stream containing a single
        empty :py:class:`StreamObject` with ``n_remaining_hint=1``.
    """
    if stream is not None:
        return stream
    return iter([StreamObject(n_remaining_hint=1)])
class Pipeline(StreamTransformer):
    """
    A Pipeline manages the execution of nodes.

    Nodes defined inside the pipeline context will be added to the pipeline.
    When the pipeline is executed, stream objects are passed
    from one node to the next in the same order.

    Args:
        parent (Pipeline, optional): A parent pipeline to attach to.
            If None and nested in an existing Pipeline, attach to this one.

    Example:
        .. code-block:: python

            with Pipeline() as pipeline:
                ...

            pipeline.run()
    """

    children: List[StreamTransformer]

    def __init__(self, parent: Optional["Pipeline"] = None):
        super().__init__()

        # Direct children of this pipeline
        self.children = []

        if parent is None:
            try:
                parent = _pipeline_stack[-1]
            except IndexError:
                pass

        if parent is not None:
            parent.add_child(self)
            # Share the parent's counter so that ranks handed out by add_child
            # are unique across nested pipelines.
            self.counter = parent.counter
        else:
            self.counter = itertools.count()

    def __enter__(self):
        # Push self to pipeline stack
        _pipeline_stack.append(self)
        return self

    def __exit__(self, *_):
        # Pop self from pipeline stack
        item = _pipeline_stack.pop()
        # Contexts must be exited in reverse order of entering. This explicit
        # check (instead of ``assert``) also runs under ``python -O``.
        if item is not self:
            raise RuntimeError(
                "Pipeline contexts were exited out of order: expected {!r}, got {!r}".format(
                    self, item
                )
            )

    def run(self):
        """
        Run the complete pipeline.

        This is a convenience method to be used in place of:

        .. code-block:: python

            for _ in pipeline.transform_stream():
                pass
        """
        for _ in self.transform_stream():
            pass

    def add_child(self, child: StreamTransformer):
        """Append ``child`` and return the next value of the shared counter (its rank)."""
        self.children.append(child)
        return next(self.counter)

    def __str__(self):
        return "Pipeline([{}])".format(", ".join(str(n) for n in self.children))

    def locals(self) -> Tuple[Variable, ...]:
        """Variables created in the scope of this Pipeline, including child Pipelines."""
        _locals: List[Variable] = []

        for child in self.children:
            if isinstance(child, Node):
                _locals.extend(child.outputs)

            if isinstance(child, Pipeline):
                _locals.extend(child.locals())

        return tuple(_locals)
@ReturnOutputs
class _Unpack(Node):
    """
    Split a value into ``size`` output Variables named "0", "1", ...

    Used by ``Variable.unpack``. For every stream object, the resolved input
    is spread over the outputs via ``prepare_output``.
    """

    def __init__(self, size, value):
        super().__init__()
        self.size = size
        self.value = value
        # The number of outputs depends on ``size``, so the output Variables
        # are created per instance rather than via the Output decorator.
        self.outputs = [Variable(str(index), self) for index in range(size)]

    def transform_stream(self, stream: Stream) -> Stream:
        with closing_if_closable(stream):
            for stream_obj in stream:
                # NOTE(review): prepare_input is not defined in this view;
                # presumably it resolves ``self.value`` for stream_obj.
                elements = self.prepare_input(stream_obj, "value")
                yield self.prepare_output(stream_obj, *elements)