Utilities
- class morphocut.utils.StreamEstimator
Record how many objects are consumed and emitted and calculate the rate.
This should be used in StreamTransformers that alter the number of objects in the stream, to update the estimate of the number of remaining objects.
Example
    est = StreamEstimator()
    for obj in stream:
        # We're expecting 10 emitted objects for every consumed object:
        local_estimate = 10
        with est.consume(obj.n_remaining_hint, est_n_emit=local_estimate) as incoming:
            for _ in range(10):
                yield self.prepare_output(
                    obj.copy(), value, n_remaining_hint=incoming.emit()
                )
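The idea of tracking consumed and emitted counts to derive a rate can be illustrated with a toy class. This is only a sketch under stated assumptions: ToyRateEstimator, record and estimate_remaining are hypothetical names, and the formula (emitted divided by consumed, scaled by the upstream hint) is an assumption for illustration, not necessarily how StreamEstimator computes its estimate.

```python
class ToyRateEstimator:
    """Hypothetical illustration; NOT morphocut.utils.StreamEstimator."""

    def __init__(self):
        self.n_consumed = 0
        self.n_emitted = 0

    def record(self, n_emitted):
        """Record one consumed object that produced n_emitted objects."""
        self.n_consumed += 1
        self.n_emitted += n_emitted

    @property
    def rate(self):
        """Average number of emitted objects per consumed object."""
        if self.n_consumed == 0:
            return 1.0  # assumption: one output per input until data exists
        return self.n_emitted / self.n_consumed

    def estimate_remaining(self, n_remaining_in):
        """Scale the upstream remaining-count hint by the observed rate."""
        if n_remaining_in is None:
            return None  # no upstream hint, so no estimate
        return round(self.rate * n_remaining_in)


est = ToyRateEstimator()
for _ in range(3):
    est.record(n_emitted=10)  # each consumed object yielded 10 outputs

remaining = est.estimate_remaining(7)
print(est.rate, remaining)  # 10.0 70
```

With three consumed objects that each produced ten outputs, the observed rate is 10, so seven remaining inputs translate into roughly 70 remaining outputs.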
- morphocut.utils.buffered_generator(buf_size)
A decorator that adds buffering to a generator, prefetching items in a background thread.
The buffered_generator decorator runs a generator asynchronously in a separate thread, prefetching items into a buffer of the specified size. This can improve performance for the consumer, particularly when the generator produces items at an inconsistent rate or when producing them involves I/O operations.
- Parameters:
buf_size (int) – The size of the buffer to use for prefetching items. If buf_size is 0, no buffering is applied, and the generator runs synchronously.
- Returns:
Callable – A wrapped generator function that yields items from a buffer filled by a background thread.
Example
    import time
    from morphocut.utils import buffered_generator

    @buffered_generator(buf_size=10)
    def slow_generator():
        for i in range(20):
            time.sleep(0.5)
            yield i

    for item in slow_generator():
        # Process item
        ...
Notes
If buf_size is set to 0, the generator runs synchronously with no background threading.
Exceptions raised in the original generator are propagated to the main thread when the sentinel value is reached.
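The behaviour described above (background prefetching, a sentinel marking the end of the stream, and exception propagation) can be sketched with the standard library alone. This is a minimal illustration and not MorphoCut's implementation: sketch_buffered_generator and _SENTINEL are hypothetical names, and the choice of queue.Queue with a daemon thread is an assumption.

```python
import functools
import queue
import threading

_SENTINEL = object()  # hypothetical end-of-stream marker


def sketch_buffered_generator(buf_size):
    """Illustrative stand-in; NOT morphocut.utils.buffered_generator."""

    def decorator(gen_func):
        if buf_size == 0:
            # No buffering: return the generator function unchanged.
            return gen_func

        @functools.wraps(gen_func)
        def wrapper(*args, **kwargs):
            buf = queue.Queue(maxsize=buf_size)
            errors = []

            def producer():
                try:
                    for item in gen_func(*args, **kwargs):
                        buf.put(item)  # blocks while the buffer is full
                except Exception as exc:
                    errors.append(exc)  # remember the error for the consumer
                finally:
                    buf.put(_SENTINEL)  # always signal the end of the stream

            threading.Thread(target=producer, daemon=True).start()

            while True:
                item = buf.get()
                if item is _SENTINEL:
                    if errors:
                        # Re-raise the producer's exception in the consumer.
                        raise errors[0]
                    return
                yield item

        return wrapper

    return decorator


@sketch_buffered_generator(buf_size=3)
def numbers():
    for i in range(5):
        yield i


result = list(numbers())
print(result)  # [0, 1, 2, 3, 4]
```

The sketch omits cleanup when the consumer stops early: the producer thread would then stay blocked on a full buffer until the process exits, which a production implementation would need to handle.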
- morphocut.utils.stream_groupby(stream, by=typing.Union[~T, morphocut.core.Variable[~T], typing.Tuple[typing.Union[~T, morphocut.core.Variable[~T]]]])
Split a stream into sub-streams by key.
Every time the value of by changes, a new sub-stream is generated. Each sub-stream is itself an iterator that shares the underlying stream with stream_groupby.
- Parameters:
stream (Stream) – A MorphoCut stream.
by (Variable or value or tuple thereof) – The values to group by.
- Yields:
(key, sub_stream), where key is a value or a tuple and sub_stream is the corresponding sub-stream.
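The grouping semantics described here (a new group whenever the key changes, with sub-streams drawing from the same underlying iterator) resemble those of itertools.groupby from the standard library. The sketch below uses itertools.groupby on plain dictionaries as an analogy only; it does not call stream_groupby, and the records and the "sample" key are made up for illustration.

```python
from itertools import groupby

# Hypothetical records standing in for stream objects.
records = [
    {"sample": "a", "idx": 0},
    {"sample": "a", "idx": 1},
    {"sample": "b", "idx": 2},
    {"sample": "a", "idx": 3},
]

groups = []
for key, sub_stream in groupby(records, key=lambda r: r["sample"]):
    # sub_stream shares the underlying iterator, so it must be consumed
    # before advancing to the next group.
    groups.append((key, [r["idx"] for r in sub_stream]))

print(groups)  # [('a', [0, 1]), ('b', [2]), ('a', [3])]
```

Note that "a" appears twice: grouping is triggered by a change in the key, not by collecting all equal keys, so unsorted input produces repeated groups.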