Source code for morphocut.profile

from typing import Optional

from morphocut.core import Pipeline, Stream, StreamObject, Variable
from time import perf_counter

SI_PREFIXES = ["", "m", "μ", "n"]


[docs]class Profile(Pipeline): """ Profile pipeline nodes. The average lead time of objects in this pipeline context is measured and printed after the end of processing. Args: name (str): The name of this profiling operation. Example: .. code-block:: python with Pipeline() as pipeline: # Un-profiled processing ... with Profile("some name"): # Profiled processing ... pipeline.run() """ def __init__(self, name): super().__init__() self.name = name self._start = Variable("_sentinel", self) self._average = 0 self._n = 0 def _insert_sentinel(self, stream: Stream) -> Stream: for obj in stream: obj[self._start] = perf_counter() yield obj def _format_average(self): value = self._average for prefix in SI_PREFIXES: if value > 1: break value *= 1000 return f"{value:.2f}{prefix}s" def transform_stream(self, stream: Optional[Stream] = None) -> Stream: if stream is None: stream = [StreamObject()] stream = self._insert_sentinel(stream) # Here, the stream is not automatically closed, # as this would happen instantaneously. for child in self.children: # type: StreamTransformer stream = child.transform_stream(stream) for obj in stream: diff = perf_counter() - obj.pop(self._start) self._n += 1 self._average = self._average + (diff - self._average) / self._n yield obj print( f"Profile {self.name}: {self._n:,d} objects, avg. {self._format_average()}" )