From 1d9037888f39145cd940ab6c9961642996a14210 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 12 Feb 2026 18:46:18 +0100 Subject: [PATCH 1/6] Add and expose `WindowSide` enum for resampling window configuration - Introduced the `WindowSide` enum with values `LEFT` and `RIGHT`: - `LEFT`: Represents the start of the resampling window. - `RIGHT`: Represents the end of the resampling window. - Exposed the `WindowSide` enum in the `__init__.py` of the timeseries module for public use. Signed-off-by: Malte Schaaf --- src/frequenz/sdk/timeseries/__init__.py | 2 ++ src/frequenz/sdk/timeseries/_resampling/_config.py | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/src/frequenz/sdk/timeseries/__init__.py b/src/frequenz/sdk/timeseries/__init__.py index d5bde1602..288ea6b5a 100644 --- a/src/frequenz/sdk/timeseries/__init__.py +++ b/src/frequenz/sdk/timeseries/__init__.py @@ -49,6 +49,7 @@ ResamplerConfig2, ResamplingFunction, ResamplingFunction2, + WindowSide, ) from ._resampling._exceptions import ResamplingError, SourceStoppedError from ._resampling._wall_clock_timer import ( @@ -82,4 +83,5 @@ "TickInfo", "WallClockTimer", "WallClockTimerConfig", + "WindowSide", ] diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 047892803..427573ab5 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -10,6 +10,7 @@ from collections.abc import Sequence from dataclasses import dataclass, field from datetime import datetime, timedelta +from enum import Enum from typing import Protocol from frequenz.core.datetime import UNIX_EPOCH @@ -44,6 +45,16 @@ """ +class WindowSide(Enum): + """Represents a side of a resampling window.""" + + LEFT = "left" + """The left side of the resampling window.""" + + RIGHT = "right" + """The right side of the resampling window.""" + + class ResamplingFunction(Protocol): """Combine multiple samples into a new one. From ed0e583b97010e80946061ee74e9c0db5bb828cb Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 09:13:50 +0100 Subject: [PATCH 2/6] Add `closed` option to ResamplerConfig to define resampling window behavior - Introduced the `closed` parameter in ResamplerConfig with options `WindowSide.RIGHT` (default) and `WindowSide.LEFT`. - Updated the resampling logic to respect the `closed` configuration: - `RIGHT`: Includes samples at the end of the window, excludes those at the start. - `LEFT`: Includes samples at the start of the window, excludes those at the end. - Adjusted documentation to reflect the new `closed` parameter and its behavior. Signed-off-by: Malte Schaaf --- .../sdk/timeseries/_resampling/_config.py | 14 ++++++++++ .../sdk/timeseries/_resampling/_resampler.py | 27 ++++++++++++------- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 427573ab5..732aa5400 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -137,6 +137,20 @@ class ResamplerConfig: value. """ + closed: WindowSide = WindowSide.RIGHT + """Indicates which side of the resampling window is closed. + + If `WindowSide.RIGHT` (default), the resampling window is closed on the + right side and open on the left, meaning it includes samples with timestamps + within the range (start, end], where `start` and `end` are the boundaries of + the window. + + If `WindowSide.LEFT`, the resampling window is closed on the left side and + open on the right, meaning it includes samples with timestamps within the + range [start, end), where `start` and `end` are the boundaries of the + window. + """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index 81471fc0d..b1d071007 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -9,7 +9,7 @@ import itertools import logging import math -from bisect import bisect +from bisect import bisect, bisect_left from collections import deque from datetime import datetime, timedelta, timezone from typing import assert_never @@ -20,7 +20,7 @@ from ..._internal._asyncio import cancel_and_await from .._base_types import Sample from ._base_types import Sink, Source, SourceProperties -from ._config import ResamplerConfig, ResamplerConfig2 +from ._config import ResamplerConfig, ResamplerConfig2, WindowSide from ._exceptions import ResamplingError, SourceStoppedError from ._wall_clock_timer import TickInfo, WallClockTimer @@ -411,7 +411,8 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: """Generate a new sample based on all the current *relevant* samples. Args: - timestamp: The timestamp to be used to calculate the new sample. + timestamp: The reference timestamp for the resampling process. This + timestamp indicates the end of the resampling period. Returns: A new sample generated by calling the resampling function with all @@ -437,12 +438,20 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: ) minimum_relevant_timestamp = timestamp - period * conf.max_data_age_in_periods - min_index = bisect( - self._buffer, - minimum_relevant_timestamp, - key=lambda s: s[0], - ) - max_index = bisect(self._buffer, timestamp, key=lambda s: s[0]) + if self._config.closed == WindowSide.LEFT: + min_index = bisect_left( + self._buffer, + minimum_relevant_timestamp, + key=lambda s: s[0], + ) + max_index = bisect_left(self._buffer, timestamp, key=lambda s: s[0]) + else: + min_index = bisect( + self._buffer, + minimum_relevant_timestamp, + key=lambda s: s[0], + ) + max_index = bisect(self._buffer, timestamp, key=lambda s: s[0]) # Using itertools for slicing doesn't look very efficient, but # experiments with a custom (ring) buffer that can slice showed that # it is not that bad. See: From 2c31758541d711b2bf50b4e7870f6646707ec44a Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 09:22:07 +0100 Subject: [PATCH 3/6] Add `label` option to ResamplerConfig to define timestamp labeling of resampled data - Introduced the `label` parameter in ResamplerConfig with options `WindowSide.RIGHT` (default) and `WindowSide.LEFT`. - Updated the resampling logic to respect the `label` configuration: - `RIGHT`: The timestamp of the resampled data corresponds to the end of the resampling window. - `LEFT`: The timestamp of the resampled data corresponds to the start of the resampling window. - Adjusted the logic for setting `sample_time` to use the `label` configuration. - Updated documentation to reflect the new `label` parameter and its behavior. Signed-off-by: Malte Schaaf --- src/frequenz/sdk/timeseries/_resampling/_config.py | 9 +++++++++ src/frequenz/sdk/timeseries/_resampling/_resampler.py | 8 +++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 732aa5400..55de6854b 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -151,6 +151,15 @@ class ResamplerConfig: window. """ + label: WindowSide = WindowSide.RIGHT + """Indicates the timestamp label of the resampled data. + + If `WindowSide.RIGHT` (default), the timestamp of the resampled data + corresponds to the right boundary of the resampling window. If + `WindowSide.LEFT`, the timestamp corresponds to the left boundary of the + resampling window. + """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index b1d071007..82fb5f0f7 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -467,7 +467,13 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: if relevant_samples else None ) - return Sample(timestamp, None if value is None else Quantity(value)) + + sample_time = ( + timestamp - conf.resampling_period + if self._config.label == WindowSide.LEFT + else timestamp + ) + return Sample(sample_time, None if value is None else Quantity(value)) def _log_no_relevant_samples( self, minimum_relevant_timestamp: datetime, timestamp: datetime From 4dde523c6fa39e6caa5b509bbc344643330349c4 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 10:59:16 +0100 Subject: [PATCH 4/6] Add test for `closed` option in ResamplerConfig with additional samples - Enhanced the `test_resampler_closed_option` to include additional samples at 2.5, 3, and 4 seconds. - Verified the behavior of the `closed` option (`WindowSide.RIGHT` and `WindowSide.LEFT`) with the extended timeline. - Added assertions to ensure correct resampling function calls and sink outputs for the new samples. - Confirmed that source properties and buffer length are updated correctly after processing the additional samples. Signed-off-by: Malte Schaaf --- tests/timeseries/test_resampling.py | 122 ++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 3629b5f4e..80572d873 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -26,6 +26,7 @@ Sink, Source, SourceProperties, + WindowSide, ) from frequenz.sdk.timeseries._resampling._exceptions import ( ResamplingError, @@ -1504,6 +1505,127 @@ async def test_resampling_all_zeros( assert _get_buffer_len(resampler, source_receiver) == 3 +@pytest.mark.parametrize("closed", [WindowSide.RIGHT, WindowSide.LEFT]) +async def test_resampler_closed_option( + closed: WindowSide, + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `closed` option in ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + closed=closed, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + source_props = resampler.get_source_properties(source_receiver) + + # Test timeline + # + # t(s) 0 1 2 2.5 3 4 + # |----------|----------R----|-----|----------R-----> (no more samples) + # value 5.0 10.0 15.0 1.0 4.0 5.0 + # + # R = resampling is done + + # Send a few samples and run a resample tick, advancing the fake time by one period + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + sample3 = Sample(timestamp + timedelta(seconds=2), value=Quantity(15.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + await source_sender.send(sample3) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + assert datetime.now(timezone.utc).timestamp() == 2 + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(expected_resampled_value), + ) + ) + # Assert the behavior based on the `closed` option + if closed == WindowSide.RIGHT: + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample2), as_float_tuple(sample3)), + config, + source_props, + ) + elif closed == WindowSide.LEFT: + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample1), as_float_tuple(sample2)), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=3, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + sink_mock.reset_mock() + resampling_fun_mock.reset_mock() + + # Additional samples at 2.5, 3, and 4 seconds + sample4 = Sample(timestamp + timedelta(seconds=2.5), value=Quantity(1.0)) + sample5 = Sample(timestamp + timedelta(seconds=3), value=Quantity(4.0)) + sample6 = Sample(timestamp + timedelta(seconds=4), value=Quantity(5.0)) + await source_sender.send(sample4) + await source_sender.send(sample5) + await source_sender.send(sample6) + + # Advance time to 4 seconds and resample again + await _advance_time(fake_time, resampling_period_s * 2) + await resampler.resample(one_shot=True) + + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s * 2), + Quantity(expected_resampled_value), + ) + ) + if closed == WindowSide.RIGHT: + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample4), + as_float_tuple(sample5), + as_float_tuple(sample6), + ), + config, + source_props, + ) + elif closed == WindowSide.LEFT: + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample3), + as_float_tuple(sample4), + as_float_tuple(sample5), + ), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=6, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable-next=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen From fd9754e0270b94def6cc5154143a13e5e78b3660 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 11:01:35 +0100 Subject: [PATCH 5/6] Add test for `label` option in ResamplerConfig to verify timestamp behavior - Introduced `test_resampler_label_option` to validate the `label` configuration in ResamplerConfig. - Tested both `WindowSide.LEFT` and `WindowSide.RIGHT` options to ensure the resampled datas timestamp corresponds to the start or end of the resampling window, respectively. - Verified sink outputs with the expected timestamp and resampled value. Signed-off-by: Malte Schaaf --- tests/timeseries/test_resampling.py | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 80572d873..31f22638f 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -1626,6 +1626,56 @@ async def test_resampler_closed_option( assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len +@pytest.mark.parametrize("label", [WindowSide.LEFT, WindowSide.RIGHT]) +async def test_resampler_label_option( + label: WindowSide, + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `label` option in ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + label=label, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + # Send samples and resample + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + # Assert the timestamp of the resampled sample + expected_timestamp = ( + timestamp + if label == WindowSide.LEFT + else timestamp + timedelta(seconds=resampling_period_s) + ) + sink_mock.assert_called_once_with( + Sample(expected_timestamp, Quantity(expected_resampled_value)) + ) + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable-next=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen From f745efc3df177552ba6a1aa58f57c0a56b8b1f27 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 17:06:34 +0100 Subject: [PATCH 6/6] Update release notes Signed-off-by: Malte Schaaf --- RELEASE_NOTES.md | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index dc08c0b5f..2794cb380 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,7 +12,18 @@ ## New Features - +- **`WindowSide` Enum**: Introduced the `WindowSide` enum to represent the sides of a resampling window: + - `LEFT`: Represents the start of the resampling window. + - `RIGHT`: Represents the end of the resampling window. + - The `WindowSide` enum is publicly available for import from the timeseries module. + +- **`closed` Parameter**: Added the `closed` parameter to `ResamplerConfig`, allowing users to configure which side of the resampling window is included. This parameter uses the new `WindowSide` enum: + - `WindowSide.RIGHT`: Includes the end of the window, excludes the start. (Default) + - `WindowSide.LEFT`: Includes the start of the window, excludes the end. + +- **`label` Parameter**: Added the `label` parameter to `ResamplerConfig`, allowing users to configure the timestamp labeling of the resampled data. This parameter uses the new `WindowSide` enum: + - `WindowSide.RIGHT`: The resampled timestamp represents the end of the resampling window. (Default) + - `WindowSide.LEFT`: The resampled timestamp represents the start of the resampling window. ## Bug Fixes