
sgnts.base.base

TSResourceSource dataclass

Bases: ParallelizeBase, TSSource


Inheritance diagram: _TSSource --> TSSource --> TSResourceSource

Source class that is entirely data driven by an external resource.

This class uses ParallelizeBase to run data generation in a separate worker thread. Subclasses must override the worker_process method to define how data is generated in the worker.

The worker communicates with the main thread via queues provided by ParallelizeBase. Data should be sent as (pad, buffer) tuples to the output queue using context.output_queue.put((pad, buf)).

Important: Since the worker starts when entering the Parallelize context (before setup() is called), all parameters needed by the worker must be added as instance attributes and will be automatically passed to worker_process via the parameter extraction mechanism.

Subclasses should:
1. Override worker_process to implement data generation
2. Use context.output_queue.put((pad, buf)) to send data
3. Check context.should_stop() to know when to exit

Exception handling follows SGN's improved Parallelize pattern: exceptions in the worker are caught by the framework, printed to stdout, and cause the worker to terminate. The main thread detects abnormal termination via the internal() method.
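
The three subclass responsibilities listed above can be sketched without SGN installed. This is a minimal illustration only: `StandInContext` is a hypothetical stand-in for WorkerContext (only `output_queue` and `should_stop()` are taken from the description above), plain dicts stand in for SeriesBuffer objects, and the worker is a free function rather than a method of a TSResourceSource subclass.

```python
import queue
import threading


class StandInContext:
    """Hypothetical stand-in for WorkerContext: an output queue plus a stop flag."""

    def __init__(self, maxsize: int = 100) -> None:
        self.output_queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self._stop = threading.Event()

    def should_stop(self) -> bool:
        return self._stop.is_set()

    def request_stop(self) -> None:
        self._stop.set()


def worker_process(context: StandInContext, pads: list, n_blocks: int) -> None:
    """Produce n_blocks per pad, checking the stop flag between blocks."""
    for i in range(n_blocks):
        if context.should_stop():
            return
        for pad in pads:
            # A real worker would build a SeriesBuffer here; a dict stands in.
            buf = {"offset": i, "end_offset": i + 1}
            context.output_queue.put((pad, buf))


context = StandInContext()
worker = threading.Thread(target=worker_process, args=(context, ["H1", "L1"], 3))
worker.start()
worker.join()

# Drain the queue the way the main thread would, as (pad, buffer) tuples.
received = []
while not context.output_queue.empty():
    received.append(context.output_queue.get())
```

In the real class the framework starts the worker and supplies the context; the sketch starts the thread by hand only to stay self-contained.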

Parameters:

start (float | None, default None): Start time in GPS seconds. Used by subclasses to determine when data generation should begin.
end (float | None, default None): End time in GPS seconds. Alternative to the duration parameter. Cannot be given if duration is given.
duration (float | None, default None): Duration in seconds. Alternative to the end parameter. If neither end nor duration is specified, defaults to the maximum int64 value.
in_queue_timeout (int, default 60): Timeout in seconds when waiting for data from the worker. Used by get_data_from_queue() in the main thread.

Source code in src/sgnts/base/base.py
@dataclass
class TSResourceSource(ParallelizeBase, TSSource):
    """Source class that is entirely data driven by an external resource.

    This class uses ParallelizeBase to run data generation in a separate
    worker thread. Subclasses must override the worker_process method
    to define how data is generated in the worker.

    The worker communicates with the main thread via queues provided by
    ParallelizeBase. Data should be sent as (pad, buffer) tuples to
    the output queue using context.output_queue.put((pad, buf)).

    Important: Since the worker starts when entering the Parallelize context
    (before setup() is called), all parameters needed by the worker must be
    added as instance attributes and will be automatically passed to worker_process
    via the parameter extraction mechanism.

    Subclasses should:
    1. Override worker_process to implement data generation
    2. Use context.output_queue.put((pad, buf)) to send data
    3. Check context.should_stop() to know when to exit

    Exception handling follows SGN's improved Parallelize pattern: exceptions in the
    worker are caught by the framework, printed to stdout, and cause the
    worker to terminate. The main thread detects abnormal termination via
    the internal() method.

    Args:
        start: Optional[float] = None
            Start time in GPS seconds. Used by subclasses to determine
            when data generation should begin.
        end: Optional[float] = None
            End time in GPS seconds. Alternative to duration parameter.
            Cannot be given if duration is given.
        duration: Optional[float] = None
            Duration in seconds. Alternative to end parameter.
            If neither end nor duration is specified, defaults to maximum int64 value.
        in_queue_timeout: int = 60
            Timeout in seconds when waiting for data from the worker.
            Used by get_data_from_queue() in the main thread.

    """

    in_queue_timeout: int = 60
    _use_threading_override: bool = (
        True  # Always use threading for I/O bound data sources
    )
    at_eos: bool = False

    def __post_init__(self):
        self.queue_maxsize = 100

        self._is_setup = False

        # Initialize parent classes - IMPORTANT: Order matters!
        # TSSource validates end/duration and creates self.source_pads and self.srcs
        # ParallelizeBase extracts self.srcs for worker
        # super().__post_init__()
        TSSource.__post_init__(self)
        ParallelizeBase.__post_init__(self)

    @property
    def is_setup(self):
        return self._is_setup

    def sample_shape(self, pad):
        """The channels per sample that a buffer should produce as a tuple
        (since it can be a tensor). For single channels just return ()"""
        props = self.first_buffer_properties[pad]
        assert props is not None
        return props["sample_shape"]

    def sample_rate(self, pad):
        """The integer sample rate that a buffer should carry"""
        props = self.first_buffer_properties[pad]
        assert props is not None
        return props["sample_rate"]

    @property
    def latest_offset(self):
        """Since the worker is responsible for producing a queue of
        buffers, the latest offset can be derived from those"""
        latest = numpy.iinfo(numpy.int64).min
        for properties in self.latest_buffer_properties.values():
            if properties is not None:
                latest = max(latest, properties["end_offset"])
        return latest

    @property
    def start_offset(self):
        offsets = [
            b["offset"] for b in self.first_buffer_properties.values() if b is not None
        ]
        return min(offsets)

    @property
    def end_offset(self):
        if self.end is None:
            return float("inf")
        return Offset.fromsec(self.end - Offset.offset_ref_start / Time.SECONDS)

    def setup(self) -> None:
        """Initialize the TSResourceSource data structures."""
        if not self._is_setup:
            self.buffer_queue: dict[SourcePad, deque[SeriesBuffer]] = {
                p: deque() for p in self.rsrcs
            }
            self.latest_buffer_properties = {p: None for p in self.rsrcs}
            self.first_buffer_properties = {p: None for p in self.rsrcs}
            self._is_setup = True

    @property
    def queued_duration(self):
        durations = [d[-1].end - d[0].start for d in self.buffer_queue.values() if d]
        if durations:
            return max(durations)
        else:
            return 0.0

    def _get_data_from_worker(
        self, timeout: int = 60
    ) -> dict[SourcePad, list[SeriesBuffer]]:
        """Get data from the worker via ParallelizeSourceElement's queue."""
        data_by_pad: dict[SourcePad, list[SeriesBuffer]] = {p: [] for p in self.rsrcs}
        start_time = stime.time()

        # Collect data from worker until we have data for all pads or timeout
        while stime.time() - start_time < timeout:
            # Check if worker has terminated abnormally before trying to get data
            self.check_worker_terminated()
            try:
                # Get data from worker queue (provided by ParallelizeSourceElement)
                item = self.out_queue.get(timeout=0.1)

                pad, buf = item
                data_by_pad[pad].append(buf)

                # Check if we have at least one buffer for each pad
                if all(data_by_pad[p] for p in self.rsrcs):
                    break

            except queue.Empty:
                # No data available yet, continue waiting
                continue
        else:
            self.check_worker_terminated()
            # Timeout reached
            raise ValueError(f"Could not read from resource after {timeout} seconds")

        return data_by_pad

    def get_data_from_queue(self):
        """Retrieve data from the worker with a timeout."""
        # Get data from worker
        data_by_pad = self._get_data_from_worker(timeout=self.in_queue_timeout)

        # Add data to output queues
        for pad, buffers in data_by_pad.items():
            self.buffer_queue[pad].extend(buffers)

            if buffers:  # If we got any buffers for this pad
                buffer_queue = self.buffer_queue[pad]
                self.latest_buffer_properties[pad] = buffer_queue[-1].properties
                if self.first_buffer_properties[pad] is None:
                    self.first_buffer_properties[pad] = buffer_queue[0].properties

        # Update start from actual data if it wasn't explicitly set
        if self.start is None:
            self.start = Offset.tosec(self.start_offset)

        # If end wasn't explicitly set, calculate it from actual start + duration
        if self.end is None and self.duration is not None:
            self.end = self.start + self.duration

    def set_data(self, out_frame, pad):
        """This method will set data on out_frame based on the contents of the
        internal queue"""

        # Check if we are at EOS, if so, set the flag
        if out_frame.EOS:
            self.at_eos = True

        # If we have been given a zero length frame, just return it. That means
        # we didn't have data at the time the frame was prepared and we should
        # just go with it.
        if out_frame.offset == out_frame.end_offset:
            return out_frame

        # Otherwise create a TSFrame from all the buffers that we have queued up
        in_frame = TSFrame(buffers=list(self.buffer_queue[pad]))

        # make sure nothing is fishy
        assert out_frame.end_offset <= in_frame.end_offset, (
            f"Output frame end_offset {out_frame.end_offset} extends beyond "
            f"input frame end_offset {in_frame.end_offset}"
        )

        # intersect the TSSource provided output frame with the in_frame
        before, intersection, after = out_frame.intersect(in_frame)

        # Clear the queue
        self.buffer_queue[pad].clear()

        # and repopulate it with only stuff that is newer than what we just sent.
        if after is not None:
            self.buffer_queue[pad].extend(after.buffers)

        # It is possible that the out_frame is before the data we have in the
        # queue, if so the intersection will be None. That's okay, we can just
        # pass along that gap buffer.
        if intersection is None:
            return out_frame

        # make sure to update EOS
        intersection.EOS = out_frame.EOS
        return intersection

    def __set_pad_buffer_params(
        self,
        pad: SourcePad,
    ) -> None:
        # Make sure this has only been called once per pad
        assert (
            pad not in self._new_buffer_dict
        ), f"Pad {pad.name} already exists in _new_buffer_dict - duplicate pad entry"

        self._new_buffer_dict[pad] = {
            "sample_rate": self.sample_rate(pad),
            "shape": self.sample_shape(pad)
            + (self.num_samples(self.sample_rate(pad)),),
        }
        self._next_frame_dict[pad] = TSFrame.from_buffer_kwargs(
            offset=self.start_offset, data=None, **self._new_buffer_dict[pad]
        )

    def worker_process(self, context: WorkerContext, *args: Any, **kwargs: Any) -> None:
        """Override this method in subclasses to implement data generation.

        This method runs in a separate worker (process or thread) and should:
        1. Generate data from the external resource
        2. Send (pad, buffer) tuples via context.output_queue.put((pad, buf))
        3. Check context.should_stop() to know when to exit

        Args:
            context: WorkerContext with access to queues and events
            *args: Automatically extracted instance attributes
            **kwargs: Automatically extracted instance attributes with defaults
        """
        raise NotImplementedError("Subclasses must implement worker_process method")

    def internal(self):
        """Since internal() is guaranteed to be called prior to producing any
        data on a source pad, all setup is done here. First the resource itself is
        set up and the first data is pulled from the resource. Subsequent calls to
        internal() only get data from the resource if there is not enough data
        queued up to produce a result."""

        # Check if worker has terminated abnormally
        super().internal()

        # First setup the resource and pull the first data
        if not self.is_setup:
            self.setup()
            self.get_data_from_queue()
            # setup pads if they are not setup.
            # This must happen after the first get data
            for pad in self.rsrcs:
                if pad not in self._new_buffer_dict:
                    self.__set_pad_buffer_params(pad)

        # Accumulate data until we have enough for a full stride.
        # This prevents heartbeat frames when the worker produces data
        # in blocks smaller than the stride (e.g. 1/16 sec blocks for
        # a 1 sec stride).
        while self.latest_offset < self.current_end_offset:
            if self.latest_offset >= self.end_offset:
                break
            self.get_data_from_queue()

    def new(self, pad):
        frame = self.prepare_frame(pad, latest_offset=self.latest_offset)
        frame = self.set_data(frame, pad)
        return frame

latest_offset property

Since the worker is responsible for producing a queue of buffers, the latest offset can be derived from those
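
As a rough illustration of how the latest offset falls out of the per-pad buffer properties, the sketch below mirrors the property's logic on plain dicts. The pad names and offset values are made up, and `INT64_MIN` is written out by hand instead of using numpy.

```python
INT64_MIN = -(2**63)  # same value as numpy.iinfo(numpy.int64).min


def latest_offset(latest_buffer_properties: dict) -> int:
    """Return the largest end_offset seen on any pad, or INT64_MIN if none."""
    latest = INT64_MIN
    for props in latest_buffer_properties.values():
        if props is not None:  # pads that have not delivered data are skipped
            latest = max(latest, props["end_offset"])
    return latest


# Hypothetical state: two pads have data, one has not produced anything yet.
state = {
    "H1": {"end_offset": 16384},
    "L1": None,
    "V1": {"end_offset": 32768},
}
newest = latest_offset(state)
```

Note that the result is the maximum over pads, so it reflects the pad that is furthest ahead; with no data on any pad it stays at the int64 minimum.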

get_data_from_queue()

Retrieve data from the worker with a timeout.
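
The waiting itself happens in the private _get_data_from_worker method shown in the class source above: it polls the queue in short steps until every pad has at least one buffer or the timeout expires. A simplified sketch of that loop follows; it omits the worker-termination check, uses `time.monotonic()` where the source uses a wall-clock call, and the function name `collect_per_pad` is hypothetical.

```python
import queue
import time


def collect_per_pad(out_queue, pads, timeout=60.0, poll=0.1):
    """Gather (pad, buffer) items until every pad has data or time runs out."""
    data_by_pad = {p: [] for p in pads}
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            pad, buf = out_queue.get(timeout=poll)
        except queue.Empty:
            continue  # nothing yet, keep waiting
        data_by_pad[pad].append(buf)
        if all(data_by_pad[p] for p in pads):
            return data_by_pad
    raise ValueError(f"Could not read from resource after {timeout} seconds")


# Strings stand in for SeriesBuffer objects.
q = queue.Queue()
for item in [("H1", "h0"), ("H1", "h1"), ("L1", "l0")]:
    q.put(item)
result = collect_per_pad(q, ["H1", "L1"], timeout=1.0)
```

A pad that produces faster than the others may contribute several buffers to a single call, which is why the values are lists.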

Source code in src/sgnts/base/base.py
def get_data_from_queue(self):
    """Retrieve data from the worker with a timeout."""
    # Get data from worker
    data_by_pad = self._get_data_from_worker(timeout=self.in_queue_timeout)

    # Add data to output queues
    for pad, buffers in data_by_pad.items():
        self.buffer_queue[pad].extend(buffers)

        if buffers:  # If we got any buffers for this pad
            buffer_queue = self.buffer_queue[pad]
            self.latest_buffer_properties[pad] = buffer_queue[-1].properties
            if self.first_buffer_properties[pad] is None:
                self.first_buffer_properties[pad] = buffer_queue[0].properties

    # Update start from actual data if it wasn't explicitly set
    if self.start is None:
        self.start = Offset.tosec(self.start_offset)

    # If end wasn't explicitly set, calculate it from actual start + duration
    if self.end is None and self.duration is not None:
        self.end = self.start + self.duration

internal()

Since internal() is guaranteed to be called before any data is produced on a source pad, all setup is done here. First the resource itself is set up and the first data is pulled from it. Subsequent calls to internal() only get data from the resource if there is not enough data queued up to produce a result.
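
The "not enough data queued up" condition can be pictured as a loop that keeps pulling blocks until the newest queued offset reaches the end of the frame being prepared. The sketch below works on plain integer offsets; `accumulate` and its arguments are illustrative names, and the iterator stands in for repeated get_data_from_queue() calls.

```python
def accumulate(blocks, current_end_offset, end_offset=float("inf")):
    """Pull (start, stop) blocks until the data covers current_end_offset."""
    latest = -(2**63)  # nothing received yet
    pulled = []
    source = iter(blocks)
    while latest < current_end_offset:
        if latest >= end_offset:
            break  # the requested span is exhausted, stop pulling
        start, stop = next(source)  # stands in for get_data_from_queue()
        pulled.append((start, stop))
        latest = max(latest, stop)
    return latest, pulled


# Sixteen one-unit blocks are needed to fill one sixteen-unit stride.
small_blocks = [(i, i + 1) for i in range(100)]
latest, pulled = accumulate(small_blocks, current_end_offset=16)

# With the source ending at offset 8, pulling stops there.
capped_latest, capped_pulled = accumulate(
    small_blocks, current_end_offset=16, end_offset=8
)
```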

Source code in src/sgnts/base/base.py
def internal(self):
    """Since internal() is guaranteed to be called prior to producing any
    data on a source pad, all setup is done here. First the resource itself is
    set up and the first data is pulled from the resource. Subsequent calls to
    internal() only get data from the resource if there is not enough data
    queued up to produce a result."""

    # Check if worker has terminated abnormally
    super().internal()

    # First setup the resource and pull the first data
    if not self.is_setup:
        self.setup()
        self.get_data_from_queue()
        # setup pads if they are not setup.
        # This must happen after the first get data
        for pad in self.rsrcs:
            if pad not in self._new_buffer_dict:
                self.__set_pad_buffer_params(pad)

    # Accumulate data until we have enough for a full stride.
    # This prevents heartbeat frames when the worker produces data
    # in blocks smaller than the stride (e.g. 1/16 sec blocks for
    # a 1 sec stride).
    while self.latest_offset < self.current_end_offset:
        if self.latest_offset >= self.end_offset:
            break
        self.get_data_from_queue()

sample_rate(pad)

The integer sample rate that a buffer should carry

Source code in src/sgnts/base/base.py
def sample_rate(self, pad):
    """The integer sample rate that a buffer should carry"""
    props = self.first_buffer_properties[pad]
    assert props is not None
    return props["sample_rate"]

sample_shape(pad)

The channels per sample that a buffer should produce as a tuple (since it can be a tensor). For single channels just return ()

Source code in src/sgnts/base/base.py
def sample_shape(self, pad):
    """The channels per sample that a buffer should produce as a tuple
    (since it can be a tensor). For single channels just return ()"""
    props = self.first_buffer_properties[pad]
    assert props is not None
    return props["sample_shape"]

set_data(out_frame, pad)

This method will set data on out_frame based on the contents of the internal queue
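
One way to picture the queue handling in set_data is as interval arithmetic on offsets: the part of the queue that overlaps the output frame is emitted, and anything newer is kept for the next call. The sketch below uses `(start, end)` tuples in place of buffers; it is an assumption about the general shape of the operation, not a description of how TSFrame.intersect is implemented.

```python
def split_queue(buffers, out_start, out_end):
    """Split queued (start, end) spans into the overlap and the remainder."""
    intersection, after = [], []
    for start, end in buffers:
        lo, hi = max(start, out_start), min(end, out_end)
        if lo < hi:
            intersection.append((lo, hi))  # part that goes into the output
        if end > out_end:
            after.append((max(start, out_end), end))  # kept for later
    return intersection, after


queued = [(0, 4), (4, 8), (8, 12)]
sent, kept = split_queue(queued, out_start=2, out_end=10)

# Output frame entirely before the queued data: nothing to send, all is kept.
gap_sent, gap_kept = split_queue([(10, 12)], out_start=0, out_end=4)
```

The second call corresponds to the case where the intersection is empty and the gap frame is passed along unchanged.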

Source code in src/sgnts/base/base.py
def set_data(self, out_frame, pad):
    """This method will set data on out_frame based on the contents of the
    internal queue"""

    # Check if we are at EOS, if so, set the flag
    if out_frame.EOS:
        self.at_eos = True

    # If we have been given a zero length frame, just return it. That means
    # we didn't have data at the time the frame was prepared and we should
    # just go with it.
    if out_frame.offset == out_frame.end_offset:
        return out_frame

    # Otherwise create a TSFrame from all the buffers that we have queued up
    in_frame = TSFrame(buffers=list(self.buffer_queue[pad]))

    # make sure nothing is fishy
    assert out_frame.end_offset <= in_frame.end_offset, (
        f"Output frame end_offset {out_frame.end_offset} extends beyond "
        f"input frame end_offset {in_frame.end_offset}"
    )

    # intersect the TSSource provided output frame with the in_frame
    before, intersection, after = out_frame.intersect(in_frame)

    # Clear the queue
    self.buffer_queue[pad].clear()

    # and repopulate it with only stuff that is newer than what we just sent.
    if after is not None:
        self.buffer_queue[pad].extend(after.buffers)

    # It is possible that the out_frame is before the data we have in the
    # queue, if so the intersection will be None. That's okay, we can just
    # pass along that gap buffer.
    if intersection is None:
        return out_frame

    # make sure to update EOS
    intersection.EOS = out_frame.EOS
    return intersection

setup()

Initialize the TSResourceSource data structures.

Source code in src/sgnts/base/base.py
def setup(self) -> None:
    """Initialize the TSResourceSource data structures."""
    if not self._is_setup:
        self.buffer_queue: dict[SourcePad, deque[SeriesBuffer]] = {
            p: deque() for p in self.rsrcs
        }
        self.latest_buffer_properties = {p: None for p in self.rsrcs}
        self.first_buffer_properties = {p: None for p in self.rsrcs}
        self._is_setup = True

worker_process(context, *args, **kwargs)

Override this method in subclasses to implement data generation.

This method runs in a separate worker (process or thread) and should:
1. Generate data from the external resource
2. Send (pad, buffer) tuples via context.output_queue.put((pad, buf))
3. Check context.should_stop() to know when to exit

Parameters:

context (WorkerContext, required): WorkerContext with access to queues and events
*args (Any, default ()): Automatically extracted instance attributes
**kwargs (Any, default {}): Automatically extracted instance attributes with defaults

Source code in src/sgnts/base/base.py
def worker_process(self, context: WorkerContext, *args: Any, **kwargs: Any) -> None:
    """Override this method in subclasses to implement data generation.

    This method runs in a separate worker (process or thread) and should:
    1. Generate data from the external resource
    2. Send (pad, buffer) tuples via context.output_queue.put((pad, buf))
    3. Check context.should_stop() to know when to exit

    Args:
        context: WorkerContext with access to queues and events
        *args: Automatically extracted instance attributes
        **kwargs: Automatically extracted instance attributes with defaults
    """
    raise NotImplementedError("Subclasses must implement worker_process method")

TSSink dataclass

Bases: TimeSeriesMixin[TSFrame], SinkElement[TimeSpanFrame]


Inheritance diagram: TimeSeriesMixin --> TSSink

A time-series sink element.

Source code in src/sgnts/base/base.py
@dataclass
class TSSink(TimeSeriesMixin[TSFrame], SinkElement[TimeSpanFrame]):
    """A time-series sink element."""

    def internal(self) -> None:
        """Process frames by calling child class implementation.

        If the child class defines a process() method, it will be called with
        input frame dictionaries. Otherwise, child classes should override
        internal() directly.
        """
        super().internal()

        # Check if the element defines a process() method
        if hasattr(self, "process"):
            # Collect all input frames (both TSFrame and EventFrame)
            inframes: dict[SinkPad, TimeSpanFrame] = {}
            inframes.update(self.next_ts_inputs())
            inframes.update(self.next_event_inputs())

            # Call the process method
            self.process(inframes)  # type: ignore[attr-defined]

internal()

Process frames by calling child class implementation.

If the child class defines a process() method, it will be called with input frame dictionaries. Otherwise, child classes should override internal() directly.
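
The optional process() hook can be illustrated with stand-in classes that do not depend on SGN. Here `StandInSink` imitates only the dispatch step of internal(); the pad names and frame values are placeholders, and `RecordingSink` is a hypothetical child class.

```python
class StandInSink:
    """Imitates the hasattr-based dispatch; not the real TSSink."""

    def next_ts_inputs(self) -> dict:
        return {"ts_pad": "ts-frame"}

    def next_event_inputs(self) -> dict:
        return {"event_pad": "event-frame"}

    def internal(self) -> None:
        # Only call process() if the child class chose to define it.
        if hasattr(self, "process"):
            inframes = {}
            inframes.update(self.next_ts_inputs())
            inframes.update(self.next_event_inputs())
            self.process(inframes)


class RecordingSink(StandInSink):
    """Child class that opts in by defining process()."""

    def __init__(self) -> None:
        self.seen = []

    def process(self, inframes: dict) -> None:
        self.seen.append(sorted(inframes))


sink = RecordingSink()
sink.internal()

# A class without process() is simply skipped by the dispatch.
plain_result = StandInSink().internal()
```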

Source code in src/sgnts/base/base.py
def internal(self) -> None:
    """Process frames by calling child class implementation.

    If the child class defines a process() method, it will be called with
    input frame dictionaries. Otherwise, child classes should override
    internal() directly.
    """
    super().internal()

    # Check if the element defines a process() method
    if hasattr(self, "process"):
        # Collect all input frames (both TSFrame and EventFrame)
        inframes: dict[SinkPad, TimeSpanFrame] = {}
        inframes.update(self.next_ts_inputs())
        inframes.update(self.next_event_inputs())

        # Call the process method
        self.process(inframes)  # type: ignore[attr-defined]

TSSource dataclass

Bases: _TSSource


Inheritance diagram: _TSSource --> TSSource

A time-series source that generates data in fixed-size buffers, where the user can specify the start and end times. For a data-driven source, consider using TSResourceSource.

Parameters:

start (float | None, default None): Start time of the first buffer, in seconds.
end (float | None, default None): End time of the last buffer, in seconds.
duration (float | None, default None): Alternative to the end option; the duration of time to be covered, in seconds. Cannot be given if end is given.

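
The interaction between start, end and duration follows the rules in __post_init__ shown in the source below. As a standalone sketch of those rules, the function below returns the resolved values instead of setting attributes; `resolve_span` is an illustrative name, and `INT64_MAX` is written out by hand instead of using numpy.

```python
INT64_MAX = 2**63 - 1  # same value as numpy.iinfo(numpy.int64).max


def resolve_span(start, end=None, duration=None):
    """Apply the end/duration rules and return (start, end, duration)."""
    if end is not None and duration is not None:
        raise ValueError("may specify either end or duration, not both")
    if end is None and duration is None:
        duration = INT64_MAX  # effectively unbounded
    if start is not None and duration is not None:
        end = start + duration
    if start is not None and end is not None:
        assert end > start, "end is before start"
    return start, end, duration


from_duration = resolve_span(10.0, duration=5.0)
from_end = resolve_span(10.0, end=20.0)
# Without a start, end cannot be computed yet and stays None.
deferred = resolve_span(None, duration=5.0)
```

The deferred case is the one TSResourceSource relies on: it fills in start from the first data it receives and only then computes end.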
Source code in src/sgnts/base/base.py
@dataclass
class TSSource(_TSSource):
    """A time-series source that generates data in fixed-size buffers where the
       user can specify the start time and end time. If you want a data driven
       source consider using TSResourceSource.

    Args:
        start:
            float, start time of first buffer, in seconds
        end:
            float, end time of the last buffer, in seconds
        duration:
            float, alternative to end option, specify the duration of
            time to be covered in seconds. Cannot be given if end is given.
    """

    start: float | None = None
    end: float | None = None
    duration: float | None = None

    def __post_init__(self):
        super().__post_init__()

        # Validate mutual exclusivity of end and duration
        if self.end is not None and self.duration is not None:
            raise ValueError("may specify either end or duration, not both")

        # Default to infinite duration if neither end nor duration specified
        if self.end is None and self.duration is None:
            self.duration = numpy.iinfo(numpy.int64).max

        # Calculate end from start + duration when both are available
        if self.start is not None and self.duration is not None:
            self.end = self.start + self.duration

        # Validate end > start when both are available
        if self.start is not None and self.end is not None:
            assert self.end > self.start, "end is before start"

    @property
    def end_offset(self):
        if self.end is None:
            return float("inf")
        return Offset.fromsec(self.end - Offset.offset_ref_start / Time.SECONDS)

    @property
    def start_offset(self):
        if self.start is None:
            return None
        return Offset.fromsec(self.start - Offset.offset_ref_start / Time.SECONDS)

    def set_pad_buffer_params(
        self,
        pad: SourcePad,
        sample_shape: tuple[int, ...],
        rate: int,
    ) -> None:
        """Set variables on the pad that are needed to construct SeriesBuffers.

        These should remain constant throughout the duration of the
        pipeline so this method may only be called once.

        Args:
            pad:
                SourcePad, the pad to setup buffers on
            sample_shape:
                tuple[int, ...], the shape of a single sample of the
                data, or put another way, the shape of the data except
                for the last (time) dimension,
                i.e. sample_shape=data.shape[:-1]
            rate:
                int, the sample rate of the data the pad will produce

        Raises:
            ValueError: If start or end times don't align to sample boundaries
                at the given rate

        """
        # Make sure this has only been called once per pad
        assert (
            pad not in self._new_buffer_dict
        ), f"Pad {pad.name} already exists in _new_buffer_dict - duplicate pad entry"

        # Validate that start and end times align to sample boundaries at this rate
        # This prevents obscure runtime errors during offset-to-sample conversions
        if self.start is not None:
            Offset.validate_time_alignment(self.start, rate, param_name="start")
        if self.end is not None and self.end != numpy.iinfo(numpy.int64).max:
            Offset.validate_time_alignment(self.end, rate, param_name="end")

        self._new_buffer_dict[pad] = {
            "sample_rate": rate,
            "shape": sample_shape + (self.num_samples(rate),),
        }
        self._next_frame_dict[pad] = TSFrame.from_buffer_kwargs(
            offset=self.start_offset, data=None, **self._new_buffer_dict[pad]
        )

set_pad_buffer_params(pad, sample_shape, rate)

Set variables on the pad that are needed to construct SeriesBuffers.

These should remain constant throughout the duration of the pipeline so this method may only be called once.

Parameters:

pad (SourcePad, required): The pad to set up buffers on.
sample_shape (tuple[int, ...], required): The shape of a single sample of the data, or put another way, the shape of the data except for the last (time) dimension, i.e. sample_shape=data.shape[:-1].
rate (int, required): The sample rate of the data the pad will produce.

Raises:

ValueError: If start or end times don't align to sample boundaries at the given rate.
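
Two ideas in this method lend themselves to a small worked example: the buffer shape is the sample shape with the number of time samples appended, and a time aligns to a sample boundary when time multiplied by rate is a whole number. The helpers below are hypothetical; in particular `is_aligned` only illustrates the alignment idea and is not the implementation of Offset.validate_time_alignment, which is not shown on this page.

```python
from fractions import Fraction


def buffer_shape(sample_shape: tuple, num_samples: int) -> tuple:
    """Append the time dimension to the per-sample shape."""
    return sample_shape + (num_samples,)


def is_aligned(t, rate: int) -> bool:
    """True if t seconds falls exactly on a sample boundary at this rate."""
    return (Fraction(t) * rate).denominator == 1


single_channel = buffer_shape((), 2048)    # scalar samples
multi_channel = buffer_shape((3, 2), 2048)  # each sample is a 3x2 array

half_second_ok = is_aligned(0.5, 16)        # 0.5 s * 16 Hz = 8 samples
off_grid = is_aligned(0.03125, 16)          # 1/32 s * 16 Hz = half a sample
```

Exact fractions are used here so that floating-point rounding does not blur the boundary check.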

Source code in src/sgnts/base/base.py
def set_pad_buffer_params(
    self,
    pad: SourcePad,
    sample_shape: tuple[int, ...],
    rate: int,
) -> None:
    """Set variables on the pad that are needed to construct SeriesBuffers.

    These should remain constant throughout the duration of the
    pipeline so this method may only be called once.

    Args:
        pad:
            SourcePad, the pad to setup buffers on
        sample_shape:
            tuple[int, ...], the shape of a single sample of the
            data, or put another way, the shape of the data except
            for the last (time) dimension,
            i.e. sample_shape=data.shape[:-1]
        rate:
            int, the sample rate of the data the pad will produce

    Raises:
        ValueError: If start or end times don't align to sample boundaries
            at the given rate

    """
    # Make sure this has only been called once per pad
    assert (
        pad not in self._new_buffer_dict
    ), f"Pad {pad.name} already exists in _new_buffer_dict - duplicate pad entry"

    # Validate that start and end times align to sample boundaries at this rate
    # This prevents obscure runtime errors during offset-to-sample conversions
    if self.start is not None:
        Offset.validate_time_alignment(self.start, rate, param_name="start")
    if self.end is not None and self.end != numpy.iinfo(numpy.int64).max:
        Offset.validate_time_alignment(self.end, rate, param_name="end")

    self._new_buffer_dict[pad] = {
        "sample_rate": rate,
        "shape": sample_shape + (self.num_samples(rate),),
    }
    self._next_frame_dict[pad] = TSFrame.from_buffer_kwargs(
        offset=self.start_offset, data=None, **self._new_buffer_dict[pad]
    )

TSTransform dataclass

Bases: TimeSeriesMixin[TSFrame], TransformElement[TimeSpanFrame]


Inheritance diagram: TimeSeriesMixin --> TSTransform

A time-series transform element.

Source code in src/sgnts/base/base.py
@dataclass
class TSTransform(TimeSeriesMixin[TSFrame], TransformElement[TimeSpanFrame]):
    """A time-series transform element."""

    def internal(self) -> None:
        """Process frames by calling child class implementation.

        If the child class defines a process() method, it will be called with
        input and output frame dictionaries. Otherwise, child classes should
        override internal() directly.
        """
        super().internal()

        # Check if the element defines a process() method
        if hasattr(self, "process"):
            # Collect all input frames (both TSFrame and EventFrame)
            inframes: dict[SinkPad, TimeSpanFrame] = {}
            inframes.update(self.next_ts_inputs())
            inframes.update(self.next_event_inputs())

            # Collect all output collectors/frames (TSCollectFrame or EventFrame)
            ts_collectors = self.next_ts_outputs()
            outframes: dict[SourcePad, TimeSpanFrame | TSCollectFrame] = {}
            outframes.update(ts_collectors)
            outframes.update(self.next_event_outputs())

            # Call the process method
            self.process(inframes, outframes)  # type: ignore[attr-defined]

            # Close all TS collectors to commit buffers to parent frames
            for collector in ts_collectors.values():
                collector.close()

    def new(self, pad: SourcePad) -> TimeSpanFrame:
        """Return the output frame for the given pad.

        It should take the source pad as an argument and return a new
        TSFrame or EventFrame.

        Args:
            pad:
                SourcePad, The source pad that is producing the transformed frame

        Returns:
            TSFrame or EventFrame, The transformed frame

        """
        frame = self.outframes.get(pad)
        assert frame is not None
        return frame

internal()

Process frames by calling child class implementation.

If the child class defines a process() method, it will be called with input and output frame dictionaries. Otherwise, child classes should override internal() directly.
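
The dispatch described above can be sketched without sgnts installed. This is only an illustration under stated assumptions: `FakeCollector`, `MiniTransform`, and `Doubler` are hypothetical stand-ins rather than sgnts classes, and pads are represented by plain strings.

```python
from dataclasses import dataclass, field


@dataclass
class FakeCollector:
    """Hypothetical stand-in for TSCollectFrame: stages data until close()."""

    staged: list = field(default_factory=list)
    committed: list = field(default_factory=list)
    closed: bool = False

    def append(self, value):
        self.staged.append(value)

    def close(self):
        # Commit staged values, mirroring "commit buffers to parent frames"
        self.committed.extend(self.staged)
        self.staged.clear()
        self.closed = True


class MiniTransform:
    """Hypothetical base class mimicking the process() dispatch in internal()."""

    def __init__(self, inputs):
        self.inputs = inputs  # pad name -> list of samples
        self.collectors = {}

    def internal(self):
        # Only dispatch if the subclass actually defines process()
        if hasattr(self, "process"):
            inframes = dict(self.inputs)
            self.collectors = {"out": FakeCollector()}
            outframes = dict(self.collectors)
            self.process(inframes, outframes)
            # The base class closes the collectors after process() returns
            for collector in self.collectors.values():
                collector.close()


class Doubler(MiniTransform):
    def process(self, inframes, outframes):
        for samples in inframes.values():
            for s in samples:
                outframes["out"].append(2 * s)


element = Doubler({"in": [1, 2, 3]})
element.internal()
result = element.collectors["out"]
print(result.committed, result.closed)  # [2, 4, 6] True
```

In this sketch a subclass that defines `process()` never calls `close()` itself. A subclass that overrides `internal()` directly and obtains collectors on its own would have to close them.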

Source code in src/sgnts/base/base.py
def internal(self) -> None:
    """Process frames by calling child class implementation.

    If the child class defines a process() method, it will be called with
    input and output frame dictionaries. Otherwise, child classes should
    override internal() directly.
    """
    super().internal()

    # Check if the element defines a process() method
    if hasattr(self, "process"):
        # Collect all input frames (both TSFrame and EventFrame)
        inframes: dict[SinkPad, TimeSpanFrame] = {}
        inframes.update(self.next_ts_inputs())
        inframes.update(self.next_event_inputs())

        # Collect all output collectors/frames (TSCollectFrame or EventFrame)
        ts_collectors = self.next_ts_outputs()
        outframes: dict[SourcePad, TimeSpanFrame | TSCollectFrame] = {}
        outframes.update(ts_collectors)
        outframes.update(self.next_event_outputs())

        # Call the process method
        self.process(inframes, outframes)  # type: ignore[attr-defined]

        # Close all TS collectors to commit buffers to parent frames
        for collector in ts_collectors.values():
            collector.close()

new(pad)

Return the output frame for the given pad.

It should take the source pad as an argument and return a new TSFrame or EventFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pad | SourcePad | SourcePad, The source pad that is producing the transformed frame | required |

Returns:

| Type | Description |
| --- | --- |
| TimeSpanFrame | TSFrame or EventFrame, The transformed frame |

Source code in src/sgnts/base/base.py
def new(self, pad: SourcePad) -> TimeSpanFrame:
    """Return the output frame for the given pad.

    It should take the source pad as an argument and return a new
    TSFrame or EventFrame.

    Args:
        pad:
            SourcePad, The source pad that is producing the transformed frame

    Returns:
        TSFrame or EventFrame, The transformed frame

    """
    frame = self.outframes.get(pad)
    assert frame is not None
    return frame

TimeSeriesMixin dataclass

Bases: ElementLike, Generic[TSFrameLike]



Mixin that adds time-series capabilities to any SGN element.

This mixin produces aligned frames in preparedframes. If the adapter is not explicitly disabled, it also triggers the audioadapter to queue data and make padded or strided frames in preparedframes.

This mixin provides:

- Frame alignment across multiple input pads
- Optional adapter processing (overlap/stride/gap handling)
- Timeout detection and EOS handling
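
The overlap/stride handling can be made concrete with a little arithmetic. The sketch below mirrors two expressions from the source listing for this class (the round-up used for aligned offsets and the minimum sample count needed before a frame is emitted). The function names `round_up_to_multiple` and `min_samples_needed` are hypothetical, and plain integers stand in for the values sgnts derives via `Offset.tosamples`.

```python
def round_up_to_multiple(current_offset: int, align_to: int) -> int:
    """Round current_offset up to the nearest multiple of align_to."""
    return ((current_offset + align_to - 1) // align_to) * align_to


def min_samples_needed(overlap_samples, stride_samples, pad_zeros_samples=0):
    """Samples the adapter must hold before it can emit one prepared frame.

    A stride of 0 means "emit whatever is available", so at least one
    sample beyond the overlap is required.
    """
    return sum(overlap_samples) + (stride_samples or 1) - pad_zeros_samples


# Upsampling example from the adapter docstring: overlap (8, 8), stride 16
upsampling = min_samples_needed((8, 8), 16)

# Correlation example from the adapter docstring: overlap (15, 0), stride 8
correlation = min_samples_needed((15, 0), 8)

# With zero padding at startup, the leading overlap is supplied as zeros
correlation_startup = min_samples_needed((15, 0), 8, pad_zeros_samples=15)

print(upsampling, correlation, correlation_startup)  # 32 23 8
print(round_up_to_multiple(1000, 256))  # 1024
print(round_up_to_multiple(1024, 256))  # 1024
```

An offset that already sits on a boundary is left unchanged, which is why the round-up adds `align_to - 1` rather than `align_to` before the floor division.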

Note

Subclasses can customize alignment behavior by setting class-level attributes:

- static_unaligned_sink_pads: Declare which sink pads should not be aligned (e.g., EventFrame pads or auxiliary inputs).
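
As a rough illustration of how the class-level declaration combines with the per-instance `unaligned` argument, the following sketch mimics the partitioning of sink pads into aligned and unaligned groups. `PadPartition` and `WithTriggers` are hypothetical stand-ins, pad names are plain strings, and nothing here imports sgnts.

```python
from typing import ClassVar, Sequence


class PadPartition:
    """Hypothetical stand-in that mimics how the mixin splits sink pads."""

    static_unaligned_sink_pads: ClassVar[list[str]] = []

    def __init__(
        self, sink_pad_names: Sequence[str], unaligned: Sequence[str] = ()
    ):
        # Instance-level names are combined with the class-level declaration
        unaligned_names = list(unaligned) + self.static_unaligned_sink_pads
        self.unaligned = [n for n in sink_pad_names if n in unaligned_names]
        self.aligned = [n for n in sink_pad_names if n not in unaligned_names]


class WithTriggers(PadPartition):
    # Class-level declaration: the "triggers" pad is never aligned
    static_unaligned_sink_pads = ["triggers"]


element = WithTriggers(["h1", "l1", "triggers", "aux"], unaligned=["aux"])
print(element.aligned)    # ['h1', 'l1']
print(element.unaligned)  # ['triggers', 'aux']
```

The class-level list suits pads that are unaligned for every instance of an element, while the `unaligned` argument covers pads that depend on how a particular instance is wired.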

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_age | float | float, the max age before timeout, in seconds | 100.0 |
| adapter_config | AdapterConfig | AdapterConfig, holds parameters used for audioadapter behavior | AdapterConfig() |
| unaligned | Sequence[str] | list[str], the list of unaligned sink pads. | list() |

Source code in src/sgnts/base/base.py
@dataclass
class TimeSeriesMixin(ElementLike, Generic[TSFrameLike]):
    """Mixin that adds time-series capabilities to any SGN element.

    This mixin produces aligned frames in preparedframes. If the adapter
    is not explicitly disabled, it also triggers the audioadapter to queue
    data and make padded or strided frames in preparedframes.

    This mixin provides:
    - Frame alignment across multiple input pads
    - Optional adapter processing (overlap/stride/gap handling)
    - Timeout detection and EOS handling

    Note:
        Subclasses can customize alignment behavior by setting class-level
        attributes:
          - static_unaligned_sink_pads: Declare which sink pads should not be
            aligned (e.g., EventFrame pads or auxiliary inputs).

    Args:
        max_age:
            float, the max age before timeout, in seconds
        adapter_config:
            AdapterConfig, holds parameters used for audioadapter behavior
        unaligned:
            list[str], the list of unaligned sink pads.

    """

    # Class-level attributes for alignment configuration
    static_unaligned_sink_pads: ClassVar[list[str]] = []

    max_age: float = 100.0
    adapter_config: AdapterConfig = field(default_factory=AdapterConfig)
    unaligned: Sequence[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Initialize timeseries state."""
        super().__post_init__()

        # Collect the names of sink pads that are exempt from alignment
        unaligned_names = list(self.unaligned) + self.static_unaligned_sink_pads

        # Convert pad names to SinkPad objects and store in instance variable
        self.unaligned_sink_pads = [
            self.snks[name] for name in unaligned_names  # type: ignore[attr-defined]
        ]
        self.aligned_sink_pads = [
            p for p in self.sink_pads if p not in self.unaligned_sink_pads
        ]

        # Initialize metadata for exempt sink pads
        self.unaligned_data: dict[SinkPad, TimeSpanFrame | None] = {
            p: None for p in self.unaligned_sink_pads
        }

        # Initialize the alignment metadata for all sink pads that need to be aligned
        self._is_aligned = False
        self.inbufs = {p: Audioadapter() for p in self.aligned_sink_pads}
        self.preparedframes: dict[SinkPad, TSFrame | None] = {
            p: None for p in self.aligned_sink_pads
        }
        self.aligned_slices: dict[SinkPad, TSSlices | None] = {
            p: None for p in self.aligned_sink_pads
        }
        self.outframes: dict[SourcePad, TimeSpanFrame | None] = {
            p: None for p in self.source_pads
        }
        self.preparedoutoffsets = {"offset": 0, "noffset": 0}
        self.at_EOS = False
        self._last_ts: dict[SinkPad, float | None] = {
            p: None for p in self.aligned_sink_pads
        }
        self._last_offset: dict[SinkPad, int | None] = {
            p: None for p in self.aligned_sink_pads
        }
        self.metadata: dict[SinkPad, dict[Any, Any]] = {
            p: {} for p in self.aligned_sink_pads
        }

        # Initialize default frame types for inputs and outputs
        # These can be overridden by derived classes in configure()
        self.input_frame_types: dict[str, type[TimeSpanFrame]] = {
            name: TSFrame for name in self.sink_pad_names  # type: ignore[attr-defined]
        }
        # Initialize default output frame types (only for elements with source pads)
        # All pads default to TSFrame, elements can override in configure()
        self.output_frame_types: dict[str, type[TimeSpanFrame]] = {
            name: TSFrame for name in getattr(self, "source_pad_names", [])
        }

        # Configure adapter and element-specific attributes
        self.configure()

        self.is_adapter_enabled = self.adapter_config.is_enabled

        # Initialize adapter-specific state only if adapter is enabled
        self.audioadapters = None
        if self.is_adapter_enabled:
            self.overlap = self.adapter_config.overlap
            self.stride = self.adapter_config.stride
            self.pad_zeros_startup = self.adapter_config.pad_zeros_startup
            self.skip_gaps = self.adapter_config.skip_gaps
            self.offset_shift = self.adapter_config.offset_shift

            # we need audioadapters
            self.audioadapters = {
                p: Audioadapter(backend=self.adapter_config.backend)
                for p in self.aligned_sink_pads
            }
            self.pad_zeros_offset = 0
            if self.pad_zeros_startup is True:
                # at startup, pad zeros in front of the first buffer to
                # serve as history
                self.pad_zeros_offset = self.overlap[0]
        else:
            # No adapter, so no offset shift
            self.offset_shift = 0

        # Call validation hooks
        self.validate()

    def configure(self) -> None:
        """Configure element-specific settings."""
        pass

    def validate(self) -> None:
        """Validate element configuration."""
        pass

    def next_input(self) -> tuple[SinkPad, TSFrame]:
        """Convenience method - get single TSFrame input.

        Equivalent to next_ts_input(). For transforms that only work with TSFrames.

        Returns:
            tuple[SinkPad, TSFrame], the sink pad and its single input frame
        """
        return self.next_ts_input()

    def next_output(self) -> tuple[SourcePad, TSCollectFrame]:
        """Convenience method - get single TSCollectFrame output.

        Equivalent to next_ts_output(). For transforms that only work with TSFrames.

        Note: Elements using this method must call `.close()` on the collector
        when done populating buffers.

        Returns:
            tuple[SourcePad, TSCollectFrame], pad and collector for output frame
        """
        return self.next_ts_output()

    def next_inputs(self) -> dict[SinkPad, TSFrame]:
        """Convenience method - get all TSFrame inputs.

        Equivalent to next_ts_inputs(). For transforms that only work with TSFrames.

        Returns:
            dict[SinkPad, TSFrame], dictionary of input frames
        """
        return self.next_ts_inputs()

    def next_outputs(self) -> dict[SourcePad, TSCollectFrame]:
        """Convenience method - get all TSCollectFrame outputs.

        Equivalent to next_ts_outputs(). For transforms that only work with TSFrames.

        Note: Elements using this method must call `.close()` on each collector
        when done populating buffers.

        Returns:
            dict[SourcePad, TSCollectFrame], dictionary of collectors for output frames
        """
        return self.next_ts_outputs()

    def next_ts_input(self) -> tuple[SinkPad, TSFrame]:
        """Get single TSFrame input.

        Returns:
            tuple[SinkPad, TSFrame], the sink pad and the single TSFrame from inputs

        Raises:
            AssertionError: If there is not exactly one TSFrame input.
        """
        all_ts = self.next_ts_inputs()
        assert (
            len(all_ts) == 1
        ), f"next_ts_input() requires exactly one TSFrame input, got {len(all_ts)}"
        return next(iter(all_ts.items()))

    def next_ts_inputs(self) -> dict[SinkPad, TSFrame]:
        """Get all TSFrame inputs based on input_frame_types configuration.

        Returns:
            dict[SinkPad, TSFrame], mapping of sink pads to TSFrame inputs
        """
        result: dict[SinkPad, TSFrame] = {}
        for pad in self.sink_pads:
            pad_name = self.rsnks[pad]  # type: ignore[attr-defined]
            if self.input_frame_types.get(pad_name, TSFrame) == TSFrame:
                if pad in self.aligned_sink_pads:
                    frame = self.preparedframes[pad]
                    assert frame is not None
                    result[pad] = frame
                else:
                    unaligned_frame = self.unaligned_data[pad]
                    assert isinstance(unaligned_frame, TSFrame), (
                        f"Expected TSFrame on unaligned pad {pad_name}, "
                        f"got {type(unaligned_frame).__name__}"
                    )
                    result[pad] = unaligned_frame
        return result

    def next_event_input(self) -> tuple[SinkPad, EventFrame]:
        """Get single EventFrame input.

        Returns:
            tuple[SinkPad, EventFrame], the sink pad and the single EventFrame
            from inputs

        Raises:
            AssertionError: If there is not exactly one EventFrame input.
        """
        all_events = self.next_event_inputs()
        assert len(all_events) == 1, (
            f"next_event_input() requires exactly one EventFrame input, "
            f"got {len(all_events)}"
        )
        return next(iter(all_events.items()))

    def next_event_inputs(self) -> dict[SinkPad, EventFrame]:
        """Get all EventFrame inputs based on input_frame_types configuration.

        Returns:
            dict[SinkPad, EventFrame], mapping of sink pads to EventFrame inputs
        """
        result: dict[SinkPad, EventFrame] = {}
        for pad in self.sink_pads:
            pad_name = self.rsnks[pad]  # type: ignore[attr-defined]
            if self.input_frame_types.get(pad_name) == EventFrame:
                if pad in self.unaligned_sink_pads:
                    frame = self.unaligned_data[pad]
                else:
                    frame = self.preparedframes[pad]
                assert isinstance(frame, EventFrame), (
                    f"Expected EventFrame on pad {pad_name}, "
                    f"got {type(frame).__name__}"
                )
                result[pad] = frame
        return result

    def next_ts_output(self) -> tuple[SourcePad, TSCollectFrame]:
        """Get single TSCollectFrame for output with offsets from preparedoutoffsets.

        Note: The caller must call `.close()` on the collector when done
        populating buffers.

        Returns:
            tuple[SourcePad, TSCollectFrame], pad and collector for the output
            frame

        Raises:
            AssertionError: If there is not exactly one TS output pad.
        """
        all_ts = self.next_ts_outputs()
        assert (
            len(all_ts) == 1
        ), f"next_ts_output() requires exactly one TS output pad, got {len(all_ts)}"
        return next(iter(all_ts.items()))

    def next_ts_outputs(self) -> dict[SourcePad, TSCollectFrame]:
        """Get all TSCollectFrames for output pads configured as TS outputs.

        Creates TSFrame instances with offset/noffset from preparedoutoffsets,
        then creates TSCollectFrame collectors for atomic buffer population.
        The parent TSFrames are automatically registered in self.outframes.

        Returns:
            dict[SourcePad, TSCollectFrame], mapping of source pads to collectors
        """
        offset = self.preparedoutoffsets["offset"]
        noffset = self.preparedoutoffsets["noffset"]
        at_EOS = any(
            frame.EOS for frame in self.preparedframes.values() if frame is not None
        )

        result: dict[SourcePad, TSCollectFrame] = {}
        for pad in self.source_pads:
            pad_name = self.rsrcs[pad]  # type: ignore[attr-defined]
            if self.output_frame_types.get(pad_name, TSFrame) == TSFrame:
                frame = TSFrame(offset=offset, noffset=noffset, EOS=at_EOS)
                collector = frame.fill()
                result[pad] = collector
                # Automatically register the parent frame in outframes
                self.outframes[pad] = frame
        return result

    def next_event_output(self) -> tuple[SourcePad, EventFrame]:
        """Get single EventFrame for output with offset/noffset from preparedoutoffsets.

        Returns:
            tuple[SourcePad, EventFrame], pad and empty event frame ready to
            be populated

        Raises:
            AssertionError: If there is not exactly one event output pad.
        """
        all_events = self.next_event_outputs()
        assert len(all_events) == 1, (
            f"next_event_output() requires exactly one event output pad, "
            f"got {len(all_events)}"
        )
        return next(iter(all_events.items()))

    def next_event_outputs(self) -> dict[SourcePad, EventFrame]:
        """Get all EventFrames for output pads configured as event outputs.

        Creates EventFrame instances with offset/noffset from preparedoutoffsets
        for all source pads configured to produce EventFrame. The frames are
        automatically registered in self.outframes for return.

        Returns:
            dict[SourcePad, EventFrame], mapping of source pads to empty EventFrames
        """
        offset = self.preparedoutoffsets["offset"]
        noffset = self.preparedoutoffsets["noffset"]
        at_EOS = any(
            frame.EOS for frame in self.preparedframes.values() if frame is not None
        )

        result: dict[SourcePad, EventFrame] = {}
        for pad in self.source_pads:
            pad_name = self.rsrcs[pad]  # type: ignore[attr-defined]
            if self.output_frame_types.get(pad_name) == EventFrame:
                frame = EventFrame(offset=offset, noffset=noffset, EOS=at_EOS)
                result[pad] = frame
                # Automatically register in outframes
                self.outframes[pad] = frame
        return result

    def pull(self, pad: SinkPad, frame: TimeSpanFrame) -> None:
        """Pull data and queue for alignment.

        Pull data from the input pads (source pads of upstream elements) and
        queue data to perform alignment once frames from all pads are pulled.

        Args:
            pad:
                SinkPad, The sink pad that is pulling the frame
            frame:
                TimeSpanFrame, The frame that is pulled to sink pad
        """
        self.at_EOS |= frame.EOS

        # Handle case of a pad that is exempt from alignment
        if pad in self.unaligned_sink_pads:
            # Store most recent data for exempt pads
            self.unaligned_data[pad] = frame
            # TODO maybe add bespoke timeout handling here
            return

        # Handle case of a pad that requires alignment
        # extend and check the buffers
        for buf in frame:
            self.inbufs[pad].push(buf)
        self.metadata[pad] = frame.metadata

        if self.timeout(pad):
            raise ValueError("pad %s has timed out" % pad.name)

    def _compute_aligned_offset(self, current_offset: int, align_to: int) -> int:
        """Round an offset up to the nearest multiple of the alignment boundary.

        Args:
            current_offset: Current offset, in offset units
            align_to: Alignment boundary, in offset units

        Returns:
            The smallest multiple of align_to that is >= current_offset
        """
        return ((current_offset + align_to - 1) // align_to) * align_to

    def __adapter(self, pad: SinkPad, frame: list[SeriesBuffer]) -> list[SeriesBuffer]:
        """Use the audioadapter to handle streaming scenarios.

        This will pad with overlap before and after the target output
        data, and produce fixed-stride frames.

        The frames in self.preparedframes are padded with the requested overlap.
        This method also sets self.preparedoutoffsets, which holds the offset and
        noffset of the output buffer; no output data is allocated here.
        Downstream transforms can use the frames from self.preparedframes directly
        for computation, and then use the offset and noffset in
        self.preparedoutoffsets to construct the output frame.

        If stride is not provided, the audioadapter will push out as many samples as it
        can. If stride is provided, the audioadapter will wait until there are enough
        samples to produce prepared frames.

        Args:
            pad:
                SinkPad, the sink pad on which to prepare adapted frames
            frame:
                list[SeriesBuffer], the buffers of the aligned frame

        Returns:
            list[SeriesBuffer], a list of SeriesBuffers that are adapted according to
            the adapter config

        Examples:
            upsampling:
                kernel length = 17
                need to pad 8 samples before and after
                overlap_samples = (8, 8)
                stride_samples = 16
                                                for output
                preparedframes:     ________................________
                                                stride
                                    pad         samples=16  pad
                                    samples=8               samples=8


            correlation:
                filter length = 16
                need to pad filter_length - 1 samples
                overlap_samples = (15, 0)
                stride_samples = 8
                                                    for output
                preparedframes:     ----------------........
                                                    stride_samples=8
                                    pad
                                    samples=15

        """
        assert self.audioadapters is not None
        a = self.audioadapters[pad]
        buf0 = frame[0]
        sample_rate = buf0.sample_rate
        overlap_samples = tuple(Offset.tosamples(o, sample_rate) for o in self.overlap)
        stride_samples = Offset.tosamples(self.stride, sample_rate)
        pad_zeros_samples = Offset.tosamples(self.pad_zeros_offset, sample_rate)

        # push all buffers in the frame into the audioadapter
        for buf in frame:
            a.push(buf)

        # Check whether we have enough samples to produce a frame
        min_samples = sum(overlap_samples) + (stride_samples or 1) - pad_zeros_samples

        # figure out the offset for preparedframes and preparedoutoffsets
        offset = a.offset - self.pad_zeros_offset
        outoffset = offset + self.overlap[0]

        # Determine if we're using alignment mode
        use_alignment = (
            self.is_adapter_enabled and self.adapter_config.align_to is not None
        )

        # Apply alignment if configured
        if use_alignment:
            assert self.adapter_config.align_to is not None
            outoffset = self._compute_aligned_offset(
                outoffset,
                self.adapter_config.align_to,
            )

        preparedbufs = []

        # Check if we have enough data
        if use_alignment:
            # For aligned mode, check if we have data up to aligned_offset + stride
            aligned_end = outoffset + self.stride
            has_enough_data = a.end_offset >= aligned_end
        else:
            # Original check based on size
            has_enough_data = a.size >= min_samples

        if not has_enough_data:
            # not enough samples to produce output yet
            # make a heartbeat buffer
            shape = buf0.shape[:-1] + (0,)
            preparedbufs.append(
                SeriesBuffer(
                    offset=offset, sample_rate=sample_rate, data=None, shape=shape
                )
            )
            # prepare output frames, one buffer per frame
            self.preparedoutoffsets = {
                "offset": outoffset + self.offset_shift,
                "noffset": 0,
            }
        else:
            # We have enough samples, retrieve data
            if use_alignment:
                # Retrieve data at exact aligned offset
                aligned_end = outoffset + self.stride
                stride_samples_actual = Offset.tosamples(self.stride, sample_rate)

                # Check for gaps in the aligned segment
                segment_has_gap, segment_has_nongap = a.segment_gaps_info(
                    (outoffset, aligned_end)
                )

                if not segment_has_nongap or (self.skip_gaps and segment_has_gap):
                    # Gap in aligned segment
                    data = None
                else:
                    # Retrieve data at the aligned offset using offset-based slicing
                    data = a.copy_samples_by_offset_segment(
                        (outoffset, aligned_end), pad_start=False
                    )

                # Create output buffer at aligned offset (no padding needed if aligned)
                shape = buf0.shape[:-1] + (
                    stride_samples_actual if data is not None else 0,
                )
                pbuf = SeriesBuffer(
                    offset=outoffset,  # Use aligned offset
                    sample_rate=sample_rate,
                    data=data,
                    shape=shape,
                )
                preparedbufs.append(pbuf)

                # Flush data up to the END of the aligned segment (not the start)
                # This ensures next iteration starts after this segment
                a.flush_samples_by_end_offset(aligned_end)

                # Output offset metadata
                outnoffset = self.stride
                self.preparedoutoffsets = {
                    "offset": outoffset + self.offset_shift,
                    "noffset": outnoffset,
                }

                # No padding offset adjustment needed for aligned mode
                self.pad_zeros_offset = 0

            else:
                # copy all of the samples in the audioadapter
                if self.stride == 0:
                    # provide all the data
                    num_copy_samples = a.size
                else:
                    num_copy_samples = min_samples

                segment_has_gap, segment_has_nongap = a.segment_gaps_info(
                    (
                        a.offset,
                        a.offset + Offset.fromsamples(num_copy_samples, a.sample_rate),
                    )
                )

                # Check if we should preserve buffer boundaries (align_buffers mode)
                if self.adapter_config.align_buffers:
                    # Return sliced buffers without merging, preserving gaps
                    end_offset = a.offset + Offset.fromsamples(
                        num_copy_samples, a.sample_rate
                    )
                    preparedbufs = list(
                        a.get_sliced_buffers((a.offset, end_offset), pad_start=False)
                    )
                    outnoffset = end_offset - a.offset
                    self.preparedoutoffsets = {
                        "offset": a.offset + self.offset_shift,
                        "noffset": outnoffset,
                    }

                    # flush out samples from head of audioadapter
                    num_flush_samples = num_copy_samples - sum(overlap_samples)
                    if num_flush_samples > 0:
                        a.flush_samples(num_flush_samples)

                elif not segment_has_nongap or (self.skip_gaps and segment_has_gap):
                    # produce a gap buffer if
                    # 1. the whole segment is a gap or
                    # 2. there are gaps in the segment and we are skipping gaps
                    data = None

                    # flush out samples from head of audioadapter
                    num_flush_samples = num_copy_samples - sum(overlap_samples)
                    if num_flush_samples > 0:
                        a.flush_samples(num_flush_samples)

                    shape = buf0.shape[:-1] + (num_copy_samples + pad_zeros_samples,)

                    # update next zeros padding
                    self.pad_zeros_offset = -min(
                        0, Offset.fromsamples(num_flush_samples, sample_rate)
                    )
                    pbuf = SeriesBuffer(
                        offset=offset, sample_rate=sample_rate, data=data, shape=shape
                    )
                    preparedbufs.append(pbuf)
                    outnoffset = pbuf.noffset - sum(self.overlap)
                    self.preparedoutoffsets = {
                        "offset": outoffset + self.offset_shift,
                        "noffset": outnoffset,
                    }

                else:
                    # copy out samples from head of audioadapter
                    data = a.copy_samples(num_copy_samples)
                    if self.pad_zeros_offset > 0:
                        # pad zeros in front of buffer
                        data = self.adapter_config.backend.pad(
                            data, (pad_zeros_samples, 0)
                        )

                    # flush out samples from head of audioadapter
                    num_flush_samples = num_copy_samples - sum(overlap_samples)
                    if num_flush_samples > 0:
                        a.flush_samples(num_flush_samples)

                    shape = buf0.shape[:-1] + (num_copy_samples + pad_zeros_samples,)

                    # update next zeros padding
                    self.pad_zeros_offset = -min(
                        0, Offset.fromsamples(num_flush_samples, sample_rate)
                    )
                    pbuf = SeriesBuffer(
                        offset=offset, sample_rate=sample_rate, data=data, shape=shape
                    )
                    preparedbufs.append(pbuf)
                    outnoffset = pbuf.noffset - sum(self.overlap)
                    self.preparedoutoffsets = {
                        "offset": outoffset + self.offset_shift,
                        "noffset": outnoffset,
                    }

        return preparedbufs

    def internal(self) -> None:
        """Align buffers from all the sink pads.

        If AdapterConfig is provided, perform the requested
        overlap/stride streaming of frames.
        """
        # align if possible
        self._align()

        # put in heartbeat buffer if not aligned
        if not self._is_aligned:
            for sink_pad in self.aligned_sink_pads:
                self.preparedframes[sink_pad] = TSFrame(
                    EOS=self.at_EOS,
                    buffers=[
                        SeriesBuffer(
                            offset=self.earliest,
                            sample_rate=self.inbufs[sink_pad].sample_rate,
                            data=None,
                            shape=self.inbufs[sink_pad].buffers[0].shape[:-1] + (0,),
                        ),
                    ],
                    metadata=self.metadata[sink_pad],
                )
            # Set preparedoutoffsets for heartbeat (zero-length output)
            self.preparedoutoffsets = {
                "offset": self.earliest + self.offset_shift,
                "noffset": 0,
            }
        # Else pack all the buffers
        else:
            min_latest = self.min_latest
            earliest = self.earliest

            rates = set(
                self.inbufs[sink_pad].sample_rate for sink_pad in self.aligned_sink_pads
            )
            off = min_latest - earliest
            for rate in rates:
                factor = Offset.MAX_RATE // rate
                if off % factor:
                    off = off // factor * factor
                    min_latest = earliest + off

            for sink_pad in self.aligned_sink_pads:
                out = list(
                    self.inbufs[sink_pad].get_sliced_buffers(
                        (earliest, min_latest), pad_start=True
                    )
                )
                if min_latest > self.inbufs[sink_pad].offset:
                    self.inbufs[sink_pad].flush_samples_by_end_offset(min_latest)
                assert (
                    len(out) > 0
                ), "No buffers returned from get_sliced_buffers for aligned processing"

                # Apply adapter processing only if adapter is enabled
                if self.is_adapter_enabled:
                    out = self.__adapter(sink_pad, out)

                self.preparedframes[sink_pad] = TSFrame(
                    EOS=self.at_EOS,
                    buffers=out,
                    metadata=self.metadata[sink_pad],
                )

            # Apply buffer alignment if requested
            if self.adapter_config.align_buffers:
                computed_slices = self._compute_aligned_slices()
                for pad, slices in computed_slices.items():
                    self.aligned_slices[pad] = slices

                for pad in self.aligned_sink_pads:
                    aligned_slice = self.aligned_slices[pad]
                    assert aligned_slice is not None
                    frame = self.preparedframes[pad]
                    assert frame is not None
                    # Only align if there are slices (skip if all gaps)
                    if aligned_slice.slices:
                        self.preparedframes[pad] = frame.align(aligned_slice)

            # Set preparedoutoffsets for non-adapter case
            if not self.is_adapter_enabled:
                self.preparedoutoffsets = {
                    "offset": earliest + self.offset_shift,
                    "noffset": min_latest - earliest,
                }

    def _compute_aligned_slices(self) -> dict[SinkPad, TSSlices]:
        """Compute aligned slices for all pads based on minimum sampling rate.

        Extracts slices from prepared frames, finds the minimum
        sampling rate across all pads, and aligns all slices to that rate.
        """
        # Find minimum sampling rate across all aligned pads
        nongap_slices: dict[SinkPad, TSSlices] = {}
        sample_rates = []
        for pad in self.aligned_sink_pads:
            frame = self.preparedframes[pad]
            assert frame is not None
            sample_rates.append(frame.sample_rate)
        min_rate = min(sample_rates)

        # For each pad, extract slices corresponding to non-gaps and align to
        # minimum rate
        for pad in self.aligned_sink_pads:
            frame = self.preparedframes[pad]
            assert frame is not None
            # Extract non-gap buffer slices from the prepared frame
            buffer_slices = [buf.slice for buf in frame.buffers if not buf.is_gap]

            if not buffer_slices:
                # No non-gap buffers, no slices to align
                nongap_slices[pad] = TSSlices([])
                continue

            # align slices to minimum rate
            slices = TSSlices(buffer_slices)
            aligned = slices.align_to_rate(min_rate)
            nongap_slices[pad] = aligned

        all_nongap_slices = TSSlices.intersection_of_multiple(
            list(nongap_slices.values())
        )
        start = self.preparedoutoffsets["offset"]
        end = start + self.preparedoutoffsets["noffset"]
        boundaries = sorted(
            set(
                [
                    start,
                    *list(chain(*[[s.start, s.stop] for s in all_nongap_slices])),
                    end,
                ]
            )
        )
        slice_boundaries = TSSlices(
            [
                TSSlice(b_start, b_stop)
                for b_start, b_stop in zip(boundaries[:-1], boundaries[1:])
            ]
        )

        return {pad: slice_boundaries for pad in self.aligned_sink_pads}

    def _align(self) -> None:
        """Align the buffers in self.inbufs."""

        def slice_from_pad(inbufs):
            if len(inbufs) > 0:
                return TSSlice(inbufs.offset, inbufs.end_offset)
            else:
                return TSSlice(-1, -1)

        def can_align():
            return TSSlices(
                [slice_from_pad(self.inbufs[p]) for p in self.inbufs]
            ).intersection()

        if not self._is_aligned and can_align():
            self._is_aligned = True

    def timeout(self, pad: SinkPad) -> bool:
        """Whether the pad has timed out because its queued data spans more than max_age.

        Args:
            pad:
                SinkPad, the sink pad to check for timeout

        Returns:
            True if the pad has timed out

        """
        return self.inbufs[pad].end_offset - self.inbufs[pad].offset > Offset.fromsec(
            self.max_age
        )

    def latest_by_pad(self, pad: SinkPad) -> int:
        """The latest offset among the queued up buffers in this pad.

        Args:
            pad:
                SinkPad, the requested sink pad

        Returns:
            int, the latest offset in the pad's buffer queue

        """
        return self.inbufs[pad].end_offset if self.inbufs[pad] else -1

    def earliest_by_pad(self, pad: SinkPad) -> int:
        """The earliest offset among the queued up buffers in this pad.

        Args:
            pad:
                SinkPad, the requested sink pad

        Returns:
            int, the earliest offset in the pad's buffer queue

        """
        return self.inbufs[pad].offset if self.inbufs[pad] else -1

    @property
    def latest(self) -> int:
        """The latest offset among all the buffers from all the pads."""
        return max(self.latest_by_pad(n) for n in self.inbufs)

    @property
    def earliest(self) -> int:
        """The earliest offset among all the buffers from all the pads."""
        return min(self.earliest_by_pad(n) for n in self.inbufs)

    @property
    def min_latest(self) -> int:
        """The smallest of the per-pad latest offsets."""
        return min(self.latest_by_pad(n) for n in self.inbufs)

    @property
    def is_aligned(self) -> bool:
        """Check if input frames are currently aligned across all pads.

        Returns:
            True if frames from all input pads have overlapping time ranges
            and can be processed together. False if waiting for more data.
        """
        return self._is_aligned

earliest property

The earliest offset among all the buffers from all the pads.

is_aligned property

Check if input frames are currently aligned across all pads.

Returns:

Type Description
bool

True if frames from all input pads have overlapping time ranges and can be processed together. False if waiting for more data.
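
As a rough illustration of what "overlapping time ranges" means here, the sketch below checks whether a set of half-open offset ranges share a common span. The helper name `ranges_intersect` is hypothetical, and the exact semantics of `TSSlices.intersection()` are an assumption; the `(-1, -1)` range mirrors the placeholder the source uses for an empty pad.

```python
def ranges_intersect(ranges):
    """Return True if all half-open [start, stop) ranges share a common span.

    Hypothetical helper; assumed to approximate TSSlices(...).intersection().
    """
    start = max(r[0] for r in ranges)
    stop = min(r[1] for r in ranges)
    return start < stop


# Two pads whose queued data overlap in [150, 350)
print(ranges_intersect([(100, 400), (150, 350)]))  # True

# Disjoint ranges: nothing to process together yet
print(ranges_intersect([(100, 200), (300, 400)]))  # False

# One pad is still empty and is represented by (-1, -1)
print(ranges_intersect([(100, 400), (-1, -1)]))  # False
```

Note that in the source, `_is_aligned` is only ever switched from False to True, so once the pads have overlapped the element stays in the aligned state.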

latest property

The latest offset among all the buffers from all the pads.

min_latest property

The smallest of the per-pad latest offsets.
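
The three offset properties can be pictured with plain tuples. This is a minimal sketch, assuming each pad's queue is summarized by a hypothetical `(offset, end_offset)` pair; the pad names and numbers are made up for illustration.

```python
# Hypothetical per-pad (offset, end_offset) ranges; None would mean an empty pad.
pad_ranges = {
    "H1": (100, 400),
    "L1": (150, 350),
    "V1": (120, 500),
}


def earliest_by_pad(r):
    # Mirrors the -1 sentinel used for empty pads
    return r[0] if r else -1


def latest_by_pad(r):
    return r[1] if r else -1


earliest = min(earliest_by_pad(r) for r in pad_ranges.values())
latest = max(latest_by_pad(r) for r in pad_ranges.values())
min_latest = min(latest_by_pad(r) for r in pad_ranges.values())

print(earliest, latest, min_latest)  # 100 500 350
```

In the aligned branch of `internal()`, the span `(earliest, min_latest)` is what gets sliced from every pad, which is `(100, 350)` in this example.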

__adapter(pad, frame)

Use the audioadapter to handle streaming scenarios.

This will pad with overlap before and after the target output data, and produce fixed-stride frames.

The frames in self.preparedframes are padded with the requested overlap. This method also sets self.preparedoutoffsets, which holds the offset and noffset metadata for the output buffer, whose data is left as None. Downstream transforms can compute directly on the frames in self.preparedframes, then use the offset and noffset in self.preparedoutoffsets to construct the output frame.

If stride is not provided, the audioadapter will push out as many samples as it can. If stride is provided, the audioadapter will wait until there are enough samples to produce prepared frames.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, the sink pad on which to prepare adapted frames

required
frame list[SeriesBuffer]

list[SeriesBuffer], the buffers of the aligned frame

required

Returns:

Type Description
list[SeriesBuffer]

list[SeriesBuffer], a list of SeriesBuffers that are adapted according to the adapter config

Examples:

upsampling:
    kernel length = 17
    need to pad 8 samples before and after
    overlap_samples = (8, 8)
    stride_samples = 16
                                    for output
    preparedframes:     ________................________
                                    stride
                        pad         samples=16  pad
                        samples=8               samples=8

correlation:
    filter length = 16
    need to pad filter_length - 1 samples
    overlap_samples = (15, 0)
    stride_samples = 8
                                        for output
    preparedframes:     ----------------........
                                        stride_samples=8
                        pad
                        samples=15
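
The correlation example can be traced with a plain Python list standing in for the audioadapter. This is a simplified sketch, not the library's implementation: `stride_frames` is a hypothetical helper, it works in samples rather than offsets, it ignores gaps and startup zero padding, and it loops until the data runs out, whereas `__adapter` produces at most one frame per call.

```python
def stride_frames(samples, overlap, stride):
    """Return fixed-length frames of overlap[0] + stride + overlap[1] samples."""
    frame_len = sum(overlap) + stride
    buf = list(samples)
    frames = []
    while len(buf) >= frame_len:
        frames.append(buf[:frame_len])
        # Flush only `stride` samples so the overlap stays behind as history
        del buf[:stride]
    return frames


frames = stride_frames(range(40), overlap=(15, 0), stride=8)

print(len(frames))               # 3
print([f[0] for f in frames])    # [0, 8, 16]
print([len(f) for f in frames])  # [23, 23, 23]
```

Each frame carries 15 samples of history followed by 8 new samples, so consecutive frames start 8 samples apart.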

Source code in src/sgnts/base/base.py
def __adapter(self, pad: SinkPad, frame: list[SeriesBuffer]) -> list[SeriesBuffer]:
    """Use the audioadapter to handle streaming scenarios.

    This will pad with overlap before and after the target output
    data, and produce fixed-stride frames.

    The frames in self.preparedframes are padded with the requested overlap. This
    method also sets self.preparedoutoffsets, which holds the offset and noffset
    metadata for the output buffer, whose data is left as None.
    Downstream transforms can compute directly on the frames in
    self.preparedframes, then use the offset and noffset in
    self.preparedoutoffsets to construct the output frame.

    If stride is not provided, the audioadapter will push out as many samples as it
    can. If stride is provided, the audioadapter will wait until there are enough
    samples to produce prepared frames.

    Args:
        pad:
            SinkPad, the sink pad on which to prepare adapted frames
        frame:
            list[SeriesBuffer], the buffers of the aligned frame

    Returns:
        list[SeriesBuffer], a list of SeriesBuffers that are adapted according to
        the adapter config

    Examples:
        upsampling:
            kernel length = 17
            need to pad 8 samples before and after
            overlap_samples = (8, 8)
            stride_samples = 16
                                            for output
            preparedframes:     ________................________
                                            stride
                                pad         samples=16  pad
                                samples=8               samples=8


        correlation:
            filter length = 16
            need to pad filter_length - 1 samples
            overlap_samples = (15, 0)
            stride_samples = 8
                                                for output
            preparedframes:     ----------------........
                                                stride_samples=8
                                pad
                                samples=15

    """
    assert self.audioadapters is not None
    a = self.audioadapters[pad]
    buf0 = frame[0]
    sample_rate = buf0.sample_rate
    overlap_samples = tuple(Offset.tosamples(o, sample_rate) for o in self.overlap)
    stride_samples = Offset.tosamples(self.stride, sample_rate)
    pad_zeros_samples = Offset.tosamples(self.pad_zeros_offset, sample_rate)

    # push all buffers in the frame into the audioadapter
    for buf in frame:
        a.push(buf)

    # Check whether we have enough samples to produce a frame
    min_samples = sum(overlap_samples) + (stride_samples or 1) - pad_zeros_samples

    # figure out the offset for preparedframes and preparedoutoffsets
    offset = a.offset - self.pad_zeros_offset
    outoffset = offset + self.overlap[0]

    # Determine if we're using alignment mode
    use_alignment = (
        self.is_adapter_enabled and self.adapter_config.align_to is not None
    )

    # Apply alignment if configured
    if use_alignment:
        assert self.adapter_config.align_to is not None
        outoffset = self._compute_aligned_offset(
            outoffset,
            self.adapter_config.align_to,
        )

    preparedbufs = []

    # Check if we have enough data
    if use_alignment:
        # For aligned mode, check if we have data up to aligned_offset + stride
        aligned_end = outoffset + self.stride
        has_enough_data = a.end_offset >= aligned_end
    else:
        # Original check based on size
        has_enough_data = a.size >= min_samples

    if not has_enough_data:
        # not enough samples to produce output yet
        # make a heartbeat buffer
        shape = buf0.shape[:-1] + (0,)
        preparedbufs.append(
            SeriesBuffer(
                offset=offset, sample_rate=sample_rate, data=None, shape=shape
            )
        )
        # record zero-length output offsets for the heartbeat
        self.preparedoutoffsets = {
            "offset": outoffset + self.offset_shift,
            "noffset": 0,
        }
    else:
        # We have enough samples, retrieve data
        if use_alignment:
            # Retrieve data at exact aligned offset
            aligned_end = outoffset + self.stride
            stride_samples_actual = Offset.tosamples(self.stride, sample_rate)

            # Check for gaps in the aligned segment
            segment_has_gap, segment_has_nongap = a.segment_gaps_info(
                (outoffset, aligned_end)
            )

            if not segment_has_nongap or (self.skip_gaps and segment_has_gap):
                # Gap in aligned segment
                data = None
            else:
                # Retrieve data at the aligned offset using offset-based slicing
                data = a.copy_samples_by_offset_segment(
                    (outoffset, aligned_end), pad_start=False
                )

            # Create output buffer at aligned offset (no padding needed if aligned)
            shape = buf0.shape[:-1] + (
                stride_samples_actual if data is not None else 0,
            )
            pbuf = SeriesBuffer(
                offset=outoffset,  # Use aligned offset
                sample_rate=sample_rate,
                data=data,
                shape=shape,
            )
            preparedbufs.append(pbuf)

            # Flush data up to the END of the aligned segment (not the start)
            # This ensures next iteration starts after this segment
            a.flush_samples_by_end_offset(aligned_end)

            # Output offset metadata
            outnoffset = self.stride
            self.preparedoutoffsets = {
                "offset": outoffset + self.offset_shift,
                "noffset": outnoffset,
            }

            # No padding offset adjustment needed for aligned mode
            self.pad_zeros_offset = 0

        else:
            # copy all of the samples in the audioadapter
            if self.stride == 0:
                # provide all the data
                num_copy_samples = a.size
            else:
                num_copy_samples = min_samples

            segment_has_gap, segment_has_nongap = a.segment_gaps_info(
                (
                    a.offset,
                    a.offset + Offset.fromsamples(num_copy_samples, a.sample_rate),
                )
            )

            # Check if we should preserve buffer boundaries (align_buffers mode)
            if self.adapter_config.align_buffers:
                # Return sliced buffers without merging, preserving gaps
                end_offset = a.offset + Offset.fromsamples(
                    num_copy_samples, a.sample_rate
                )
                preparedbufs = list(
                    a.get_sliced_buffers((a.offset, end_offset), pad_start=False)
                )
                outnoffset = end_offset - a.offset
                self.preparedoutoffsets = {
                    "offset": a.offset + self.offset_shift,
                    "noffset": outnoffset,
                }

                # flush out samples from head of audioadapter
                num_flush_samples = num_copy_samples - sum(overlap_samples)
                if num_flush_samples > 0:
                    a.flush_samples(num_flush_samples)

            elif not segment_has_nongap or (self.skip_gaps and segment_has_gap):
                # produce a gap buffer if
                # 1. the whole segment is a gap or
                # 2. there are gaps in the segment and we are skipping gaps
                data = None

                # flush out samples from head of audioadapter
                num_flush_samples = num_copy_samples - sum(overlap_samples)
                if num_flush_samples > 0:
                    a.flush_samples(num_flush_samples)

                shape = buf0.shape[:-1] + (num_copy_samples + pad_zeros_samples,)

                # update next zeros padding
                self.pad_zeros_offset = -min(
                    0, Offset.fromsamples(num_flush_samples, sample_rate)
                )
                pbuf = SeriesBuffer(
                    offset=offset, sample_rate=sample_rate, data=data, shape=shape
                )
                preparedbufs.append(pbuf)
                outnoffset = pbuf.noffset - sum(self.overlap)
                self.preparedoutoffsets = {
                    "offset": outoffset + self.offset_shift,
                    "noffset": outnoffset,
                }

            else:
                # copy out samples from head of audioadapter
                data = a.copy_samples(num_copy_samples)
                if self.pad_zeros_offset > 0:
                    # pad zeros in front of buffer
                    data = self.adapter_config.backend.pad(
                        data, (pad_zeros_samples, 0)
                    )

                # flush out samples from head of audioadapter
                num_flush_samples = num_copy_samples - sum(overlap_samples)
                if num_flush_samples > 0:
                    a.flush_samples(num_flush_samples)

                shape = buf0.shape[:-1] + (num_copy_samples + pad_zeros_samples,)

                # update next zeros padding
                self.pad_zeros_offset = -min(
                    0, Offset.fromsamples(num_flush_samples, sample_rate)
                )
                pbuf = SeriesBuffer(
                    offset=offset, sample_rate=sample_rate, data=data, shape=shape
                )
                preparedbufs.append(pbuf)
                outnoffset = pbuf.noffset - sum(self.overlap)
                self.preparedoutoffsets = {
                    "offset": outoffset + self.offset_shift,
                    "noffset": outnoffset,
                }

    return preparedbufs
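
The startup zero padding in the non-aligned, fixed-stride branch is easier to follow with numbers. The sketch below traces only the bookkeeping (in samples, assuming `pad_zeros_startup=True`, a nonzero stride, and no gaps); `startup_steps` is a hypothetical helper and no data is moved.

```python
def startup_steps(overlap, stride, n_steps):
    """Trace (pad_zeros, num_copy, num_flushed) for each produced frame."""
    pad_zeros = overlap[0]  # zeros standing in for missing history at startup
    steps = []
    for _ in range(n_steps):
        num_copy = sum(overlap) + stride - pad_zeros  # min_samples in the source
        num_flush = num_copy - sum(overlap)
        # A non-positive num_flush means nothing is flushed yet
        steps.append((pad_zeros, num_copy, max(num_flush, 0)))
        # Remaining zeros to pad in front of the next frame
        pad_zeros = -min(0, num_flush)
    return steps


steps = startup_steps(overlap=(15, 0), stride=8, n_steps=3)
print(steps)  # [(15, 8, 0), (7, 16, 1), (0, 23, 8)]
```

In every step `pad_zeros + num_copy` equals 23, so each frame has the same length while the zero padding is gradually replaced by real history.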

__post_init__()

Initialize timeseries state.

Source code in src/sgnts/base/base.py
def __post_init__(self) -> None:
    """Initialize timeseries state."""
    super().__post_init__()

    # Determine which user-provided input pads are exempt from alignment
    unaligned_names = list(self.unaligned) + self.static_unaligned_sink_pads

    # Convert pad names to SinkPad objects and store in instance variable
    self.unaligned_sink_pads = [
        self.snks[name] for name in unaligned_names  # type: ignore[attr-defined]
    ]
    self.aligned_sink_pads = [
        p for p in self.sink_pads if p not in self.unaligned_sink_pads
    ]

    # Initialize metadata for exempt sink pads
    self.unaligned_data: dict[SinkPad, TimeSpanFrame | None] = {
        p: None for p in self.unaligned_sink_pads
    }

    # Initialize the alignment metadata for all sink pads that need to be aligned
    self._is_aligned = False
    self.inbufs = {p: Audioadapter() for p in self.aligned_sink_pads}
    self.preparedframes: dict[SinkPad, TSFrame | None] = {
        p: None for p in self.aligned_sink_pads
    }
    self.aligned_slices: dict[SinkPad, TSSlices | None] = {
        p: None for p in self.aligned_sink_pads
    }
    self.outframes: dict[SourcePad, TimeSpanFrame | None] = {
        p: None for p in self.source_pads
    }
    self.preparedoutoffsets = {"offset": 0, "noffset": 0}
    self.at_EOS = False
    self._last_ts: dict[SinkPad, float | None] = {
        p: None for p in self.aligned_sink_pads
    }
    self._last_offset: dict[SinkPad, int | None] = {
        p: None for p in self.aligned_sink_pads
    }
    self.metadata: dict[SinkPad, dict[Any, Any]] = {
        p: {} for p in self.aligned_sink_pads
    }

    # Initialize default frame types for inputs and outputs
    # These can be overridden by derived classes in configure()
    self.input_frame_types: dict[str, type[TimeSpanFrame]] = {
        name: TSFrame for name in self.sink_pad_names  # type: ignore[attr-defined]
    }
    # Initialize default output frame types (only for elements with source pads)
    # All pads default to TSFrame, elements can override in configure()
    self.output_frame_types: dict[str, type[TimeSpanFrame]] = {
        name: TSFrame for name in getattr(self, "source_pad_names", [])
    }

    # Configure adapter and element-specific attributes
    self.configure()

    self.is_adapter_enabled = self.adapter_config.is_enabled

    # Initialize adapter-specific state only if adapter is enabled
    self.audioadapters = None
    if self.is_adapter_enabled:
        self.overlap = self.adapter_config.overlap
        self.stride = self.adapter_config.stride
        self.pad_zeros_startup = self.adapter_config.pad_zeros_startup
        self.skip_gaps = self.adapter_config.skip_gaps
        self.offset_shift = self.adapter_config.offset_shift

        # we need audioadapters
        self.audioadapters = {
            p: Audioadapter(backend=self.adapter_config.backend)
            for p in self.aligned_sink_pads
        }
        self.pad_zeros_offset = 0
        if self.pad_zeros_startup is True:
            # at startup, pad zeros in front of the first buffer to
            # serve as history
            self.pad_zeros_offset = self.overlap[0]
    else:
        # No adapter, so no offset shift
        self.offset_shift = 0

    # Call validation hooks
    self.validate()
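
The pad bookkeeping at the top of `__post_init__` amounts to partitioning the sink pads. A minimal sketch, using strings as stand-ins for `SinkPad` objects and made-up pad names:

```python
# Hypothetical pad names; in the source these are SinkPad objects
sink_pad_names = ["H1", "L1", "state"]
unaligned = ["state"]     # user-provided exemptions
static_unaligned = []     # exemptions declared by the element class

unaligned_names = list(unaligned) + static_unaligned
unaligned_pads = [n for n in sink_pad_names if n in unaligned_names]
aligned_pads = [n for n in sink_pad_names if n not in unaligned_pads]

# Per-pad state is only created for pads that take part in alignment
preparedframes = {p: None for p in aligned_pads}

print(aligned_pads)    # ['H1', 'L1']
print(unaligned_pads)  # ['state']
```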

configure()

Configure element-specific settings.

Source code in src/sgnts/base/base.py
def configure(self) -> None:
    """Configure element-specific settings."""
    pass

earliest_by_pad(pad)

The earliest offset among the queued up buffers in this pad.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, the requested sink pad

required

Returns:

Type Description
int

int, the earliest offset in the pad's buffer queue

Source code in src/sgnts/base/base.py
def earliest_by_pad(self, pad: SinkPad) -> int:
    """The earliest offset among the queued up buffers in this pad.

    Args:
        pad:
            SinkPad, the requested sink pad

    Returns:
        int, the earliest offset in the pad's buffer queue

    """
    return self.inbufs[pad].offset if self.inbufs[pad] else -1

internal()

Align buffers from all the sink pads.

If AdapterConfig is provided, perform the requested overlap/stride streaming of frames.

Source code in src/sgnts/base/base.py
def internal(self) -> None:
    """Align buffers from all the sink pads.

    If AdapterConfig is provided, perform the requested
    overlap/stride streaming of frames.
    """
    # align if possible
    self._align()

    # put in heartbeat buffer if not aligned
    if not self._is_aligned:
        for sink_pad in self.aligned_sink_pads:
            self.preparedframes[sink_pad] = TSFrame(
                EOS=self.at_EOS,
                buffers=[
                    SeriesBuffer(
                        offset=self.earliest,
                        sample_rate=self.inbufs[sink_pad].sample_rate,
                        data=None,
                        shape=self.inbufs[sink_pad].buffers[0].shape[:-1] + (0,),
                    ),
                ],
                metadata=self.metadata[sink_pad],
            )
        # Set preparedoutoffsets for heartbeat (zero-length output)
        self.preparedoutoffsets = {
            "offset": self.earliest + self.offset_shift,
            "noffset": 0,
        }
    # Else pack all the buffers
    else:
        min_latest = self.min_latest
        earliest = self.earliest

        rates = set(
            self.inbufs[sink_pad].sample_rate for sink_pad in self.aligned_sink_pads
        )
        off = min_latest - earliest
        for rate in rates:
            factor = Offset.MAX_RATE // rate
            if off % factor:
                off = off // factor * factor
                min_latest = earliest + off

        for sink_pad in self.aligned_sink_pads:
            out = list(
                self.inbufs[sink_pad].get_sliced_buffers(
                    (earliest, min_latest), pad_start=True
                )
            )
            if min_latest > self.inbufs[sink_pad].offset:
                self.inbufs[sink_pad].flush_samples_by_end_offset(min_latest)
            assert (
                len(out) > 0
            ), "No buffers returned from get_sliced_buffers for aligned processing"

            # Apply adapter processing only if adapter is enabled
            if self.is_adapter_enabled:
                out = self.__adapter(sink_pad, out)

            self.preparedframes[sink_pad] = TSFrame(
                EOS=self.at_EOS,
                buffers=out,
                metadata=self.metadata[sink_pad],
            )

        # Apply buffer alignment if requested
        if self.adapter_config.align_buffers:
            computed_slices = self._compute_aligned_slices()
            for pad, slices in computed_slices.items():
                self.aligned_slices[pad] = slices

            for pad in self.aligned_sink_pads:
                aligned_slice = self.aligned_slices[pad]
                assert aligned_slice is not None
                frame = self.preparedframes[pad]
                assert frame is not None
                # Only align if there are slices (skip if all gaps)
                if aligned_slice.slices:
                    self.preparedframes[pad] = frame.align(aligned_slice)

        # Set preparedoutoffsets for non-adapter case
        if not self.is_adapter_enabled:
            self.preparedoutoffsets = {
                "offset": earliest + self.offset_shift,
                "noffset": min_latest - earliest,
            }
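
The loop over `rates` trims the aligned span so that it ends on a whole-sample boundary for every input rate. The sketch below reproduces that arithmetic in isolation; `round_span` is a hypothetical helper and `MAX_RATE = 16384` is an assumed value chosen for illustration, not necessarily the value of `Offset.MAX_RATE`.

```python
MAX_RATE = 16384  # assumed value, for illustration only


def round_span(earliest, min_latest, rates):
    """Shrink min_latest so the span is a whole number of samples at each rate."""
    off = min_latest - earliest
    for rate in rates:
        factor = MAX_RATE // rate  # offset units per sample at this rate
        if off % factor:
            off = off // factor * factor
    return earliest + off


# At 2048 Hz one sample spans 8 offset units, so 100 is trimmed to 96
print(round_span(0, 100, {2048, 16384}))  # 96

# A span that is already a multiple of 8 is left untouched
print(round_span(16, 112, {2048}))  # 112
```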

latest_by_pad(pad)

The latest offset among the queued up buffers in this pad.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, the requested sink pad

required

Returns:

Type Description
int

int, the latest offset in the pad's buffer queue

Source code in src/sgnts/base/base.py
def latest_by_pad(self, pad: SinkPad) -> int:
    """The latest offset among the queued up buffers in this pad.

    Args:
        pad:
            SinkPad, the requested sink pad

    Returns:
        int, the latest offset in the pad's buffer queue

    """
    return self.inbufs[pad].end_offset if self.inbufs[pad] else -1

next_event_input()

Get single EventFrame input.

Returns:

Type Description
tuple[SinkPad, EventFrame]

tuple[SinkPad, EventFrame], the sink pad and its single EventFrame input

Raises:

Type Description
AssertionError

If there is not exactly one EventFrame input.

Source code in src/sgnts/base/base.py
def next_event_input(self) -> tuple[SinkPad, EventFrame]:
    """Get single EventFrame input.

    Returns:
        tuple[SinkPad, EventFrame], the sink pad and its single EventFrame input

    Raises:
        AssertionError: If there is not exactly one EventFrame input.
    """
    all_events = self.next_event_inputs()
    assert len(all_events) == 1, (
        f"next_event_input() requires exactly one EventFrame input, "
        f"got {len(all_events)}"
    )
    return next(iter(all_events.items()))
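
The "exactly one" pattern used here can be tried on an ordinary dictionary. This is a sketch with a hypothetical helper name, `only_item`, and placeholder strings instead of real pads and frames.

```python
def only_item(mapping, what):
    """Return the single (key, value) pair of a one-entry mapping."""
    assert len(mapping) == 1, (
        f"{what} requires exactly one entry, got {len(mapping)}"
    )
    return next(iter(mapping.items()))


pad, frame = only_item({"events": "frame-0"}, "next_event_input()")
print(pad, frame)  # events frame-0
```

Because the check is an `assert`, it is skipped when Python runs with the `-O` flag, so callers should not rely on the `AssertionError` for control flow.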

next_event_inputs()

Get all EventFrame inputs based on input_frame_types configuration.

Returns:

Type Description
dict[SinkPad, EventFrame]

dict[SinkPad, EventFrame], mapping of sink pads to EventFrame inputs

Source code in src/sgnts/base/base.py
def next_event_inputs(self) -> dict[SinkPad, EventFrame]:
    """Get all EventFrame inputs based on input_frame_types configuration.

    Returns:
        dict[SinkPad, EventFrame], mapping of sink pads to EventFrame inputs
    """
    result: dict[SinkPad, EventFrame] = {}
    for pad in self.sink_pads:
        pad_name = self.rsnks[pad]  # type: ignore[attr-defined]
        if self.input_frame_types.get(pad_name) == EventFrame:
            if pad in self.unaligned_sink_pads:
                frame = self.unaligned_data[pad]
            else:
                frame = self.preparedframes[pad]
            assert isinstance(frame, EventFrame), (
                f"Expected EventFrame on pad {pad_name}, "
                f"got {type(frame).__name__}"
            )
            result[pad] = frame
    return result
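
Selecting inputs by their configured frame type can be sketched without the library. The two classes below are empty stand-ins, not the real `TSFrame` and `EventFrame`, and pad names replace `SinkPad` objects.

```python
class TSFrame:
    """Stand-in for a timeseries frame (hypothetical, not the sgnts class)."""


class EventFrame:
    """Stand-in for an event frame (hypothetical, not the sgnts class)."""


def event_inputs(frames, input_frame_types):
    """Keep only the frames whose pad is configured for EventFrame input."""
    result = {}
    for name, frame in frames.items():
        if input_frame_types.get(name) == EventFrame:
            assert isinstance(frame, EventFrame), (
                f"Expected EventFrame on pad {name}, got {type(frame).__name__}"
            )
            result[name] = frame
    return result


frames = {"strain": TSFrame(), "triggers": EventFrame()}
types = {"strain": TSFrame, "triggers": EventFrame}
selected = event_inputs(frames, types)
print(list(selected))  # ['triggers']
```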

next_event_output()

Get single EventFrame for output with offset/noffset from preparedoutoffsets.

Returns:

Type Description
tuple[SourcePad, EventFrame]

tuple[SourcePad, EventFrame], the source pad and an empty event frame ready to be populated

Raises:

Type Description
AssertionError

If there is not exactly one event output pad.

Source code in src/sgnts/base/base.py
def next_event_output(self) -> tuple[SourcePad, EventFrame]:
    """Get single EventFrame for output with offset/noffset from preparedoutoffsets.

    Returns:
        tuple[SourcePad, EventFrame], the source pad and an empty EventFrame
        ready to be populated

    Raises:
        AssertionError: If there is not exactly one event output pad.
    """
    all_events = self.next_event_outputs()
    assert len(all_events) == 1, (
        f"next_event_output() requires exactly one event output pad, "
        f"got {len(all_events)}"
    )
    return next(iter(all_events.items()))

next_event_outputs()

Get all EventFrames for output pads configured as event outputs.

Creates EventFrame instances with offset/noffset from preparedoutoffsets for all source pads configured to produce EventFrame. The frames are automatically registered in self.outframes for return.

Returns:

Type Description
dict[SourcePad, EventFrame]

dict[SourcePad, EventFrame], mapping of source pads to empty EventFrames

Source code in src/sgnts/base/base.py
def next_event_outputs(self) -> dict[SourcePad, EventFrame]:
    """Get all EventFrames for output pads configured as event outputs.

    Creates EventFrame instances with offset/noffset from preparedoutoffsets
    for all source pads configured to produce EventFrame. The frames are
    automatically registered in self.outframes for return.

    Returns:
        dict[SourcePad, EventFrame], mapping of source pads to empty EventFrames
    """
    offset = self.preparedoutoffsets["offset"]
    noffset = self.preparedoutoffsets["noffset"]
    at_EOS = any(
        frame.EOS for frame in self.preparedframes.values() if frame is not None
    )

    result: dict[SourcePad, EventFrame] = {}
    for pad in self.source_pads:
        pad_name = self.rsrcs[pad]  # type: ignore[attr-defined]
        if self.output_frame_types.get(pad_name) == EventFrame:
            frame = EventFrame(offset=offset, noffset=noffset, EOS=at_EOS)
            result[pad] = frame
            # Automatically register in outframes
            self.outframes[pad] = frame
    return result
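
Two details of the listing above are easy to miss: every output frame shares the same offset/noffset, and the EOS flag is set if any non-None prepared frame is at EOS. A minimal sketch of that behavior, using a hypothetical `Frame` dataclass and string pad names instead of the real classes:

```python
from dataclasses import dataclass


@dataclass
class Frame:
    """Hypothetical stand-in carrying only the fields used here."""
    offset: int = 0
    noffset: int = 0
    EOS: bool = False


def make_event_outputs(event_pads, prepared_frames, out_offsets, outframes):
    """Create one empty frame per event pad and register it in outframes."""
    # EOS propagates if any available input frame has reached end of stream
    at_eos = any(f.EOS for f in prepared_frames.values() if f is not None)
    result = {}
    for pad in event_pads:
        frame = Frame(
            offset=out_offsets["offset"],
            noffset=out_offsets["noffset"],
            EOS=at_eos,
        )
        result[pad] = frame
        outframes[pad] = frame  # same object, so populating one fills both
    return result


outframes = {}
prepared = {"in_a": Frame(EOS=False), "in_b": Frame(EOS=True), "in_c": None}
outputs = make_event_outputs(
    ["events"], prepared, {"offset": 1024, "noffset": 512}, outframes
)
```

Since the returned frame and the registered frame are the same object, the caller only has to populate the frame it was handed.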

next_input()

Convenience method - get single TSFrame input.

Equivalent to next_ts_input(). For transforms that only work with TSFrames.

Returns:

Type Description
tuple[SinkPad, TSFrame]

tuple[SinkPad, TSFrame], the sink pad and its single input frame

Source code in src/sgnts/base/base.py
def next_input(self) -> tuple[SinkPad, TSFrame]:
    """Convenience method - get single TSFrame input.

    Equivalent to next_ts_input(). For transforms that only work with TSFrames.

    Returns:
        tuple[SinkPad, TSFrame], the sink pad and its single input frame
    """
    return self.next_ts_input()

next_inputs()

Convenience method - get all TSFrame inputs.

Equivalent to next_ts_inputs(). For transforms that only work with TSFrames.

Returns:

Type Description
dict[SinkPad, TSFrame]

dict[SinkPad, TSFrame], dictionary of input frames

Source code in src/sgnts/base/base.py
def next_inputs(self) -> dict[SinkPad, TSFrame]:
    """Convenience method - get all TSFrame inputs.

    Equivalent to next_ts_inputs(). For transforms that only work with TSFrames.

    Returns:
        dict[SinkPad, TSFrame], dictionary of input frames
    """
    return self.next_ts_inputs()

next_output()

Convenience method - get single TSCollectFrame output.

Equivalent to next_ts_output(). For transforms that only work with TSFrames.

Note: Elements using this method must call .close() on the collector when done populating buffers.

Returns:

Type Description
tuple[SourcePad, TSCollectFrame]

tuple[SourcePad, TSCollectFrame], pad and collector for output frame

Source code in src/sgnts/base/base.py
def next_output(self) -> tuple[SourcePad, TSCollectFrame]:
    """Convenience method - get single TSCollectFrame output.

    Equivalent to next_ts_output(). For transforms that only work with TSFrames.

    Note: Elements using this method must call `.close()` on the collector
    when done populating buffers.

    Returns:
        tuple[SourcePad, TSCollectFrame], pad and collector for output frame
    """
    return self.next_ts_output()

next_outputs()

Convenience method - get all TSCollectFrame outputs.

Equivalent to next_ts_outputs(). For transforms that only work with TSFrames.

Note: Elements using this method must call .close() on each collector when done populating buffers.

Returns:

Type Description
dict[SourcePad, TSCollectFrame]

dict[SourcePad, TSCollectFrame], dictionary of collectors for output frames

Source code in src/sgnts/base/base.py
def next_outputs(self) -> dict[SourcePad, TSCollectFrame]:
    """Convenience method - get all TSCollectFrame outputs.

    Equivalent to next_ts_outputs(). For transforms that only work with TSFrames.

    Note: Elements using this method must call `.close()` on each collector
    when done populating buffers.

    Returns:
        dict[SourcePad, TSCollectFrame], dictionary of collectors for output frames
    """
    return self.next_ts_outputs()

next_ts_input()

Get single TSFrame input.

Returns:

Type Description
tuple[SinkPad, TSFrame]

tuple[SinkPad, TSFrame], the sink pad and its single TSFrame input

Raises:

Type Description
AssertionError

If there is not exactly one TSFrame input.

Source code in src/sgnts/base/base.py
def next_ts_input(self) -> tuple[SinkPad, TSFrame]:
    """Get single TSFrame input.

    Returns:
        tuple[SinkPad, TSFrame], the sink pad and its single TSFrame input

    Raises:
        AssertionError: If there is not exactly one TSFrame input.
    """
    all_ts = self.next_ts_inputs()
    assert (
        len(all_ts) == 1
    ), f"next_ts_input() requires exactly one TSFrame input, got {len(all_ts)}"
    return next(iter(all_ts.items()))

next_ts_inputs()

Get all TSFrame inputs based on input_frame_types configuration.

Returns:

Type Description
dict[SinkPad, TSFrame]

dict[SinkPad, TSFrame], mapping of sink pads to TSFrame inputs

Source code in src/sgnts/base/base.py
def next_ts_inputs(self) -> dict[SinkPad, TSFrame]:
    """Get all TSFrame inputs based on input_frame_types configuration.

    Returns:
        dict[SinkPad, TSFrame], mapping of sink pads to TSFrame inputs
    """
    result: dict[SinkPad, TSFrame] = {}
    for pad in self.sink_pads:
        pad_name = self.rsnks[pad]  # type: ignore[attr-defined]
        if self.input_frame_types.get(pad_name, TSFrame) == TSFrame:
            if pad in self.aligned_sink_pads:
                frame = self.preparedframes[pad]
                assert frame is not None
                result[pad] = frame
            else:
                unaligned_frame = self.unaligned_data[pad]
                assert isinstance(unaligned_frame, TSFrame), (
                    f"Expected TSFrame on unaligned pad {pad_name}, "
                    f"got {type(unaligned_frame).__name__}"
                )
                result[pad] = unaligned_frame
    return result
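
Compared with next_event_inputs(), the lookup here passes TSFrame as the default to `.get()`, so a pad with no entry in input_frame_types is treated as a TSFrame pad. The difference can be seen in a small sketch; the pad names and the two empty classes are hypothetical stand-ins.

```python
class TSFrame:
    """Hypothetical stand-in for the real TSFrame."""


class EventFrame:
    """Hypothetical stand-in for the real EventFrame."""


input_frame_types = {"triggers": EventFrame}  # "strain" is not configured
pads = ["strain", "triggers"]

# TS lookup: a missing entry falls back to TSFrame
ts_pads = [p for p in pads if input_frame_types.get(p, TSFrame) == TSFrame]

# Event lookup: a missing entry yields None and never matches
event_pads = [p for p in pads if input_frame_types.get(p) == EventFrame]
```

With this configuration `ts_pads` contains only "strain" and `event_pads` contains only "triggers", so each pad is claimed by exactly one of the two accessors.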

next_ts_output()

Get single TSCollectFrame for output with offsets from preparedoutoffsets.

Note: The caller must call .close() on the collector when done populating buffers.

Returns:

Type Description
tuple[SourcePad, TSCollectFrame]

tuple[SourcePad, TSCollectFrame], pad and collector for the output frame

Raises:

Type Description
AssertionError

If there is not exactly one TS output pad.

Source code in src/sgnts/base/base.py
def next_ts_output(self) -> tuple[SourcePad, TSCollectFrame]:
    """Get single TSCollectFrame for output with offsets from preparedoutoffsets.

    Note: The caller must call `.close()` on the collector when done
    populating buffers.

    Returns:
        tuple[SourcePad, TSCollectFrame], pad and collector for the output
        frame

    Raises:
        AssertionError: If there is not exactly one TS output pad.
    """
    all_ts = self.next_ts_outputs()
    assert (
        len(all_ts) == 1
    ), f"next_ts_output() requires exactly one TS output pad, got {len(all_ts)}"
    return next(iter(all_ts.items()))

next_ts_outputs()

Get all TSCollectFrames for output pads configured as TS outputs.

Creates TSFrame instances with offset/noffset from preparedoutoffsets, then creates TSCollectFrame collectors for atomic buffer population. The parent TSFrames are automatically registered in self.outframes.

Returns:

Type Description
dict[SourcePad, TSCollectFrame]

dict[SourcePad, TSCollectFrame], mapping of source pads to collectors

Source code in src/sgnts/base/base.py
def next_ts_outputs(self) -> dict[SourcePad, TSCollectFrame]:
    """Get all TSCollectFrames for output pads configured as TS outputs.

    Creates TSFrame instances with offset/noffset from preparedoutoffsets,
    then creates TSCollectFrame collectors for atomic buffer population.
    The parent TSFrames are automatically registered in self.outframes.

    Returns:
        dict[SourcePad, TSCollectFrame], mapping of source pads to collectors
    """
    offset = self.preparedoutoffsets["offset"]
    noffset = self.preparedoutoffsets["noffset"]
    at_EOS = any(
        frame.EOS for frame in self.preparedframes.values() if frame is not None
    )

    result: dict[SourcePad, TSCollectFrame] = {}
    for pad in self.source_pads:
        pad_name = self.rsrcs[pad]  # type: ignore[attr-defined]
        if self.output_frame_types.get(pad_name, TSFrame) == TSFrame:
            frame = TSFrame(offset=offset, noffset=noffset, EOS=at_EOS)
            collector = frame.fill()
            result[pad] = collector
            # Automatically register the parent frame in outframes
            self.outframes[pad] = frame
    return result
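
The reason `.close()` matters can be illustrated with a toy collector. This is only a sketch under assumptions: the real TSCollectFrame interface is not shown in this section, so the `Collector` class, its `append` method, and the commit-on-close behavior below are hypothetical stand-ins for the general "stage, then commit atomically" idea.

```python
class Frame:
    """Hypothetical stand-in for TSFrame: holds committed buffers."""

    def __init__(self):
        self.buffers = []

    def fill(self):
        return Collector(self)


class Collector:
    """Hypothetical stand-in for TSCollectFrame: stages buffers until close()."""

    def __init__(self, parent):
        self.parent = parent
        self.staged = []
        self.closed = False

    def append(self, buf):  # hypothetical method name
        assert not self.closed, "collector already closed"
        self.staged.append(buf)

    def close(self):
        # Commit everything in one step so the parent never holds a partial set
        self.parent.buffers = list(self.staged)
        self.closed = True


outframes = {}
frame = Frame()
outframes["out"] = frame  # the parent frame is registered up front
collector = frame.fill()
collector.append("buf-0")
collector.append("buf-1")
before_close = list(frame.buffers)  # nothing committed yet
collector.close()
```

In this model the registered parent frame stays empty until `close()` runs, which is why forgetting the call would leave the output frame without data.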

pull(pad, frame)

Pull data and queue for alignment.

Pull data from the input pads (source pads of upstream elements) and queue data to perform alignment once frames from all pads are pulled.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is pulling the frame

required
frame TimeSpanFrame

TimeSpanFrame, The frame that is pulled to sink pad

required

Source code in src/sgnts/base/base.py
def pull(self, pad: SinkPad, frame: TimeSpanFrame) -> None:
    """Pull data and queue for alignment.

    Pull data from the input pads (source pads of upstream elements) and
    queue data to perform alignment once frames from all pads are pulled.

    Args:
        pad:
            SinkPad, The sink pad that is pulling the frame
        frame:
            TimeSpanFrame, The frame that is pulled to sink pad
    """
    self.at_EOS |= frame.EOS

    # Handle case of a pad that is exempt from alignment
    if pad in self.unaligned_sink_pads:
        # Store most recent data for exempt pads
        self.unaligned_data[pad] = frame
        # TODO maybe add bespoke timeout handling here
        return

    # Handle case of a pad that requires alignment
    # extend and check the buffers
    for buf in frame:
        self.inbufs[pad].push(buf)
    self.metadata[pad] = frame.metadata

    if self.timeout(pad):
        raise ValueError("pad %s has timed out" % pad.name)
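
The two branches of pull() can be sketched with plain Python containers. `MiniAligner` is a hypothetical class written for this illustration: buffers are modeled as (start, end) offset pairs, and `max_span` plays the role of the converted max_age.

```python
from collections import defaultdict


class MiniAligner:
    """Hypothetical, simplified model of the pull() bookkeeping."""

    def __init__(self, unaligned_pads, max_span):
        self.unaligned_pads = set(unaligned_pads)
        self.max_span = max_span
        self.at_EOS = False
        self.unaligned_data = {}
        self.inbufs = defaultdict(list)  # pad -> list of (start, end) offsets

    def timeout(self, pad):
        bufs = self.inbufs[pad]
        # Span from the oldest queued start to the newest queued end
        return bool(bufs) and bufs[-1][1] - bufs[0][0] > self.max_span

    def pull(self, pad, buffers, EOS=False):
        self.at_EOS |= EOS
        if pad in self.unaligned_pads:
            # Exempt pads keep only the most recent frame
            self.unaligned_data[pad] = buffers
            return
        # Aligned pads queue their buffers, then check for a timeout
        self.inbufs[pad].extend(buffers)
        if self.timeout(pad):
            raise ValueError(f"pad {pad} has timed out")


m = MiniAligner(unaligned_pads={"aux"}, max_span=100)
m.pull("aux", [(0, 10)])
m.pull("aux", [(10, 20)])  # replaces the previous frame
m.pull("data", [(0, 60)])
try:
    m.pull("data", [(60, 130)], EOS=True)  # queued span is now 130 > 100
    timed_out = False
except ValueError:
    timed_out = True
```

The sketch shows that EOS is recorded before either branch runs, and that an unaligned pad is never subject to the timeout check.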

timeout(pad)

Whether the pad has timed out due to the oldest buffer exceeding the max age.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, the sink pad to check for timeout

required

Returns:

Type Description
bool

True if the pad has timed out

Source code in src/sgnts/base/base.py
def timeout(self, pad: SinkPad) -> bool:
    """Whether the pad has timed out due to the oldest buffer exceeding the max age.

    Args:
        pad:
            SinkPad, the sink pad to check for timeout

    Returns:
        True if the pad has timed out

    """
    return self.inbufs[pad].end_offset - self.inbufs[pad].offset > Offset.fromsec(
        self.max_age
    )
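
The comparison above is done in offset units, with max_age converted from seconds by Offset.fromsec. The arithmetic can be sketched as follows; `TICKS_PER_SECOND`, `fromsec`, and `timed_out` are hypothetical names, and the tick rate is an assumed value chosen only for illustration, not taken from the library.

```python
TICKS_PER_SECOND = 16384  # assumption for illustration only


def fromsec(seconds: float) -> int:
    """Convert seconds to integer offset ticks (hypothetical helper)."""
    return round(seconds * TICKS_PER_SECOND)


def timed_out(oldest_offset: int, end_offset: int, max_age: float) -> bool:
    """True when the queued span is strictly longer than max_age seconds."""
    return end_offset - oldest_offset > fromsec(max_age)


exactly_at_limit = timed_out(0, fromsec(2.0), max_age=2.0)
one_tick_over = timed_out(0, fromsec(2.0) + 1, max_age=2.0)
```

Because the comparison is strict, a span of exactly max_age does not count as a timeout; only a longer span does.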

validate()

Validate element configuration.

Source code in src/sgnts/base/base.py
def validate(self) -> None:
    """Validate element configuration."""
    pass

make_ts_element(sgn_element_class)

Factory to create TS-enabled versions of SGN elements.

This provides a simple way to add TS capabilities to existing SGN elements so they can connect to TS pipelines. Uses a basic AdapterConfig() that works for most general-purpose applications.

Parameters:

Name Type Description Default
sgn_element_class Type

SGN element class to enhance

required

Returns:

Type Description
Type

New class that combines SGN element with TS capabilities

Source code in src/sgnts/base/base.py
def make_ts_element(sgn_element_class: Type) -> Type:
    """Factory to create TS-enabled versions of SGN elements.

    This provides a simple way to add TS capabilities to existing SGN elements
    so they can connect to TS pipelines. Uses a basic AdapterConfig() that works
    for most general-purpose applications.

    Args:
        sgn_element_class: SGN element class to enhance

    Returns:
        New class that combines SGN element with TS capabilities
    """

    @dataclass
    class TSEnabledElement(TimeSeriesMixin, sgn_element_class):
        """Dynamically created TS-enabled element."""

        # Use basic adapter config that works for general TS connectivity
        adapter_config: AdapterConfig = field(default_factory=AdapterConfig)

        def pull(self, pad, frame):
            """Pull frame and queue for alignment."""
            # Only call TimeSeriesMixin.pull for alignment
            # SGN element's pull logic happens in internal() from preparedframes
            TimeSeriesMixin.pull(self, pad, frame)

        def internal(self):
            """Align frames and pass to SGN element's pull() logic."""
            # First, let TimeSeriesMixin align the frames
            TimeSeriesMixin.internal(self)

            # Now call the SGN element's pull() with aligned frames
            for pad in self.aligned_sink_pads:
                frame = self.preparedframes[pad]
                sgn_element_class.pull(self, pad, frame)

        def new(self, pad):
            """Default implementation of new() for factory-created elements."""
            return self.outframes.get(pad)

    # Set a meaningful name for the new class
    TSEnabledElement.__name__ = f"TS{sgn_element_class.__name__}"
    TSEnabledElement.__qualname__ = f"TS{sgn_element_class.__qualname__}"

    return TSEnabledElement
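
The factory relies on a common Python pattern: build a dataclass inside a function that inherits from a mixin and the supplied base, then rename it. A self-contained sketch of that pattern follows; `Config`, `Mixin`, `Plain`, and `make_enabled` are hypothetical stand-ins for AdapterConfig, TimeSeriesMixin, an SGN element class, and make_ts_element.

```python
from dataclasses import dataclass, field


@dataclass
class Config:
    """Hypothetical stand-in for AdapterConfig."""
    enabled: bool = False


class Mixin:
    """Hypothetical stand-in for TimeSeriesMixin."""

    def describe(self):
        # The mixin comes first in the MRO, so it wraps the base behavior
        return f"mixin+{super().describe()}"


@dataclass
class Plain:
    """Hypothetical stand-in for an SGN element class."""
    name: str = "plain"

    def describe(self):
        return self.name


def make_enabled(base):
    @dataclass
    class Enabled(Mixin, base):
        # default_factory gives every instance its own Config object
        config: Config = field(default_factory=Config)

    # Give the generated class a meaningful name
    Enabled.__name__ = f"TS{base.__name__}"
    Enabled.__qualname__ = f"TS{base.__qualname__}"
    return Enabled


TSPlain = make_enabled(Plain)
element = TSPlain(name="src")
description = element.describe()
```

Listing the mixin before the base class is what lets its methods take precedence while still reaching the base implementation through `super()`.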