"""Manipulate MorphoCut streams and show diagnostic information."""
import itertools
import pprint
from queue import Queue
from threading import Thread
from typing import Callable, Collection, Optional, Union
from morphocut.utils import StreamEstimator
import tqdm
from deprecated.sphinx import deprecated
from morphocut.core import (
Node,
Output,
RawOrVariable,
ReturnOutputs,
Stream,
StreamObject,
Variable,
closing_if_closable,
)
# Names exported by a star-import of this module.
__all__ = [
    "Enumerate", "Filter", "FilterVariables", "Pack", "PrintObjects",
    "Slice", "StreamBuffer", "Progress", "Unpack",
]
@ReturnOutputs
class Progress(Node):
    """
    Show a dynamically updating progress bar using `tqdm`_.

    .. _tqdm: https://github.com/tqdm/tqdm

    Args:
        description (str or Variable, optional): Description of the progress bar.
            Evaluated for every object; falsy values leave the currently
            shown description unchanged.
        monitor_interval (optional): If not None, assigned to the
            ``monitor_interval`` attribute of the underlying ``tqdm`` instance.

    Whenever an object carries an ``n_remaining_hint``, the total of the
    progress bar is updated from it.

    Example:
        .. code-block:: python

            with Pipeline() as pipeline:
                Progress("Description")

            # Output: Description|███████████████████████| [00:00, 2434.24it/s]
    """

    def __init__(
        self, description: Optional[RawOrVariable[str]] = None, monitor_interval=None
    ):
        super().__init__()
        self.description = description
        self.monitor_interval = monitor_interval

    def transform_stream(self, stream: Stream):
        with closing_if_closable(stream), tqdm.tqdm(
            stream, unit_scale=True
        ) as progress:
            if self.monitor_interval is not None:
                progress.monitor_interval = self.monitor_interval

            # set_description() redraws the bar immediately by default, so
            # only call it when the text actually changes instead of once
            # per object.
            shown_description = None

            for n_processed, obj in enumerate(progress):
                description = self.prepare_input(obj, "description")
                if description and description != shown_description:
                    progress.set_description(description)
                    shown_description = description

                if obj.n_remaining_hint is not None:
                    # n_remaining_hint appears to include the current object,
                    # which makes this the expected length of the stream.
                    progress.total = n_processed + obj.n_remaining_hint

                yield obj
def TQDM(*args, **kwargs):
    # Deprecated alias: construct a Progress node with the same arguments.
    return Progress(*args, **kwargs)


# Copy the Progress documentation *before* applying @deprecated: the Sphinx
# flavour of the decorator appends a ``.. deprecated::`` directive to the
# docstring, and assigning ``__doc__`` afterwards would discard that notice.
TQDM.__doc__ = Progress.__doc__
TQDM = deprecated(reason="Deprecated in favor of Progress.", version="0.2.x")(TQDM)
@ReturnOutputs
class Slice(Node):
    """
    |stream| Slice the :py:obj:`~morphocut.core.Stream`.

    Filter objects in the :py:obj:`~morphocut.core.Stream` based on their index.
    The positional arguments are those of :py:func:`itertools.islice`:
    ``Slice(stop)`` or ``Slice(start, stop[, step])``.

    Args:
        start (int, optional): Skip this many objects upfront.
        stop (int, optional): Stop at this index.
        step (int, optional): Keep only every ``step``-th object.
    """

    def __init__(self, *args: Optional[int]):
        super().__init__()
        self.args = args

    def transform_stream(self, stream: Stream):
        with closing_if_closable(stream):
            # Slice the *enumerated* stream so that the input position of each
            # selected object is known; the hint arithmetic below needs it
            # (the number of emitted objects differs from the input position
            # whenever start > 0 or step > 1).
            selected = itertools.islice(enumerate(stream), *self.args)
            index_slice = slice(*self.args)

            for n_seen, (in_index, obj) in enumerate(selected):
                if obj.n_remaining_hint is not None:
                    # Estimated total length of the input stream
                    n_input = in_index + obj.n_remaining_hint
                    # Number of objects the slice selects from that input
                    n_total = len(range(*index_slice.indices(n_input)))
                    # Subtract the objects already emitted to get remaining
                    obj.n_remaining_hint = max(0, n_total - n_seen)
                yield obj
class StreamBuffer(Node):
    """
    Buffer the stream.

    A background thread reads the upstream objects into a bounded queue
    while downstream nodes consume them. This allows continued processing
    while I/O bound Nodes wait for data.

    Args:
        maxsize (int): Maximum size of the buffer. As with
            :py:class:`queue.Queue`, a value <= 0 means unbounded.
    """

    # Marks the end of the upstream stream inside the queue.
    _sentinel = object()

    def __init__(self, maxsize: int):
        super().__init__()
        self.queue = Queue(maxsize)
        # Exception raised while reading upstream in the filler thread; it is
        # re-raised in the consuming thread so that errors are not lost.
        self._exception: Optional[Exception] = None

    def _fill_queue(self, stream: Stream):
        """Read `stream` into the queue. Runs in the filler thread."""
        try:
            with closing_if_closable(stream):
                for obj in stream:
                    self.queue.put(obj)
        except Exception as exc:  # pylint: disable=broad-except
            # Hand the error over instead of letting it die with the thread.
            self._exception = exc
        finally:
            # Always signal the end so the consumer never waits forever.
            self.queue.put(self._sentinel)

    def transform_stream(self, stream: Stream):
        """Yield the objects of `stream`, read ahead by a background thread."""
        self._exception = None
        filler = Thread(target=self._fill_queue, args=(stream,), daemon=True)
        filler.start()

        # NOTE(review): if the consumer stops early, the filler may stay
        # blocked in queue.put() (it is a daemon thread, so it does not keep
        # the interpreter alive) and leftover items remain in self.queue.
        while True:
            obj = self.queue.get()
            if obj is self._sentinel:
                break
            yield obj

        filler.join()

        if self._exception is not None:
            exc, self._exception = self._exception, None
            raise exc
@ReturnOutputs
class PrintObjects(Node):
    r"""
    Print the contents of :py:class:`~morphocut.core.StreamObject`\ s.

    For debugging purposes only. Each object is printed with its address,
    followed by one ``name: value`` entry per selected Variable. Objects are
    passed on unchanged.

    Args:
        *args (Variable): Variables to display.
    """

    def __init__(self, *args: Variable):
        super().__init__()
        self.args = args

    def _dump(self, obj):
        """Print the address of `obj` followed by each selected value."""
        print("Stream object at 0x{:x}".format(id(obj)))
        for variable in self.args:
            print("{}: ".format(variable.name), end="")
            pprint.pprint(obj[variable])

    def transform_stream(self, stream: Stream):
        with closing_if_closable(stream):
            for obj in stream:
                self._dump(obj)
                yield obj
@ReturnOutputs
@Output("index")
class Enumerate(Node):
    """
    Enumerate objects in the :py:obj:`~morphocut.core.Stream`.

    Args:
        start (int, default 0): Start value of the counter.

    Returns:
        Variable[int]: Running index of each object, counting from `start`.
    """

    def __init__(self, start: int = 0):
        super().__init__()
        self.start = start

    def transform_stream(self, stream: Stream):
        with closing_if_closable(stream):
            numbered = enumerate(stream, self.start)
            for index, obj in numbered:
                yield self.prepare_output(obj, index)
@ReturnOutputs
@Output("value")
class Unpack(Node):
    """
    |stream| Unpack values from a collection into the :py:obj:`~morphocut.core.Stream`.

    Every incoming object is replaced by one copy per value of the collection,
    so the result is basically the cross-product of the stream with the iterable.

    Args:
        collection (Collection or Variable): An iterable to unpack.

    Returns:
        Variable: One value from the iterable.

    Example:
        .. code-block:: python

            with Pipeline() as p:
                a = Unpack([1,2,3])
                # The stream now consists of three objects:
                # {a: 1}, {a: 2}, {a: 3}

                b = Unpack([1,2,3])
                # The stream now consists of nine objects:
                # {a: 1, b: 1}, {a: 1, b: 2}, {a: 1, b: 3},
                # {a: 2, b: 1}, {a: 2, b: 2}, {a: 2, b: 3},
                # {a: 3, b: 1}, {a: 3, b: 2}, {a: 3, b: 3}

    See Also:
        :py:class:`~morphocut.stream.Pack`
    """

    def __init__(self, collection: RawOrVariable[Collection]):
        super().__init__()
        self.collection = collection

    def transform_stream(self, stream: Stream):
        """Emit one copy of every incoming object per collection value."""
        with closing_if_closable(stream):
            estimator = StreamEstimator()
            for parent in stream:
                # Materialize the collection so its length is known up front.
                values = tuple(self.prepare_input(parent, "collection"))
                hint_context = estimator.consume(
                    parent.n_remaining_hint, est_n_emit=len(values)
                )
                with hint_context as incoming:
                    for value in values:
                        child = parent.copy()
                        yield self.prepare_output(
                            child, value, n_remaining_hint=incoming.emit()
                        )
@ReturnOutputs
class Pack(Node):
    """
    |stream| Pack values of subsequent objects in the stream into one tuple.

    Objects are consumed in groups of `size` (the last group may be smaller).
    Only the first object of each group stays in the stream; it receives
    the packed values.

    Args:
        size (int): Number of objects to aggregate. It is passed directly to
            :py:func:`itertools.islice`, so it must be a plain integer.
        *variables (Variable): Variables to pack.

    Returns:
        One or more Variable: One output Variable per input Variable.

    Example:
        .. code-block:: python

            with Pipeline() as p:
                a = Unpack([1,2,3])
                # The stream now consists of three objects:
                # {a: 1}, {a: 2}, {a: 3}

                a123 = Pack(3, a)
                # The stream now consists of one object:
                # {a: 1, a123: (1,2,3)}

    See Also:
        :py:class:`~morphocut.stream.Unpack`
    """

    def __init__(self, size, *variables):
        super().__init__()
        self.size = size
        self.variables = variables

        # Replace the outputs: one new Variable per packed input Variable,
        # reusing the input Variable's name.
        self.outputs = [Variable(v.name, self) for v in self.variables]

    def transform_stream(self, stream: Stream):
        stream_estimator = StreamEstimator()
        with closing_if_closable(stream):
            # islice() must advance one shared iterator; slicing a re-iterable
            # (e.g. a list) directly would restart at its beginning on every
            # pass and never terminate.
            iterator = iter(stream)
            while True:
                packed = list(itertools.islice(iterator, self.size))
                if not packed:
                    break

                with stream_estimator.consume(
                    packed[0].n_remaining_hint, n_consumed=len(packed)
                ) as incoming:
                    # One tuple per variable, holding its value from every
                    # object of the group.
                    packed_values = tuple(
                        tuple(o[v] for o in packed) for v in self.variables
                    )
                    yield self.prepare_output(
                        packed[0], *packed_values, n_remaining_hint=incoming.emit()
                    )
class _Predicate:
    """Callable that returns the value of `variable` stored in a StreamObject."""

    def __init__(self, variable: Variable):
        self.variable = variable

    def __call__(self, obj: StreamObject) -> bool:
        key = self.variable
        value = obj[key]
        return value
class Filter(Node):
    """
    |stream| Filter objects in the :py:obj:`~morphocut.core.Stream`.

    After this node, the stream will only contain objects for
    which `predicate` evaluates to `True`.

    Args:
        predicate (Variable or callable):
            If the predicate is true, the object will stay in the stream.
            If a callable, it will receive a
            :py:class:`~morphocut.core.StreamObject` and must return a bool.

    Example:
        .. code-block:: python

            with Pipeline() as p:
                a = Unpack([1,2,3])
                # The stream now consists of three objects:
                # {a: 1}, {a: 2}, {a: 3}

                # Keep only objects where a>2
                Filter(a>2)
                # The stream now consists of 1 object:
                # {a: 3}

                ## OR:

                # Keep only objects where a>2.
                # Here, `obj` is the current StreamObject.
                # This form might be preferred for performance reasons.
                Filter(lambda obj: obj[a] > 2)
    """

    def __init__(self, predicate: Union[Variable, Callable[[StreamObject], bool]]):
        super().__init__()
        if isinstance(predicate, Variable):
            # Look up the Variable's value in each object.
            self.predicate = _Predicate(predicate)
        else:
            self.predicate = predicate

    def transform_stream(self, stream: Stream):
        with closing_if_closable(stream):
            for obj in stream:
                # n_remaining_hint is passed through unchanged, so after
                # filtering it can only over-estimate the remaining objects.
                if self.predicate(obj):
                    yield obj
@ReturnOutputs
class FilterVariables(Node):
    r"""
    |stream| Only keep the specified Variables in the stream.

    This might reduce memory usage and speed up processing, especially when
    :py:class:`~morphocut.core.StreamObject`\ s have to be sent to other processes.

    Args:
        *variables (Variable): Variables to keep. All other entries are dropped.
    """

    def __init__(self, *variables):
        super().__init__()
        self.keys = {
            StreamObject._as_key(v)  # pylint: disable=protected-access
            for v in variables
        }

    def transform_stream(self, stream: Stream):
        with closing_if_closable(stream):
            for obj in stream:
                filtered = StreamObject(
                    {k: v for k, v in obj.items() if k in self.keys}
                )
                # The freshly built object would otherwise lose the length
                # estimate that downstream nodes (e.g. Progress) rely on.
                filtered.n_remaining_hint = obj.n_remaining_hint
                yield filtered
@ReturnOutputs
@Output("n_remaining_hint")
class RemainingHint(Node):
    """
    Extract n_remaining_hint from an object.

    Returns:
        Variable: The ``n_remaining_hint`` attribute of each object (may be None).
    """

    def transform_stream(self, stream: Stream):
        with closing_if_closable(stream):
            for obj in stream:
                hint = obj.n_remaining_hint
                yield self.prepare_output(obj, hint)