Utilities

class morphocut.utils.StreamEstimator

Record how many objects are consumed and emitted and calculate the rate.

Use this in StreamTransformers that change the number of objects in the stream, so that the estimate of the number of remaining objects stays up to date.

Example

from morphocut.utils import StreamEstimator

est = StreamEstimator()

# Fragment from inside a stream-transforming method of a node
# (hence `yield` and `self`); `value` stands for the output computed there.
for obj in stream:
    # We expect 10 emitted objects for every consumed object:
    local_estimate = 10
    with est.consume(obj.n_remaining_hint, est_n_emit=local_estimate) as incoming:
        for _ in range(local_estimate):
            yield self.prepare_output(
                obj.copy(), value, n_remaining_hint=incoming.emit()
            )

consume(n_remaining_hint, *, est_n_emit=None, n_consumed=1)

Context manager for an incoming object.
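
The bookkeeping behind such an estimate can be illustrated with a small stand-alone sketch. It is not MorphoCut's implementation: the class `RateEstimator` and its methods `record` and `estimate_remaining` are hypothetical names. The sketch assumes the simplest possible model, in which the emit rate is the total number of emitted objects divided by the total number of consumed objects, and the remaining output count is the upstream hint multiplied by that rate.

```python
from typing import Optional


class RateEstimator:
    """Hypothetical helper for illustration; not part of MorphoCut."""

    def __init__(self) -> None:
        self.n_consumed = 0
        self.n_emitted = 0

    def record(self, n_consumed: int, n_emitted: int) -> None:
        """Add the counts observed while processing incoming objects."""
        self.n_consumed += n_consumed
        self.n_emitted += n_emitted

    @property
    def rate(self) -> Optional[float]:
        """Average number of emitted objects per consumed object."""
        if self.n_consumed == 0:
            return None  # Nothing observed yet, so no rate is available.
        return self.n_emitted / self.n_consumed

    def estimate_remaining(self, n_remaining_hint: Optional[int]) -> Optional[int]:
        """Scale the upstream hint by the observed rate (assumed model)."""
        if n_remaining_hint is None or self.rate is None:
            return None
        return round(n_remaining_hint * self.rate)


est = RateEstimator()
est.record(n_consumed=1, n_emitted=10)
est.record(n_consumed=1, n_emitted=8)

# Five objects remain upstream; each consumed object produced 9 on average.
remaining = est.estimate_remaining(5)
```

Returning `None` while nothing has been observed keeps the sketch from reporting a number it cannot justify; how StreamEstimator itself handles that case is not stated here.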

morphocut.utils.stream_groupby(stream, by=typing.Union[~T, morphocut.core.Variable[~T], typing.Tuple[typing.Union[~T, morphocut.core.Variable[~T]]]])

Split a stream into sub-streams by key.

Every time the value of by changes, a new sub-stream is generated. The sub-stream is itself an iterator that shares the underlying stream with stream_groupby.

Parameters
  • stream (Stream) – A MorphoCut stream.

  • by (Variable or value or tuple thereof) – The values to group by.

Yields

(key, sub_stream), where key is a value or a tuple and sub_stream is the corresponding sub-stream.
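
The grouping behaviour described above (a new group whenever the key changes, with sub-iterators that share the underlying iterator) is the same contract that `itertools.groupby` from the Python standard library follows. The sketch below uses plain dictionaries rather than MorphoCut stream objects, so it only illustrates the semantics; the field names `sample` and `frame` are made up for the example.

```python
from itertools import groupby

# Stand-in for a stream: each record carries a grouping key ("sample").
records = [
    {"sample": "a", "frame": 0},
    {"sample": "a", "frame": 1},
    {"sample": "b", "frame": 2},
    {"sample": "a", "frame": 3},
]

groups = []
for key, sub_stream in groupby(records, key=lambda r: r["sample"]):
    # sub_stream reads from the same underlying iterator as groupby,
    # so it is consumed here, before moving on to the next group.
    groups.append((key, [r["frame"] for r in sub_stream]))

print(groups)  # [('a', [0, 1]), ('b', [2]), ('a', [3])]
```

Because a group ends as soon as the key changes, the key "a" appears twice in the result. If one group per key is needed, the input has to arrive ordered by that key.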