From 65c527bb39c7ea05ecae3ba28fd689740fc87581 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Thu, 2 Apr 2026 15:16:39 -0500 Subject: [PATCH 01/10] Add built-in rolling window to all consolidators --- ...lidatorRollingWindowRegressionAlgorithm.cs | 153 ++++++++++++++++++ ...lidatorRollingWindowRegressionAlgorithm.py | 62 +++++++ .../Consolidators/BaseTimelessConsolidator.cs | 14 +- Common/Data/Consolidators/ConsolidatorBase.cs | 79 +++++++++ Common/Data/Consolidators/DataConsolidator.cs | 28 +--- .../MarketHourAwareConsolidator.cs | 12 +- .../Data/Consolidators/RenkoConsolidator.cs | 19 +-- .../Consolidators/SequentialConsolidator.cs | 17 +- .../Python/DataConsolidatorPythonWrapper.cs | 27 +++- 9 files changed, 341 insertions(+), 70 deletions(-) create mode 100644 Algorithm.CSharp/ConsolidatorRollingWindowRegressionAlgorithm.cs create mode 100644 Algorithm.Python/ConsolidatorRollingWindowRegressionAlgorithm.py create mode 100644 Common/Data/Consolidators/ConsolidatorBase.cs diff --git a/Algorithm.CSharp/ConsolidatorRollingWindowRegressionAlgorithm.cs b/Algorithm.CSharp/ConsolidatorRollingWindowRegressionAlgorithm.cs new file mode 100644 index 000000000000..3359db0d41e8 --- /dev/null +++ b/Algorithm.CSharp/ConsolidatorRollingWindowRegressionAlgorithm.cs @@ -0,0 +1,153 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using QuantConnect.Data; +using QuantConnect.Data.Consolidators; +using QuantConnect.Data.Market; +using QuantConnect.Interfaces; + +namespace QuantConnect.Algorithm.CSharp +{ + /// + /// Regression algorithm asserting that consolidators expose a built-in rolling window + /// + public class ConsolidatorRollingWindowRegressionAlgorithm : QCAlgorithm, IRegressionAlgorithmDefinition + { + private TradeBarConsolidator _consolidator; + private int _consolidationCount; + + /// + /// Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized. + /// + public override void Initialize() + { + SetStartDate(2013, 10, 07); + SetEndDate(2013, 10, 11); + + AddEquity("SPY", Resolution.Minute); + + _consolidator = new TradeBarConsolidator(TimeSpan.FromMinutes(10)); + _consolidator.DataConsolidated += OnDataConsolidated; + SubscriptionManager.AddConsolidator("SPY", _consolidator); + } + + private void OnDataConsolidated(object sender, TradeBar bar) + { + _consolidationCount++; + + // Window[0] must always be the bar just consolidated + var currentBar = (TradeBar)_consolidator[0]; + if (currentBar.Time != bar.Time) + { + throw new RegressionTestException($"Expected consolidator[0].Time == {bar.Time} but was {currentBar.Time}"); + } + if (currentBar.Close != bar.Close) + { + throw new RegressionTestException($"Expected consolidator[0].Close == {bar.Close} but was {currentBar.Close}"); + } + + // After the second consolidation the previous bar must be accessible at index 1 + if (_consolidator.Window.Count >= 2) + { + var previous = (TradeBar)_consolidator[1]; + if (previous.Time >= bar.Time) + { + throw new RegressionTestException($"consolidator[1].Time ({previous.Time}) should be earlier than consolidator[0].Time ({bar.Time})"); + } + if (previous.Close <= 0) + { + throw new RegressionTestException("consolidator[1].Close should be greater than zero"); + } + } + } + + public override void OnEndOfAlgorithm() + { + if (_consolidationCount == 0) + { + throw new RegressionTestException("Expected at least one consolidation but got zero"); + } + + // Default window size is 2, it must be full + if (_consolidator.Window.Count != 2) + { + throw new RegressionTestException( + $"Expected window count of 2 but was {_consolidator.Window.Count}"); + } + } + + /// + /// This is used by the regression test system to indicate if the open source Lean repository has the required data to run this algorithm. + /// + public bool CanRunLocally { get; } = true; + + /// + /// This is used by the regression test system to indicate which languages this algorithm is written in. + /// + public List Languages { get; } = new() { Language.CSharp, Language.Python }; + + /// + /// Data Points count of all timeslices of algorithm + /// + public long DataPoints => 3943; + + /// + /// Data Points count of the algorithm history + /// + public int AlgorithmHistoryDataPoints => 0; + + /// + /// Final status of the algorithm + /// + public AlgorithmStatus AlgorithmStatus => AlgorithmStatus.Completed; + + /// + /// This is used by the regression test system to indicate what the expected statistics are from running the algorithm + /// + public Dictionary ExpectedStatistics => new Dictionary + { + {"Total Orders", "0"}, + {"Average Win", "0%"}, + {"Average Loss", "0%"}, + {"Compounding Annual Return", "0%"}, + {"Drawdown", "0%"}, + {"Expectancy", "0"}, + {"Start Equity", "100000"}, + {"End Equity", "100000"}, + {"Net Profit", "0%"}, + {"Sharpe Ratio", "0"}, + {"Sortino Ratio", "0"}, + {"Probabilistic Sharpe Ratio", "0%"}, + {"Loss Rate", "0%"}, + {"Win Rate", "0%"}, + {"Profit-Loss Ratio", "0"}, + {"Alpha", "0"}, + {"Beta", "0"}, + {"Annual Standard Deviation", "0"}, + {"Annual Variance", "0"}, + {"Information Ratio", "-8.91"}, + {"Tracking Error", "0.223"}, + {"Treynor Ratio", "0"}, + {"Total Fees", "$0.00"}, + {"Estimated Strategy Capacity", "$0"}, + {"Lowest Capacity Asset", ""}, + {"Portfolio Turnover", "0%"}, + {"Drawdown Recovery", "0"}, + {"OrderListHash", "d41d8cd98f00b204e9800998ecf8427e"} + }; + } +} diff --git a/Algorithm.Python/ConsolidatorRollingWindowRegressionAlgorithm.py b/Algorithm.Python/ConsolidatorRollingWindowRegressionAlgorithm.py new file mode 100644 index 000000000000..087321235b8e --- /dev/null +++ b/Algorithm.Python/ConsolidatorRollingWindowRegressionAlgorithm.py @@ -0,0 +1,62 @@ +# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. +# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from AlgorithmImports import * + +### +### Regression algorithm asserting that consolidators expose a built-in rolling window +### +class ConsolidatorRollingWindowRegressionAlgorithm(QCAlgorithm): + + def initialize(self): + self.set_start_date(2013, 10, 7) + self.set_end_date(2013, 10, 11) + + self.add_equity("SPY", Resolution.MINUTE) + + self._consolidation_count = 0 + self._consolidator = TradeBarConsolidator(timedelta(minutes=10)) + self._consolidator.data_consolidated += self._on_data_consolidated + self.subscription_manager.add_consolidator("SPY", self._consolidator) + + def _on_data_consolidated(self, sender, bar): + self._consolidation_count += 1 + + # consolidator[0] must always match the bar just fired + currentBar = self._consolidator[0] + if currentBar.time != bar.time: + raise AssertionError(f"Expected consolidator[0].time == {bar.time} but was {currentBar.time}") + if currentBar.value != bar.close: + raise AssertionError(f"Expected consolidator[0].value == {bar.close} but was {currentBar.value}") + + # After the second consolidation the previous bar must be at index 1 + if self._consolidator.window.count >= 2: + previous = self._consolidator[1] + if previous.time >= bar.time: + raise AssertionError( + f"consolidator[1].time ({previous.time}) should be earlier " + f"than consolidator[0].time ({bar.time})" + ) + if previous.value <= 0: + raise AssertionError("consolidator[1].value should be greater than zero") + + def on_data(self, data): + pass + + def on_end_of_algorithm(self): + if self._consolidation_count == 0: + raise AssertionError("Expected at least one consolidation but got zero") + + # Default window size is 2, it must be full + if self._consolidator.window.count != 2: + raise AssertionError(f"Expected window count of 2 but was {self._consolidator.window.count}") diff --git a/Common/Data/Consolidators/BaseTimelessConsolidator.cs b/Common/Data/Consolidators/BaseTimelessConsolidator.cs index 5fd297bdc09f..7498d0aed3e3 100644 --- a/Common/Data/Consolidators/BaseTimelessConsolidator.cs +++ b/Common/Data/Consolidators/BaseTimelessConsolidator.cs @@ -23,7 +23,7 @@ namespace QuantConnect.Data.Consolidators /// Represents a timeless consolidator which depends on the given values. This consolidator /// is meant to consolidate data into bars that do not depend on time, e.g., RangeBar's. /// - public abstract class BaseTimelessConsolidator : IDataConsolidator + public abstract class BaseTimelessConsolidator : ConsolidatorBase, IDataConsolidator where T : IBaseData { /// @@ -47,12 +47,6 @@ public abstract class BaseTimelessConsolidator : IDataConsolidator /// protected virtual T CurrentBar { get; set; } - /// - /// Gets the most recently consolidated piece of data. This will be null if this consolidator - /// has not produced any data yet. - /// - public IBaseData Consolidated { get; protected set; } - /// /// Gets a clone of the data being currently consolidated /// @@ -188,7 +182,7 @@ protected void OnDataConsolidated(T consolidated) DataConsolidatedHandler?.Invoke(this, consolidated); - Consolidated = consolidated; + UpdateConsolidated(consolidated); } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. @@ -202,10 +196,10 @@ public virtual void Dispose() /// /// Resets the consolidator /// - public virtual void Reset() + public override void Reset() { - Consolidated = null; CurrentBar = default(T); + base.Reset(); } /// diff --git a/Common/Data/Consolidators/ConsolidatorBase.cs b/Common/Data/Consolidators/ConsolidatorBase.cs new file mode 100644 index 000000000000..366253750194 --- /dev/null +++ b/Common/Data/Consolidators/ConsolidatorBase.cs @@ -0,0 +1,79 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +using System.Collections; +using System.Collections.Generic; +using QuantConnect.Indicators; + +namespace QuantConnect.Data.Consolidators +{ + /// + /// Provides a base implementation for consolidators, including a built-in rolling window + /// that stores the history of consolidated bars. + /// + public abstract class ConsolidatorBase : IEnumerable + { + /// + /// The default number of consolidated bars to keep in the rolling window history + /// + public static int DefaultWindowSize { get; } = 2; + + /// + /// A rolling window keeping a history of the consolidated bars. The most recent bar is at index 0. + /// + public RollingWindow Window { get; } = new RollingWindow(DefaultWindowSize); + + /// + /// Gets the most recently consolidated piece of data. This will be null if this consolidator + /// has not produced any data yet. + /// + public IBaseData Consolidated { get; protected set; } + + /// + /// Indexes the history window, where index 0 is the most recently consolidated bar. + /// + /// The index + /// The ith most recently consolidated bar + public IBaseData this[int i] => Window[i]; + + /// + /// Returns an enumerator that iterates through the history window. + /// + public IEnumerator GetEnumerator() => Window.GetEnumerator(); + + /// + /// Returns an enumerator that iterates through the history window. + /// + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// + /// Updates and adds the bar to the rolling window. + /// + protected void UpdateConsolidated(IBaseData consolidated) + { + Consolidated = consolidated; + Window.Add(consolidated); + } + + /// + /// Resets this consolidator, clearing consolidated data and the rolling window. + /// + public virtual void Reset() + { + Consolidated = null; + Window.Reset(); + } + } +} diff --git a/Common/Data/Consolidators/DataConsolidator.cs b/Common/Data/Consolidators/DataConsolidator.cs index 8f5f57e46169..1146449fefd5 100644 --- a/Common/Data/Consolidators/DataConsolidator.cs +++ b/Common/Data/Consolidators/DataConsolidator.cs @@ -23,7 +23,7 @@ namespace QuantConnect.Data.Consolidators /// and/or aggregated data. /// /// The type consumed by the consolidator - public abstract class DataConsolidator : IDataConsolidator + public abstract class DataConsolidator : ConsolidatorBase, IDataConsolidator where TInput : IBaseData { /// @@ -52,15 +52,6 @@ public void Update(IBaseData data) /// public event DataConsolidatedHandler DataConsolidated; - /// - /// Gets the most recently consolidated piece of data. This will be null if this consolidator - /// has not produced any data yet. - /// - public IBaseData Consolidated - { - get; protected set; - } - /// /// Gets a clone of the data being currently consolidated /// @@ -74,7 +65,7 @@ public abstract IBaseData WorkingData /// public Type InputType { - get { return typeof (TInput); } + get { return typeof(TInput); } } /// @@ -102,18 +93,9 @@ protected virtual void OnDataConsolidated(IBaseData consolidated) var handler = DataConsolidated; if (handler != null) handler(this, consolidated); - // assign the Consolidated property after the event handlers are fired, - // this allows the event handlers to look at the new consolidated data - // and the previous consolidated data at the same time without extra bookkeeping - Consolidated = consolidated; - } - - /// - /// Resets the consolidator - /// - public virtual void Reset() - { - Consolidated = null; + // assign Consolidated and push to Window after the event handlers fire, + // so handlers can compare the new bar against the previous one without extra bookkeeping + UpdateConsolidated(consolidated); } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. diff --git a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs index 27676b2aeb4a..3aaad931b889 100644 --- a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs +++ b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs @@ -25,7 +25,7 @@ namespace QuantConnect.Data.Common /// /// Consolidator for open markets bar only, extended hours bar are not consolidated. /// - public class MarketHourAwareConsolidator : IDataConsolidator + public class MarketHourAwareConsolidator : ConsolidatorBase, IDataConsolidator { private readonly bool _dailyStrictEndTimeEnabled; private readonly bool _extendedMarketHours; @@ -51,12 +51,6 @@ public class MarketHourAwareConsolidator : IDataConsolidator /// protected DateTimeZone DataTimeZone { get; set; } - /// - /// Gets the most recently consolidated piece of data. This will be null if this consolidator - /// has not produced any data yet. - /// - public IBaseData Consolidated => Consolidator.Consolidated; - /// /// Gets the type consumed by this consolidator /// @@ -164,12 +158,13 @@ public void Dispose() /// /// Resets the consolidator /// - public void Reset() + public override void Reset() { _useStrictEndTime = false; ExchangeHours = null; DataTimeZone = null; Consolidator.Reset(); + base.Reset(); } /// @@ -214,6 +209,7 @@ protected virtual bool UseStrictEndTime(Symbol symbol) protected virtual void ForwardConsolidatedBar(object sender, IBaseData consolidated) { DataConsolidated?.Invoke(this, consolidated); + UpdateConsolidated(consolidated); } } } diff --git a/Common/Data/Consolidators/RenkoConsolidator.cs b/Common/Data/Consolidators/RenkoConsolidator.cs index b2ddf9d8fba9..652290804632 100644 --- a/Common/Data/Consolidators/RenkoConsolidator.cs +++ b/Common/Data/Consolidators/RenkoConsolidator.cs @@ -24,13 +24,12 @@ namespace QuantConnect.Data.Consolidators /// /// This implementation replaced the original implementation that was shown to have inaccuracies in its representation /// of Renko charts. The original implementation has been moved to . - public class RenkoConsolidator : IDataConsolidator + public class RenkoConsolidator : ConsolidatorBase, IDataConsolidator { private bool _firstTick = true; private RenkoBar _lastWicko; private DataConsolidatedHandler _dataConsolidatedHandler; private RenkoBar _currentBar; - private IBaseData _consolidated; /// /// Time of consolidated close. @@ -94,16 +93,6 @@ public class RenkoConsolidator : IDataConsolidator /// public Type OutputType => typeof(RenkoBar); - /// - /// Gets the most recently consolidated piece of data. This will be null if this consolidator - /// has not produced any data yet. - /// - public IBaseData Consolidated - { - get { return _consolidated; } - private set { _consolidated = value; } - } - /// /// Event handler that fires when a new piece of data is produced /// @@ -244,18 +233,18 @@ public void Dispose() /// /// Resets the consolidator /// - public void Reset() + public override void Reset() { _firstTick = true; _lastWicko = null; _currentBar = null; - _consolidated = null; CloseOn = default; CloseRate = default; HighRate = default; LowRate = default; OpenOn = default; OpenRate = default; + base.Reset(); } /// @@ -268,7 +257,7 @@ protected void OnDataConsolidated(RenkoBar consolidated) DataConsolidated?.Invoke(this, consolidated); _currentBar = consolidated; _dataConsolidatedHandler?.Invoke(this, consolidated); - Consolidated = consolidated; + UpdateConsolidated(consolidated); } private void Rising(IBaseData data) diff --git a/Common/Data/Consolidators/SequentialConsolidator.cs b/Common/Data/Consolidators/SequentialConsolidator.cs index 6ce0fccd9e49..a9643549257e 100644 --- a/Common/Data/Consolidators/SequentialConsolidator.cs +++ b/Common/Data/Consolidators/SequentialConsolidator.cs @@ -22,7 +22,7 @@ namespace QuantConnect.Data.Consolidators /// such that data flows from the First to Second consolidator. It's output comes /// from the Second. /// - public class SequentialConsolidator : IDataConsolidator + public class SequentialConsolidator : ConsolidatorBase, IDataConsolidator { /// /// Gets the first consolidator to receive data @@ -41,17 +41,6 @@ public IDataConsolidator Second get; private set; } - /// - /// Gets the most recently consolidated piece of data. This will be null if this consolidator - /// has not produced any data yet. - /// - /// For a SequentialConsolidator, this is the output from the 'Second' consolidator. - /// - public IBaseData Consolidated - { - get { return Second.Consolidated; } - } - /// /// Gets a clone of the data being currently consolidated /// @@ -131,6 +120,7 @@ protected virtual void OnDataConsolidated(IBaseData consolidated) { var handler = DataConsolidated; if (handler != null) handler(this, consolidated); + UpdateConsolidated(consolidated); } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. @@ -145,10 +135,11 @@ public void Dispose() /// /// Resets the consolidator /// - public void Reset() + public override void Reset() { First.Reset(); Second.Reset(); + base.Reset(); } } } diff --git a/Common/Python/DataConsolidatorPythonWrapper.cs b/Common/Python/DataConsolidatorPythonWrapper.cs index c2264726e217..9ce6c9912381 100644 --- a/Common/Python/DataConsolidatorPythonWrapper.cs +++ b/Common/Python/DataConsolidatorPythonWrapper.cs @@ -14,19 +14,42 @@ */ using System; +using System.Collections; +using System.Collections.Generic; using Python.Runtime; using QuantConnect.Data; using QuantConnect.Data.Consolidators; +using QuantConnect.Indicators; namespace QuantConnect.Python { /// /// Provides an Data Consolidator that wraps a object that represents a custom Python consolidator /// - public class DataConsolidatorPythonWrapper : BasePythonWrapper, IDataConsolidator + public class DataConsolidatorPythonWrapper : BasePythonWrapper, IDataConsolidator, IEnumerable { internal PyObject Model => Instance; + /// + /// A rolling window keeping a history of the consolidated bars. The most recent bar is at index 0. + /// + public RollingWindow Window { get; } = new RollingWindow(ConsolidatorBase.DefaultWindowSize); + + /// + /// Indexes the history window, where index 0 is the most recently consolidated bar. + /// + public IBaseData this[int i] => Window[i]; + + /// + /// Returns an enumerator that iterates through the history window. + /// + public IEnumerator GetEnumerator() => Window.GetEnumerator(); + + /// + /// Returns an enumerator that iterates through the history window. + /// + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + /// /// Gets the most recently consolidated piece of data. This will be null if this consolidator /// has not produced any data yet. @@ -84,6 +107,7 @@ public event DataConsolidatedHandler DataConsolidated public DataConsolidatorPythonWrapper(PyObject consolidator) : base(consolidator, true) { + DataConsolidated += (_, bar) => Window.Add(bar); } /// @@ -116,6 +140,7 @@ public void Dispose() public void Reset() { InvokeMethod(nameof(Reset)); + Window.Reset(); } } } From e35cf540aa355ed8366e5f2d12c77004252fd9eb Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Thu, 2 Apr 2026 16:51:45 -0500 Subject: [PATCH 02/10] Minor fix --- Common/Data/Consolidators/ConsolidatorBase.cs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/Common/Data/Consolidators/ConsolidatorBase.cs b/Common/Data/Consolidators/ConsolidatorBase.cs index 366253750194..2a219eee4a36 100644 --- a/Common/Data/Consolidators/ConsolidatorBase.cs +++ b/Common/Data/Consolidators/ConsolidatorBase.cs @@ -30,10 +30,22 @@ public abstract class ConsolidatorBase : IEnumerable /// public static int DefaultWindowSize { get; } = 2; + private RollingWindow _window; + /// /// A rolling window keeping a history of the consolidated bars. The most recent bar is at index 0. /// - public RollingWindow Window { get; } = new RollingWindow(DefaultWindowSize); + public RollingWindow Window + { + get + { + if (_window == null) + { + _window = new RollingWindow(DefaultWindowSize); + } + return _window; + } + } /// /// Gets the most recently consolidated piece of data. This will be null if this consolidator @@ -73,7 +85,7 @@ protected void UpdateConsolidated(IBaseData consolidated) public virtual void Reset() { Consolidated = null; - Window.Reset(); + _window?.Reset(); } } } From 853314e8f2c0ab2140e694d56987df01c89e3d37 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Tue, 7 Apr 2026 03:12:07 -0500 Subject: [PATCH 03/10] Add unit tests --- Tests/Common/Data/ConsolidatorBaseTests.cs | 154 ++++++++++++++++++ .../Data/MarketHourAwareConsolidatorTests.cs | 19 +++ 2 files changed, 173 insertions(+) create mode 100644 Tests/Common/Data/ConsolidatorBaseTests.cs diff --git a/Tests/Common/Data/ConsolidatorBaseTests.cs b/Tests/Common/Data/ConsolidatorBaseTests.cs new file mode 100644 index 000000000000..627596bc3018 --- /dev/null +++ b/Tests/Common/Data/ConsolidatorBaseTests.cs @@ -0,0 +1,154 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using NUnit.Framework; +using QuantConnect.Data; +using QuantConnect.Data.Consolidators; +using QuantConnect.Data.Market; +using QuantConnect.Indicators; + +namespace QuantConnect.Tests.Common.Data +{ + [TestFixture] + public class ConsolidatorBaseTests + { + [TestCaseSource(nameof(WindowTestCases))] + public void WindowStoresConsolidatedBars(IDataConsolidator consolidator, IBaseData[] bars, decimal expectedWindow0, decimal expectedWindow1) + { + var windowConsolidator = (ConsolidatorBase)consolidator; + + foreach (var bar in bars) + { + consolidator.Update(bar); + } + + Assert.AreEqual(2, windowConsolidator.Window.Count); + Assert.AreEqual(expectedWindow0, windowConsolidator.Window[0].Value); + Assert.AreEqual(expectedWindow1, windowConsolidator.Window[1].Value); + + consolidator.Dispose(); + } + + private static IEnumerable WindowTestCases() + { + var reference = new DateTime(2015, 4, 13); + var spy = Symbols.SPY; + var ibm = Symbols.IBM; + + yield return new TestCaseData( + new TradeBarConsolidator(1), + new IBaseData[] + { + new TradeBar { Symbol = spy, Time = reference, Close = 10m, Value = 10m, Period = Time.OneMinute }, + new TradeBar { Symbol = spy, Time = reference.AddMinutes(1), Close = 20m, Value = 20m, Period = Time.OneMinute } + }, + 20m, 10m + ).SetName("TradeBarConsolidator"); + + yield return new TestCaseData( + new QuoteBarConsolidator(1), + new IBaseData[] + { + new QuoteBar { Symbol = spy, Time = reference, Value = 10m, Period = Time.OneMinute }, + new QuoteBar { Symbol = spy, Time = reference.AddMinutes(1), Value = 20m, Period = Time.OneMinute } + }, + 20m, 10m + ).SetName("QuoteBarConsolidator"); + + yield return new TestCaseData( + new TickConsolidator(1), + new IBaseData[] + { + new Tick { Symbol = spy, Time = reference, Value = 10m, TickType = TickType.Trade }, + new Tick { Symbol = spy, Time = reference.AddMinutes(1), Value = 20m, TickType = TickType.Trade } + }, + 20m, 10m + ).SetName("TickConsolidator"); + + yield return new TestCaseData( + new TickQuoteBarConsolidator(1), + new IBaseData[] + { + new Tick { Symbol = spy, Time = reference, Value = 10m, TickType = TickType.Quote, BidPrice = 10m, AskPrice = 10m }, + new Tick { Symbol = spy, Time = reference.AddMinutes(1), Value = 20m, TickType = TickType.Quote, BidPrice = 20m, AskPrice = 20m } + }, + 20m, 10m + ).SetName("TickQuoteBarConsolidator"); + + yield return new TestCaseData( + new BaseDataConsolidator(1), + new IBaseData[] + { + new TradeBar { Symbol = spy, Time = reference, Close = 10m, Value = 10m, Period = Time.OneMinute }, + new TradeBar { Symbol = spy, Time = reference.AddMinutes(1), Close = 20m, Value = 20m, Period = Time.OneMinute } + }, + 20m, 10m + ).SetName("BaseDataConsolidator"); + + yield return new TestCaseData( + new IdentityDataConsolidator(), + new IBaseData[] + { + new TradeBar { Symbol = spy, Time = reference, Close = 10m, Value = 10m, Period = Time.OneMinute }, + new TradeBar { Symbol = spy, Time = reference.AddMinutes(1), Close = 20m, Value = 20m, Period = Time.OneMinute } + }, + 20m, 10m + ).SetName("IdentityDataConsolidator"); + + yield return new TestCaseData( + new ClassicRenkoConsolidator(10), + new IBaseData[] + { + new IndicatorDataPoint(spy, reference, 0m), + new IndicatorDataPoint(spy, reference.AddMinutes(1), 10m), + new IndicatorDataPoint(spy, reference.AddMinutes(2), 20m) + }, + 20m, 10m + ).SetName("ClassicRenkoConsolidator"); + + yield return new TestCaseData( + new RenkoConsolidator(1m), + new IBaseData[] + { + new IndicatorDataPoint(spy, reference, 10m), + new IndicatorDataPoint(spy, reference.AddMinutes(1), 12.1m) + }, + 12m, 11m + ).SetName("RenkoConsolidator"); + + yield return new TestCaseData( + new RangeConsolidator(100, x => x.Value, x => 0m), + new IBaseData[] + { + new IndicatorDataPoint(ibm, reference, 90m), + new IndicatorDataPoint(ibm, reference.AddMinutes(1), 94.5m) + }, + 94.03m, 93.02m + ).SetName("RangeConsolidator"); + + yield return new TestCaseData( + new SequentialConsolidator(new TradeBarConsolidator(1), new TradeBarConsolidator(1)), + new IBaseData[] + { + new TradeBar { Symbol = spy, Time = reference, Close = 10m, Value = 10m, Period = Time.OneMinute }, + new TradeBar { Symbol = spy, Time = reference.AddMinutes(1), Close = 20m, Value = 20m, Period = Time.OneMinute } + }, + 20m, 10m + ).SetName("SequentialConsolidator"); + } + } +} diff --git a/Tests/Common/Data/MarketHourAwareConsolidatorTests.cs b/Tests/Common/Data/MarketHourAwareConsolidatorTests.cs index 3ad59c56396b..030c1cdb0434 100644 --- a/Tests/Common/Data/MarketHourAwareConsolidatorTests.cs +++ b/Tests/Common/Data/MarketHourAwareConsolidatorTests.cs @@ -294,6 +294,25 @@ public void WorksWithDailyResolutionAndPreciseEndTimeFalse() Assert.AreEqual(100, consolidatedData.High); } + [Test] + public void WindowIsPopulatedOnConsolidation() + { + var symbol = Symbols.SPY; + using var consolidator = new MarketHourAwareConsolidator(false, Resolution.Daily, typeof(TradeBar), TickType.Trade, false); + + consolidator.Update(new TradeBar() { Time = new DateTime(2015, 04, 13, 12, 0, 0), Period = Time.OneMinute, Symbol = symbol, Close = 100 }); + consolidator.Scan(new DateTime(2015, 04, 14, 0, 0, 0)); + + Assert.AreEqual(1, consolidator.Window.Count); + + consolidator.Update(new TradeBar() { Time = new DateTime(2015, 04, 14, 12, 0, 0), Period = Time.OneMinute, Symbol = symbol, Close = 200 }); + consolidator.Scan(new DateTime(2015, 04, 15, 0, 0, 0)); + + Assert.AreEqual(2, consolidator.Window.Count); + Assert.AreEqual(200, ((TradeBar)consolidator.Window[0]).Close); + Assert.AreEqual(100, ((TradeBar)consolidator.Window[1]).Close); + } + protected override IDataConsolidator CreateConsolidator() { return new MarketHourAwareConsolidator(true, Resolution.Hour, typeof(TradeBar), TickType.Trade, false); From d5bed7dd33de097aa42b06e3da3efe96f1b91df0 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Tue, 7 Apr 2026 03:22:17 -0500 Subject: [PATCH 04/10] Add Current and Previous properties to consolidators --- .../ConsolidatorRollingWindowRegressionAlgorithm.cs | 9 +++++++++ .../ConsolidatorRollingWindowRegressionAlgorithm.py | 5 +++++ Common/Data/Consolidators/ConsolidatorBase.cs | 10 ++++++++++ Common/Python/DataConsolidatorPythonWrapper.cs | 10 ++++++++++ 4 files changed, 34 insertions(+) diff --git a/Algorithm.CSharp/ConsolidatorRollingWindowRegressionAlgorithm.cs b/Algorithm.CSharp/ConsolidatorRollingWindowRegressionAlgorithm.cs index 3359db0d41e8..103c41a2718e 100644 --- a/Algorithm.CSharp/ConsolidatorRollingWindowRegressionAlgorithm.cs +++ b/Algorithm.CSharp/ConsolidatorRollingWindowRegressionAlgorithm.cs @@ -49,6 +49,11 @@ private void OnDataConsolidated(object sender, TradeBar bar) { _consolidationCount++; + if (_consolidator.Current != _consolidator[0]) + { + throw new RegressionTestException("Expected Current to be the same as Window[0]"); + } + // Window[0] must always be the bar just consolidated var currentBar = (TradeBar)_consolidator[0]; if (currentBar.Time != bar.Time) @@ -64,6 +69,10 @@ private void OnDataConsolidated(object sender, TradeBar bar) if (_consolidator.Window.Count >= 2) { var previous = (TradeBar)_consolidator[1]; + if (_consolidator.Previous != _consolidator[1]) + { + throw new RegressionTestException("Expected Previous to be the same as Window[1]"); + } if (previous.Time >= bar.Time) { throw new RegressionTestException($"consolidator[1].Time ({previous.Time}) should be earlier than consolidator[0].Time ({bar.Time})"); diff --git a/Algorithm.Python/ConsolidatorRollingWindowRegressionAlgorithm.py b/Algorithm.Python/ConsolidatorRollingWindowRegressionAlgorithm.py index 087321235b8e..1b4b4de69d1e 100644 --- a/Algorithm.Python/ConsolidatorRollingWindowRegressionAlgorithm.py +++ b/Algorithm.Python/ConsolidatorRollingWindowRegressionAlgorithm.py @@ -32,6 +32,9 @@ def initialize(self): def _on_data_consolidated(self, sender, bar): self._consolidation_count += 1 + if self._consolidator.current != self._consolidator[0]: + raise AssertionError("Expected current to be the same as window[0]") + # consolidator[0] must always match the bar just fired currentBar = self._consolidator[0] if currentBar.time != bar.time: @@ -42,6 +45,8 @@ def _on_data_consolidated(self, sender, bar): # After the second consolidation the previous bar must be at index 1 if self._consolidator.window.count >= 2: previous = self._consolidator[1] + if self._consolidator.previous != self._consolidator[1]: + raise AssertionError("Expected previous to be the same as window[1]") if previous.time >= bar.time: raise AssertionError( f"consolidator[1].time ({previous.time}) should be earlier " diff --git a/Common/Data/Consolidators/ConsolidatorBase.cs b/Common/Data/Consolidators/ConsolidatorBase.cs index 2a219eee4a36..5795b127cd52 100644 --- a/Common/Data/Consolidators/ConsolidatorBase.cs +++ b/Common/Data/Consolidators/ConsolidatorBase.cs @@ -53,6 +53,16 @@ public RollingWindow Window /// public IBaseData Consolidated { get; protected set; } + /// + /// Gets the most recently consolidated piece of data. Alias of . + /// + public IBaseData Current => Consolidated; + + /// + /// Gets the previously consolidated piece of data, or null if fewer than two bars have been produced. + /// + public IBaseData Previous => Window.Count > 1 ? Window[1] : null; + /// /// Indexes the history window, where index 0 is the most recently consolidated bar. /// diff --git a/Common/Python/DataConsolidatorPythonWrapper.cs b/Common/Python/DataConsolidatorPythonWrapper.cs index 9ce6c9912381..b0622d6a12dd 100644 --- a/Common/Python/DataConsolidatorPythonWrapper.cs +++ b/Common/Python/DataConsolidatorPythonWrapper.cs @@ -59,6 +59,16 @@ public IBaseData Consolidated get { return GetProperty(nameof(Consolidated)); } } + /// + /// Gets the most recently consolidated piece of data. Alias of . + /// + public IBaseData Current => Consolidated; + + /// + /// Gets the previously consolidated piece of data, or null if fewer than two bars have been produced. + /// + public IBaseData Previous => Window.Count > 1 ? Window[1] : null; + /// /// Gets a clone of the data being currently consolidated /// From 30ec288d81f38113fd1f200595cb3cebfa5d5690 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Tue, 7 Apr 2026 11:53:48 -0500 Subject: [PATCH 05/10] Add WindowBase as single source of truth for rolling window logic --- Common/Data/Consolidators/ConsolidatorBase.cs | 57 +----------- .../Python/DataConsolidatorPythonWrapper.cs | 2 +- Common/WindowBase.cs | 88 +++++++++++++++++++ Indicators/IndicatorBase.cs | 66 +------------- 4 files changed, 93 insertions(+), 120 deletions(-) create mode 100644 Common/WindowBase.cs diff --git a/Common/Data/Consolidators/ConsolidatorBase.cs b/Common/Data/Consolidators/ConsolidatorBase.cs index 5795b127cd52..1a9470524e8b 100644 --- a/Common/Data/Consolidators/ConsolidatorBase.cs +++ b/Common/Data/Consolidators/ConsolidatorBase.cs @@ -13,73 +13,20 @@ * limitations under the License. */ -using System.Collections; -using System.Collections.Generic; -using QuantConnect.Indicators; - namespace QuantConnect.Data.Consolidators { /// /// Provides a base implementation for consolidators, including a built-in rolling window /// that stores the history of consolidated bars. /// - public abstract class ConsolidatorBase : IEnumerable + public abstract class ConsolidatorBase : WindowBase { - /// - /// The default number of consolidated bars to keep in the rolling window history - /// - public static int DefaultWindowSize { get; } = 2; - - private RollingWindow _window; - - /// - /// A rolling window keeping a history of the consolidated bars. The most recent bar is at index 0. - /// - public RollingWindow Window - { - get - { - if (_window == null) - { - _window = new RollingWindow(DefaultWindowSize); - } - return _window; - } - } - /// /// Gets the most recently consolidated piece of data. This will be null if this consolidator /// has not produced any data yet. /// public IBaseData Consolidated { get; protected set; } - /// - /// Gets the most recently consolidated piece of data. Alias of . - /// - public IBaseData Current => Consolidated; - - /// - /// Gets the previously consolidated piece of data, or null if fewer than two bars have been produced. - /// - public IBaseData Previous => Window.Count > 1 ? Window[1] : null; - - /// - /// Indexes the history window, where index 0 is the most recently consolidated bar. - /// - /// The index - /// The ith most recently consolidated bar - public IBaseData this[int i] => Window[i]; - - /// - /// Returns an enumerator that iterates through the history window. - /// - public IEnumerator GetEnumerator() => Window.GetEnumerator(); - - /// - /// Returns an enumerator that iterates through the history window. - /// - IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - /// /// Updates and adds the bar to the rolling window. /// @@ -95,7 +42,7 @@ protected void UpdateConsolidated(IBaseData consolidated) public virtual void Reset() { Consolidated = null; - _window?.Reset(); + ResetWindow(); } } } diff --git a/Common/Python/DataConsolidatorPythonWrapper.cs b/Common/Python/DataConsolidatorPythonWrapper.cs index b0622d6a12dd..94d302cf0126 100644 --- a/Common/Python/DataConsolidatorPythonWrapper.cs +++ b/Common/Python/DataConsolidatorPythonWrapper.cs @@ -33,7 +33,7 @@ public class DataConsolidatorPythonWrapper : BasePythonWrapper /// A rolling window keeping a history of the consolidated bars. The most recent bar is at index 0. /// - public RollingWindow Window { get; } = new RollingWindow(ConsolidatorBase.DefaultWindowSize); + public RollingWindow Window { get; } = new RollingWindow(WindowBase.DefaultWindowSize); /// /// Indexes the history window, where index 0 is the most recently consolidated bar. diff --git a/Common/WindowBase.cs b/Common/WindowBase.cs new file mode 100644 index 000000000000..ff363ece613b --- /dev/null +++ b/Common/WindowBase.cs @@ -0,0 +1,88 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +using System.Collections; +using System.Collections.Generic; +using QuantConnect.Indicators; + +namespace QuantConnect +{ + /// + /// Provides a base class for types that maintain a rolling window history of values. + /// This is the single source of truth for window logic shared between indicators and consolidators. + /// + /// The type of value stored in the rolling window + public abstract class WindowBase : IEnumerable + { + private RollingWindow _window; + + /// + /// The default number of values to keep in the rolling window history + /// + public static int DefaultWindowSize { get; } = 2; + + /// + /// A rolling window keeping a history of values. The most recent value is at index 0. + /// Uses lazy initialization to survive Python subclasses that do not call base constructors. + /// + public RollingWindow Window => _window ??= new RollingWindow(DefaultWindowSize); + + /// + /// Gets the most recent value. The protected setter adds the value to the rolling window. + /// + public virtual T Current + { + get + { + return Window[0]; + } + protected set + { + Window.Add(value); + } + } + + /// + /// Gets the previous value, or default if fewer than two values have been produced. + /// + public virtual T Previous => Window.Count > 1 ? Window[1] : default; + + /// + /// Indexes the history window, where index 0 is the most recent value. + /// + /// The index + /// The ith most recent value + public T this[int i] => Window[i]; + + /// + /// Returns an enumerator that iterates through the history window. + /// + public IEnumerator GetEnumerator() => Window.GetEnumerator(); + + /// + /// Returns an enumerator that iterates through the history window. + /// + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// + /// Resets the rolling window, clearing all stored values without allocating a new window + /// if it has not yet been created. + /// + protected void ResetWindow() + { + _window?.Reset(); + } + } +} diff --git a/Indicators/IndicatorBase.cs b/Indicators/IndicatorBase.cs index d601d4228873..634b23feb839 100644 --- a/Indicators/IndicatorBase.cs +++ b/Indicators/IndicatorBase.cs @@ -19,14 +19,13 @@ using QuantConnect.Logging; using System.Collections.Generic; using QuantConnect.Data.Consolidators; -using System.Collections; namespace QuantConnect.Indicators { /// /// Abstract Indicator base, meant to contain non-generic fields of indicator base to support non-typed inputs /// - public abstract partial class IndicatorBase : IIndicator, IEnumerable + public abstract partial class IndicatorBase : WindowBase, IIndicator { /// /// The data consolidators associated with this indicator if any @@ -35,27 +34,11 @@ public abstract partial class IndicatorBase : IIndicator, IEnumerable public ISet Consolidators { get; } = new HashSet(); - /// - /// Gets the current state of this indicator. If the state has not been updated - /// then the time on the value will equal DateTime.MinValue. - /// - public IndicatorDataPoint Current - { - get - { - return Window[0]; - } - protected set - { - Window.Add(value); - } - } - /// /// Gets the previous state of this indicator. If the state has not been updated /// then the time on the value will equal DateTime.MinValue. /// - public IndicatorDataPoint Previous + public override IndicatorDataPoint Previous { get { @@ -83,11 +66,6 @@ public IndicatorDataPoint Previous /// public event IndicatorUpdatedHandler Updated; - /// - /// A rolling window keeping a history of the indicator values of a given period - /// - public RollingWindow Window { get; } - /// /// Resets this indicator to its initial state /// @@ -98,7 +76,6 @@ public IndicatorDataPoint Previous /// protected IndicatorBase() { - Window = new RollingWindow(Indicator.DefaultWindowSize); Current = new IndicatorDataPoint(DateTime.MinValue, 0m); } @@ -129,45 +106,6 @@ protected virtual void OnUpdated(IndicatorDataPoint consolidated) /// True if this indicator is ready, false otherwise public abstract bool Update(IBaseData input); - /// - /// Indexes the history windows, where index 0 is the most recent indicator value. - /// If index is greater or equal than the current count, it returns null. - /// If the index is greater or equal than the window size, it returns null and resizes the windows to i + 1. - /// - /// The index - /// the ith most recent indicator value - public IndicatorDataPoint this[int i] - { - get - { - return Window[i]; - } - } - - /// - /// Returns an enumerator that iterates through the history window. - /// - /// - /// A that can be used to iterate through the history window. - /// - /// 1 - public IEnumerator GetEnumerator() - { - return Window.GetEnumerator(); - } - - /// - /// Returns an enumerator that iterates through the history window. - /// - /// - /// An object that can be used to iterate through the history window. - /// - /// 2 - IEnumerator IEnumerable.GetEnumerator() - { - return GetEnumerator(); - } - /// /// ToString Overload for Indicator Base /// From 4e4eb630e1c638ce088d4918d8619c14aa5699aa Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Fri, 17 Apr 2026 15:48:09 -0500 Subject: [PATCH 06/10] Address review comments on consolidator rolling window --- .../Consolidators/BaseTimelessConsolidator.cs | 2 +- Common/Data/Consolidators/ConsolidatorBase.cs | 20 ++--- Common/Data/Consolidators/DataConsolidator.cs | 4 +- .../MarketHourAwareConsolidator.cs | 2 +- .../Data/Consolidators/RenkoConsolidator.cs | 2 +- .../Consolidators/SequentialConsolidator.cs | 2 +- .../Python/DataConsolidatorPythonWrapper.cs | 80 +++++-------------- Common/WindowBase.cs | 13 +++ Indicators/IndicatorBase.cs | 2 +- Tests/Common/Data/ConsolidatorBaseTests.cs | 3 + .../DataConsolidatorPythonWrapperTests.cs | 79 ++++++++++++++++++ 11 files changed, 132 insertions(+), 77 deletions(-) diff --git a/Common/Data/Consolidators/BaseTimelessConsolidator.cs b/Common/Data/Consolidators/BaseTimelessConsolidator.cs index 7498d0aed3e3..a61f4a0deb33 100644 --- a/Common/Data/Consolidators/BaseTimelessConsolidator.cs +++ b/Common/Data/Consolidators/BaseTimelessConsolidator.cs @@ -182,7 +182,7 @@ protected void OnDataConsolidated(T consolidated) DataConsolidatedHandler?.Invoke(this, consolidated); - UpdateConsolidated(consolidated); + Consolidated = consolidated; } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. diff --git a/Common/Data/Consolidators/ConsolidatorBase.cs b/Common/Data/Consolidators/ConsolidatorBase.cs index 1a9470524e8b..e804a22ac931 100644 --- a/Common/Data/Consolidators/ConsolidatorBase.cs +++ b/Common/Data/Consolidators/ConsolidatorBase.cs @@ -23,17 +23,18 @@ public abstract class ConsolidatorBase : WindowBase { /// /// Gets the most recently consolidated piece of data. This will be null if this consolidator - /// has not produced any data yet. + /// has not produced any data yet. Setting this property adds the value to the rolling window. /// - public IBaseData Consolidated { get; protected set; } - - /// - /// Updates and adds the bar to the rolling window. - /// - protected void UpdateConsolidated(IBaseData consolidated) + public IBaseData Consolidated { - Consolidated = consolidated; - Window.Add(consolidated); + get + { + return Window.Count > 0 ? Window[0] : null; + } + protected set + { + Window.Add(value); + } } /// @@ -41,7 +42,6 @@ protected void UpdateConsolidated(IBaseData consolidated) /// public virtual void Reset() { - Consolidated = null; ResetWindow(); } } diff --git a/Common/Data/Consolidators/DataConsolidator.cs b/Common/Data/Consolidators/DataConsolidator.cs index 1146449fefd5..04d97b76cbde 100644 --- a/Common/Data/Consolidators/DataConsolidator.cs +++ b/Common/Data/Consolidators/DataConsolidator.cs @@ -93,9 +93,9 @@ protected virtual void OnDataConsolidated(IBaseData consolidated) var handler = DataConsolidated; if (handler != null) handler(this, consolidated); - // assign Consolidated and push to Window after the event handlers fire, + // assign Consolidated (and push to Window) after the event handlers fire, // so handlers can compare the new bar against the previous one without extra bookkeeping - UpdateConsolidated(consolidated); + Consolidated = consolidated; } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. diff --git a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs index 3aaad931b889..17cb46ccf7c6 100644 --- a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs +++ b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs @@ -209,7 +209,7 @@ protected virtual bool UseStrictEndTime(Symbol symbol) protected virtual void ForwardConsolidatedBar(object sender, IBaseData consolidated) { DataConsolidated?.Invoke(this, consolidated); - UpdateConsolidated(consolidated); + Consolidated = consolidated; } } } diff --git a/Common/Data/Consolidators/RenkoConsolidator.cs b/Common/Data/Consolidators/RenkoConsolidator.cs index 652290804632..68e06012a808 100644 --- a/Common/Data/Consolidators/RenkoConsolidator.cs +++ b/Common/Data/Consolidators/RenkoConsolidator.cs @@ -257,7 +257,7 @@ protected void OnDataConsolidated(RenkoBar consolidated) DataConsolidated?.Invoke(this, consolidated); _currentBar = consolidated; _dataConsolidatedHandler?.Invoke(this, consolidated); - UpdateConsolidated(consolidated); + Consolidated = consolidated; } private void Rising(IBaseData data) diff --git a/Common/Data/Consolidators/SequentialConsolidator.cs b/Common/Data/Consolidators/SequentialConsolidator.cs index a9643549257e..af742febc7da 100644 --- a/Common/Data/Consolidators/SequentialConsolidator.cs +++ b/Common/Data/Consolidators/SequentialConsolidator.cs @@ -120,7 +120,7 @@ protected virtual void OnDataConsolidated(IBaseData consolidated) { var handler = DataConsolidated; if (handler != null) handler(this, consolidated); - UpdateConsolidated(consolidated); + Consolidated = consolidated; } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. diff --git a/Common/Python/DataConsolidatorPythonWrapper.cs b/Common/Python/DataConsolidatorPythonWrapper.cs index 94d302cf0126..85b7c16c8367 100644 --- a/Common/Python/DataConsolidatorPythonWrapper.cs +++ b/Common/Python/DataConsolidatorPythonWrapper.cs @@ -14,67 +14,25 @@ */ using System; -using System.Collections; -using System.Collections.Generic; using Python.Runtime; using QuantConnect.Data; using QuantConnect.Data.Consolidators; -using QuantConnect.Indicators; namespace QuantConnect.Python { /// /// Provides an Data Consolidator that wraps a object that represents a custom Python consolidator /// - public class DataConsolidatorPythonWrapper : BasePythonWrapper, IDataConsolidator, IEnumerable + public class DataConsolidatorPythonWrapper : ConsolidatorBase, IDataConsolidator { - internal PyObject Model => Instance; - - /// - /// A rolling window keeping a history of the consolidated bars. The most recent bar is at index 0. - /// - public RollingWindow Window { get; } = new RollingWindow(WindowBase.DefaultWindowSize); - - /// - /// Indexes the history window, where index 0 is the most recently consolidated bar. - /// - public IBaseData this[int i] => Window[i]; - - /// - /// Returns an enumerator that iterates through the history window. - /// - public IEnumerator GetEnumerator() => Window.GetEnumerator(); - - /// - /// Returns an enumerator that iterates through the history window. - /// - IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - - /// - /// Gets the most recently consolidated piece of data. This will be null if this consolidator - /// has not produced any data yet. - /// - public IBaseData Consolidated - { - get { return GetProperty(nameof(Consolidated)); } - } - - /// - /// Gets the most recently consolidated piece of data. Alias of . - /// - public IBaseData Current => Consolidated; - - /// - /// Gets the previously consolidated piece of data, or null if fewer than two bars have been produced. - /// - public IBaseData Previous => Window.Count > 1 ? Window[1] : null; + private readonly BasePythonWrapper _pythonWrapper; /// /// Gets a clone of the data being currently consolidated /// public IBaseData WorkingData { - get { return GetProperty(nameof(WorkingData)); } + get { return _pythonWrapper.GetProperty(nameof(WorkingData)); } } /// @@ -82,7 +40,7 @@ public IBaseData WorkingData /// public Type InputType { - get { return GetProperty(nameof(InputType)); } + get { return _pythonWrapper.GetProperty(nameof(InputType)); } } /// @@ -90,7 +48,7 @@ public Type InputType /// public Type OutputType { - get { return GetProperty(nameof(OutputType)); } + get { return _pythonWrapper.GetProperty(nameof(OutputType)); } } /// @@ -100,12 +58,12 @@ public event DataConsolidatedHandler DataConsolidated { add { - var eventHandler = GetEvent(nameof(DataConsolidated)); + var eventHandler = _pythonWrapper.GetEvent(nameof(DataConsolidated)); eventHandler += value; } remove { - var eventHandler = GetEvent(nameof(DataConsolidated)); + var eventHandler = _pythonWrapper.GetEvent(nameof(DataConsolidated)); eventHandler -= value; } } @@ -115,9 +73,9 @@ public event DataConsolidatedHandler DataConsolidated /// /// Represents a custom python consolidator public DataConsolidatorPythonWrapper(PyObject consolidator) - : base(consolidator, true) { - DataConsolidated += (_, bar) => Window.Add(bar); + _pythonWrapper = new BasePythonWrapper(consolidator, true); + DataConsolidated += (_, bar) => Consolidated = bar; } /// @@ -126,7 +84,7 @@ public DataConsolidatorPythonWrapper(PyObject consolidator) /// The current time in the local time zone (same as ) public void Scan(DateTime currentLocalTime) { - InvokeMethod(nameof(Scan), currentLocalTime); + _pythonWrapper.InvokeMethod(nameof(Scan), currentLocalTime); } /// @@ -135,22 +93,24 @@ public void Scan(DateTime currentLocalTime) /// The new data for the consolidator public void Update(IBaseData data) { - InvokeMethod(nameof(Update), data); + _pythonWrapper.InvokeMethod(nameof(Update), data); } - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// 2 - public void Dispose() + /// + /// Resets the consolidator + /// + public override void Reset() { + _pythonWrapper.InvokeMethod(nameof(Reset)); + base.Reset(); } /// - /// Resets the consolidator + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// - public void Reset() + public void Dispose() { - InvokeMethod(nameof(Reset)); - Window.Reset(); + _pythonWrapper.Dispose(); } } } diff --git a/Common/WindowBase.cs b/Common/WindowBase.cs index ff363ece613b..80f9c5a09ed5 100644 --- a/Common/WindowBase.cs +++ b/Common/WindowBase.cs @@ -33,6 +33,19 @@ public abstract class WindowBase : IEnumerable /// public static int DefaultWindowSize { get; } = 2; + /// + /// Initializes a new instance of the class. + /// + protected WindowBase() { } + + /// + /// Initializes the rolling window with the given size. + /// + protected WindowBase(int windowSize) + { + _window = new RollingWindow(windowSize); + } + /// /// A rolling window keeping a history of values. The most recent value is at index 0. /// Uses lazy initialization to survive Python subclasses that do not call base constructors. diff --git a/Indicators/IndicatorBase.cs b/Indicators/IndicatorBase.cs index 634b23feb839..73b18f50fd23 100644 --- a/Indicators/IndicatorBase.cs +++ b/Indicators/IndicatorBase.cs @@ -74,7 +74,7 @@ public override IndicatorDataPoint Previous /// /// Initializes a new instance of the Indicator class. /// - protected IndicatorBase() + protected IndicatorBase() : base(Indicator.DefaultWindowSize) { Current = new IndicatorDataPoint(DateTime.MinValue, 0m); } diff --git a/Tests/Common/Data/ConsolidatorBaseTests.cs b/Tests/Common/Data/ConsolidatorBaseTests.cs index 627596bc3018..7f0894d5c29a 100644 --- a/Tests/Common/Data/ConsolidatorBaseTests.cs +++ b/Tests/Common/Data/ConsolidatorBaseTests.cs @@ -39,6 +39,9 @@ public void WindowStoresConsolidatedBars(IDataConsolidator consolidator, IBaseDa Assert.AreEqual(2, windowConsolidator.Window.Count); Assert.AreEqual(expectedWindow0, windowConsolidator.Window[0].Value); Assert.AreEqual(expectedWindow1, windowConsolidator.Window[1].Value); + Assert.AreEqual(windowConsolidator.Window[0], windowConsolidator.Consolidated); + Assert.AreEqual(expectedWindow0, windowConsolidator[0].Value); + Assert.AreEqual(expectedWindow1, windowConsolidator.Previous.Value); consolidator.Dispose(); } diff --git a/Tests/Python/DataConsolidatorPythonWrapperTests.cs b/Tests/Python/DataConsolidatorPythonWrapperTests.cs index 91789712d556..11b66710296c 100644 --- a/Tests/Python/DataConsolidatorPythonWrapperTests.cs +++ b/Tests/Python/DataConsolidatorPythonWrapperTests.cs @@ -202,6 +202,85 @@ public void RunRegressionAlgorithm() parameter.ExpectedFinalStatus); } + [Test] + public void WindowIsPopulatedOnConsolidation() + { + using (Py.GIL()) + { + using var wrapper = CreateFedPythonWrapper(1); + Assert.AreEqual(1, wrapper.Window.Count); + Assert.IsNotNull(wrapper.Consolidated); + Assert.AreEqual(wrapper.Consolidated, wrapper[0]); + } + } + + [Test] + public void WindowKeepsPreviousConsolidatedBar() + { + using (Py.GIL()) + { + using var wrapper = CreateFedPythonWrapper(1); + var firstConsolidated = wrapper.Consolidated; + + FeedConsolidation(wrapper, 1); + + Assert.AreEqual(2, wrapper.Window.Count); + Assert.AreNotEqual(firstConsolidated, wrapper[0]); + Assert.AreEqual(firstConsolidated, wrapper[1]); + Assert.AreEqual(firstConsolidated, wrapper.Previous); + } + } + + [Test] + public void CanIterateOverConsolidatedBars() + { + using (Py.GIL()) + { + using var wrapper = CreateFedPythonWrapper(2); + var bars = wrapper.ToList(); + Assert.AreEqual(2, bars.Count); + Assert.AreEqual(wrapper[0], bars[0]); + Assert.AreEqual(wrapper[1], bars[1]); + } + } + + [Test] + public void ResetClearsWindow() + { + using (Py.GIL()) + { + using var wrapper = CreateFedPythonWrapper(1); + wrapper.Reset(); + Assert.AreEqual(0, wrapper.Window.Count); + Assert.IsNull(wrapper.Consolidated); + } + } + + private static DataConsolidatorPythonWrapper CreateFedPythonWrapper(int consolidations) + { + var module = PyModule.FromString(Guid.NewGuid().ToString(), + "from AlgorithmImports import *\n" + + "class CustomConsolidator(QuoteBarConsolidator):\n" + + " def __init__(self):\n" + + " super().__init__(timedelta(minutes=2))\n"); + + var wrapper = new DataConsolidatorPythonWrapper(module.GetAttr("CustomConsolidator").Invoke()); + FeedConsolidation(wrapper, consolidations); + return wrapper; + } + + private static void FeedConsolidation(DataConsolidatorPythonWrapper wrapper, int consolidations) + { + var offset = wrapper.Window.Count * 2; + var time = DateTime.Today; + for (var i = 0; i < consolidations; i++) + { + var bar = new QuoteBar { Time = time.AddMinutes(offset + i * 2), Symbol = Symbols.SPY, Bid = new Bar(1, 2, 0.75m, 1.25m), LastBidSize = 3, Value = 1, Period = TimeSpan.FromMinutes(1) }; + wrapper.Update(bar); + wrapper.Scan(time.AddMinutes(offset + (i + 1) * 2)); + } + } + [Test] public void AttachAndTriggerEvent() { From 1919dbc1ecc5ce2a9b40101ba9963430cf5a0e0e Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Tue, 21 Apr 2026 13:29:46 -0500 Subject: [PATCH 07/10] Fix duplicate window in consolidator wrappers --- Common/Data/Consolidators/MarketHourAwareConsolidator.cs | 7 ++++++- Common/Data/Consolidators/SequentialConsolidator.cs | 7 ++++++- Common/WindowBase.cs | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs index 17cb46ccf7c6..b9be2eb27637 100644 --- a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs +++ b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs @@ -15,6 +15,7 @@ using System; using NodaTime; +using QuantConnect.Indicators; using QuantConnect.Util; using QuantConnect.Securities; using QuantConnect.Data.Market; @@ -41,6 +42,11 @@ public class MarketHourAwareConsolidator : ConsolidatorBase, IDataConsolidator /// protected IDataConsolidator Consolidator { get; } + /// + /// Delegates the rolling window to the inner consolidator to avoid duplication. + /// + public override RollingWindow Window => ((ConsolidatorBase)Consolidator).Window; + /// /// The associated security exchange hours instance /// @@ -209,7 +215,6 @@ protected virtual bool UseStrictEndTime(Symbol symbol) protected virtual void ForwardConsolidatedBar(object sender, IBaseData consolidated) { DataConsolidated?.Invoke(this, consolidated); - Consolidated = consolidated; } } } diff --git a/Common/Data/Consolidators/SequentialConsolidator.cs b/Common/Data/Consolidators/SequentialConsolidator.cs index af742febc7da..3d0a236a8f8a 100644 --- a/Common/Data/Consolidators/SequentialConsolidator.cs +++ b/Common/Data/Consolidators/SequentialConsolidator.cs @@ -14,6 +14,7 @@ */ using System; +using QuantConnect.Indicators; namespace QuantConnect.Data.Consolidators { @@ -41,6 +42,11 @@ public IDataConsolidator Second get; private set; } + /// + /// Delegates the rolling window to the second (inner) consolidator to avoid duplication. + /// + public override RollingWindow Window => (Second as ConsolidatorBase)?.Window; + /// /// Gets a clone of the data being currently consolidated /// @@ -120,7 +126,6 @@ protected virtual void OnDataConsolidated(IBaseData consolidated) { var handler = DataConsolidated; if (handler != null) handler(this, consolidated); - Consolidated = consolidated; } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. diff --git a/Common/WindowBase.cs b/Common/WindowBase.cs index 80f9c5a09ed5..3d2b1e1cfcf6 100644 --- a/Common/WindowBase.cs +++ b/Common/WindowBase.cs @@ -50,7 +50,7 @@ protected WindowBase(int windowSize) /// A rolling window keeping a history of values. The most recent value is at index 0. /// Uses lazy initialization to survive Python subclasses that do not call base constructors. /// - public RollingWindow Window => _window ??= new RollingWindow(DefaultWindowSize); + public virtual RollingWindow Window => _window ??= new RollingWindow(DefaultWindowSize); /// /// Gets the most recent value. The protected setter adds the value to the rolling window. From 3dd885f77c82f4f30f8ea76ff5d5bc1ebe9df6b6 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Tue, 21 Apr 2026 13:39:41 -0500 Subject: [PATCH 08/10] Fix Python consolidator equality --- Common/Python/DataConsolidatorPythonWrapper.cs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/Common/Python/DataConsolidatorPythonWrapper.cs b/Common/Python/DataConsolidatorPythonWrapper.cs index 85b7c16c8367..2dae5f3e9cc8 100644 --- a/Common/Python/DataConsolidatorPythonWrapper.cs +++ b/Common/Python/DataConsolidatorPythonWrapper.cs @@ -112,5 +112,22 @@ public void Dispose() { _pythonWrapper.Dispose(); } + + /// + /// Two wrappers are equal if they wrap the same Python object reference. + /// + public override bool Equals(object obj) + { + if (obj is DataConsolidatorPythonWrapper other) + { + return _pythonWrapper.Equals(other._pythonWrapper); + } + return _pythonWrapper.Equals(obj); + } + + /// + /// Hash code based on the underlying Python object reference. + /// + public override int GetHashCode() => _pythonWrapper.GetHashCode(); } } From dee16c584b2d30b3531fa452e92870807870388b Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Thu, 23 Apr 2026 02:54:51 -0500 Subject: [PATCH 09/10] Address review comments --- Common/Data/Consolidators/MarketHourAwareConsolidator.cs | 2 +- Common/Data/Consolidators/SequentialConsolidator.cs | 6 +++++- Common/WindowBase.cs | 5 ++--- Indicators/IndicatorBase.cs | 8 +------- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs index b9be2eb27637..db081e81abdc 100644 --- a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs +++ b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs @@ -40,7 +40,7 @@ public class MarketHourAwareConsolidator : ConsolidatorBase, IDataConsolidator /// /// The consolidator instance /// - protected IDataConsolidator Consolidator { get; } + private IDataConsolidator Consolidator { get; } /// /// Delegates the rolling window to the inner consolidator to avoid duplication. diff --git a/Common/Data/Consolidators/SequentialConsolidator.cs b/Common/Data/Consolidators/SequentialConsolidator.cs index 3d0a236a8f8a..056956d48a2f 100644 --- a/Common/Data/Consolidators/SequentialConsolidator.cs +++ b/Common/Data/Consolidators/SequentialConsolidator.cs @@ -45,7 +45,7 @@ public IDataConsolidator Second /// /// Delegates the rolling window to the second (inner) consolidator to avoid duplication. /// - public override RollingWindow Window => (Second as ConsolidatorBase)?.Window; + public override RollingWindow Window => (Second as ConsolidatorBase)?.Window ?? base.Window; /// /// Gets a clone of the data being currently consolidated @@ -124,6 +124,10 @@ public SequentialConsolidator(IDataConsolidator first, IDataConsolidator second) /// The newly consolidated data protected virtual void OnDataConsolidated(IBaseData consolidated) { + if (Second is not ConsolidatorBase) + { + Consolidated = consolidated; + } var handler = DataConsolidated; if (handler != null) handler(this, consolidated); } diff --git a/Common/WindowBase.cs b/Common/WindowBase.cs index 3d2b1e1cfcf6..5aa7281fa347 100644 --- a/Common/WindowBase.cs +++ b/Common/WindowBase.cs @@ -90,12 +90,11 @@ protected set IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); /// - /// Resets the rolling window, clearing all stored values without allocating a new window - /// if it has not yet been created. + /// Resets the rolling window, clearing all stored values. /// protected void ResetWindow() { - _window?.Reset(); + Window.Reset(); } } } diff --git a/Indicators/IndicatorBase.cs b/Indicators/IndicatorBase.cs index 73b18f50fd23..f16ca6c31021 100644 --- a/Indicators/IndicatorBase.cs +++ b/Indicators/IndicatorBase.cs @@ -38,13 +38,7 @@ public abstract partial class IndicatorBase : WindowBase, II /// Gets the previous state of this indicator. If the state has not been updated /// then the time on the value will equal DateTime.MinValue. /// - public override IndicatorDataPoint Previous - { - get - { - return Window.Count > 1 ? Window[1] : new IndicatorDataPoint(DateTime.MinValue, 0); - } - } + public override IndicatorDataPoint Previous => base.Previous ?? new IndicatorDataPoint(DateTime.MinValue, 0); /// /// Gets a name for this indicator From eab819cf46005075aad15ec5fa40f57a2d1687a4 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Fri, 24 Apr 2026 02:04:18 -0500 Subject: [PATCH 10/10] Make ConsolidatorBase implements IDataConsolidator --- .../Consolidators/BaseTimelessConsolidator.cs | 37 ++++-------- Common/Data/Consolidators/ConsolidatorBase.cs | 59 ++++++++++++++++++- Common/Data/Consolidators/DataConsolidator.cs | 49 ++++++--------- .../MarketHourAwareConsolidator.cs | 27 ++++----- .../Data/Consolidators/RenkoConsolidator.cs | 31 ++++------ .../Consolidators/SequentialConsolidator.cs | 29 ++++----- .../Python/DataConsolidatorPythonWrapper.cs | 34 +++-------- 7 files changed, 126 insertions(+), 140 deletions(-) diff --git a/Common/Data/Consolidators/BaseTimelessConsolidator.cs b/Common/Data/Consolidators/BaseTimelessConsolidator.cs index a61f4a0deb33..a68aeddd03a0 100644 --- a/Common/Data/Consolidators/BaseTimelessConsolidator.cs +++ b/Common/Data/Consolidators/BaseTimelessConsolidator.cs @@ -23,7 +23,7 @@ namespace QuantConnect.Data.Consolidators /// Represents a timeless consolidator which depends on the given values. This consolidator /// is meant to consolidate data into bars that do not depend on time, e.g., RangeBar's. /// - public abstract class BaseTimelessConsolidator : ConsolidatorBase, IDataConsolidator + public abstract class BaseTimelessConsolidator : ConsolidatorBase where T : IBaseData { /// @@ -37,11 +37,6 @@ public abstract class BaseTimelessConsolidator : ConsolidatorBase, IDataConso /// protected Func VolumeSelector { get; set; } - /// - /// Event handler type for the IDataConsolidator.DataConsolidated event - /// - protected DataConsolidatedHandler DataConsolidatedHandler { get; set; } - /// /// Bar being created /// @@ -50,32 +45,23 @@ public abstract class BaseTimelessConsolidator : ConsolidatorBase, IDataConso /// /// Gets a clone of the data being currently consolidated /// - public abstract IBaseData WorkingData { get; } + public abstract override IBaseData WorkingData { get; } /// /// Gets the type consumed by this consolidator /// - public Type InputType => typeof(IBaseData); + public override Type InputType => typeof(IBaseData); /// /// Gets which is the type emitted in the event. /// - public virtual Type OutputType => typeof(T); + public override Type OutputType => typeof(T); /// - /// Event handler that fires when a new piece of data is produced + /// Typed event handler that fires when a new piece of data is produced /// public event EventHandler DataConsolidated; - /// - /// Event handler that fires when a new piece of data is produced - /// - event DataConsolidatedHandler IDataConsolidator.DataConsolidated - { - add { DataConsolidatedHandler += value; } - remove { DataConsolidatedHandler -= value; } - } - /// /// Initializes a new instance of the class. /// @@ -135,7 +121,7 @@ private static Func TryToConvertSelector(PyObject selector, /// Updates this consolidator with the specified data /// /// The new data for the consolidator - public void Update(IBaseData data) + public override void Update(IBaseData data) { var currentValue = Selector(data); var volume = VolumeSelector(data); @@ -179,18 +165,15 @@ public void Update(IBaseData data) protected void OnDataConsolidated(T consolidated) { DataConsolidated?.Invoke(this, consolidated); - - DataConsolidatedHandler?.Invoke(this, consolidated); - - Consolidated = consolidated; + base.OnDataConsolidated(consolidated); } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// 2 - public virtual void Dispose() + public override void Dispose() { DataConsolidated = null; - DataConsolidatedHandler = null; + base.Dispose(); } /// @@ -206,7 +189,7 @@ public override void Reset() /// Scans this consolidator to see if it should emit a bar due to time passing /// /// The current time in the local time zone (same as ) - public void Scan(DateTime currentLocalTime) + public override void Scan(DateTime currentLocalTime) { } } diff --git a/Common/Data/Consolidators/ConsolidatorBase.cs b/Common/Data/Consolidators/ConsolidatorBase.cs index e804a22ac931..208123c2d0f2 100644 --- a/Common/Data/Consolidators/ConsolidatorBase.cs +++ b/Common/Data/Consolidators/ConsolidatorBase.cs @@ -13,14 +13,18 @@ * limitations under the License. */ +using System; + namespace QuantConnect.Data.Consolidators { /// /// Provides a base implementation for consolidators, including a built-in rolling window /// that stores the history of consolidated bars. /// - public abstract class ConsolidatorBase : WindowBase + public abstract class ConsolidatorBase : WindowBase, IDataConsolidator { + private DataConsolidatedHandler _dataConsolidated; + /// /// Gets the most recently consolidated piece of data. This will be null if this consolidator /// has not produced any data yet. Setting this property adds the value to the rolling window. @@ -37,6 +41,59 @@ protected set } } + /// + /// Gets a clone of the data being currently consolidated + /// + public abstract IBaseData WorkingData { get; } + + /// + /// Gets the type consumed by this consolidator + /// + public abstract Type InputType { get; } + + /// + /// Gets the type produced by this consolidator + /// + public abstract Type OutputType { get; } + + /// + /// Updates this consolidator with the specified data + /// + /// The new data for the consolidator + public abstract void Update(IBaseData data); + + /// + /// Scans this consolidator to see if it should emit a bar due to time passing + /// + /// The current time in the local time zone (same as ) + public abstract void Scan(DateTime currentLocalTime); + + /// + /// Event handler that fires when a new piece of data is produced + /// + event DataConsolidatedHandler IDataConsolidator.DataConsolidated + { + add { _dataConsolidated += value; } + remove { _dataConsolidated -= value; } + } + + /// + /// Event invocator for the DataConsolidated event. Fires the event and updates the rolling window. + /// + protected virtual void OnDataConsolidated(IBaseData consolidated) + { + _dataConsolidated?.Invoke(this, consolidated); + Consolidated = consolidated; + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public virtual void Dispose() + { + _dataConsolidated = null; + } + /// /// Resets this consolidator, clearing consolidated data and the rolling window. /// diff --git a/Common/Data/Consolidators/DataConsolidator.cs b/Common/Data/Consolidators/DataConsolidator.cs index 04d97b76cbde..147e9df2b112 100644 --- a/Common/Data/Consolidators/DataConsolidator.cs +++ b/Common/Data/Consolidators/DataConsolidator.cs @@ -1,4 +1,4 @@ -/* +/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * @@ -23,14 +23,14 @@ namespace QuantConnect.Data.Consolidators /// and/or aggregated data. /// /// The type consumed by the consolidator - public abstract class DataConsolidator : ConsolidatorBase, IDataConsolidator + public abstract class DataConsolidator : ConsolidatorBase where TInput : IBaseData { /// /// Updates this consolidator with the specified data /// /// The new data for the consolidator - public void Update(IBaseData data) + public override void Update(IBaseData data) { if (!(data is TInput)) { @@ -45,7 +45,7 @@ public void Update(IBaseData data) /// Scans this consolidator to see if it should emit a bar due to time passing /// /// The current time in the local time zone (same as ) - public abstract void Scan(DateTime currentLocalTime); + public abstract override void Scan(DateTime currentLocalTime); /// /// Event handler that fires when a new piece of data is produced @@ -55,26 +55,17 @@ public void Update(IBaseData data) /// /// Gets a clone of the data being currently consolidated /// - public abstract IBaseData WorkingData - { - get; - } + public abstract override IBaseData WorkingData { get; } /// /// Gets the type consumed by this consolidator /// - public Type InputType - { - get { return typeof(TInput); } - } + public override Type InputType => typeof(TInput); /// /// Gets the type produced by this consolidator /// - public abstract Type OutputType - { - get; - } + public abstract override Type OutputType { get; } /// /// Updates this consolidator with the specified data. This method is @@ -83,26 +74,20 @@ public abstract Type OutputType /// The new data for the consolidator public abstract void Update(TInput data); - /// - /// Event invocator for the DataConsolidated event. This should be invoked - /// by derived classes when they have consolidated a new piece of data. - /// - /// The newly consolidated data - protected virtual void OnDataConsolidated(IBaseData consolidated) - { - var handler = DataConsolidated; - if (handler != null) handler(this, consolidated); - - // assign Consolidated (and push to Window) after the event handlers fire, - // so handlers can compare the new bar against the previous one without extra bookkeeping - Consolidated = consolidated; - } - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// 2 - public void Dispose() + public override void Dispose() { DataConsolidated = null; } + + /// + /// Event invocator for the DataConsolidated event. Fires the event and updates the rolling window. + /// + protected override void OnDataConsolidated(IBaseData consolidated) + { + DataConsolidated?.Invoke(this, consolidated); + base.OnDataConsolidated(consolidated); + } } } \ No newline at end of file diff --git a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs index db081e81abdc..cfe6ae5108ea 100644 --- a/Common/Data/Consolidators/MarketHourAwareConsolidator.cs +++ b/Common/Data/Consolidators/MarketHourAwareConsolidator.cs @@ -26,7 +26,7 @@ namespace QuantConnect.Data.Common /// /// Consolidator for open markets bar only, extended hours bar are not consolidated. /// - public class MarketHourAwareConsolidator : ConsolidatorBase, IDataConsolidator + public class MarketHourAwareConsolidator : ConsolidatorBase { private readonly bool _dailyStrictEndTimeEnabled; private readonly bool _extendedMarketHours; @@ -40,12 +40,7 @@ public class MarketHourAwareConsolidator : ConsolidatorBase, IDataConsolidator /// /// The consolidator instance /// - private IDataConsolidator Consolidator { get; } - - /// - /// Delegates the rolling window to the inner consolidator to avoid duplication. - /// - public override RollingWindow Window => ((ConsolidatorBase)Consolidator).Window; + private ConsolidatorBase Consolidator { get; } /// /// The associated security exchange hours instance @@ -60,17 +55,17 @@ public class MarketHourAwareConsolidator : ConsolidatorBase, IDataConsolidator /// /// Gets the type consumed by this consolidator /// - public Type InputType => Consolidator.InputType; + public override Type InputType => Consolidator.InputType; /// /// Gets a clone of the data being currently consolidated /// - public IBaseData WorkingData => Consolidator.WorkingData; + public override IBaseData WorkingData => Consolidator.WorkingData; /// /// Gets the type produced by this consolidator /// - public Type OutputType => Consolidator.OutputType; + public override Type OutputType => Consolidator.OutputType; /// /// Initializes a new instance of the class. @@ -116,7 +111,7 @@ public MarketHourAwareConsolidator(bool dailyStrictEndTimeEnabled, Resolution re { throw new ArgumentNullException(nameof(dataType), $"{dataType.Name} not supported"); } - Consolidator.DataConsolidated += ForwardConsolidatedBar; + ((IDataConsolidator)Consolidator).DataConsolidated += ForwardConsolidatedBar; } /// @@ -128,7 +123,7 @@ public MarketHourAwareConsolidator(bool dailyStrictEndTimeEnabled, Resolution re /// Updates this consolidator with the specified data /// /// The new data for the consolidator - public virtual void Update(IBaseData data) + public override void Update(IBaseData data) { Initialize(data); @@ -147,7 +142,7 @@ public virtual void Update(IBaseData data) /// Scans this consolidator to see if it should emit a bar due to time passing /// /// The current time in the local time zone (same as ) - public void Scan(DateTime currentLocalTime) + public override void Scan(DateTime currentLocalTime) { Consolidator.Scan(currentLocalTime); } @@ -155,10 +150,11 @@ public void Scan(DateTime currentLocalTime) /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// - public void Dispose() + public override void Dispose() { - Consolidator.DataConsolidated -= ForwardConsolidatedBar; + ((IDataConsolidator)Consolidator).DataConsolidated -= ForwardConsolidatedBar; Consolidator.Dispose(); + base.Dispose(); } /// @@ -215,6 +211,7 @@ protected virtual bool UseStrictEndTime(Symbol symbol) protected virtual void ForwardConsolidatedBar(object sender, IBaseData consolidated) { DataConsolidated?.Invoke(this, consolidated); + base.OnDataConsolidated(consolidated); } } } diff --git a/Common/Data/Consolidators/RenkoConsolidator.cs b/Common/Data/Consolidators/RenkoConsolidator.cs index 68e06012a808..b59042290e6a 100644 --- a/Common/Data/Consolidators/RenkoConsolidator.cs +++ b/Common/Data/Consolidators/RenkoConsolidator.cs @@ -24,11 +24,10 @@ namespace QuantConnect.Data.Consolidators /// /// This implementation replaced the original implementation that was shown to have inaccuracies in its representation /// of Renko charts. The original implementation has been moved to . - public class RenkoConsolidator : ConsolidatorBase, IDataConsolidator + public class RenkoConsolidator : ConsolidatorBase { private bool _firstTick = true; private RenkoBar _lastWicko; - private DataConsolidatedHandler _dataConsolidatedHandler; private RenkoBar _currentBar; /// @@ -81,32 +80,23 @@ public class RenkoConsolidator : ConsolidatorBase, IDataConsolidator /// /// Gets a clone of the data being currently consolidated /// - public IBaseData WorkingData => _currentBar?.Clone(); + public override IBaseData WorkingData => _currentBar?.Clone(); /// /// Gets the type consumed by this consolidator /// - public Type InputType => typeof(IBaseData); + public override Type InputType => typeof(IBaseData); /// /// Gets which is the type emitted in the event. /// - public Type OutputType => typeof(RenkoBar); + public override Type OutputType => typeof(RenkoBar); /// - /// Event handler that fires when a new piece of data is produced + /// Typed event handler that fires when a new piece of data is produced /// public event EventHandler DataConsolidated; - /// - /// Event handler that fires when a new piece of data is produced - /// - event DataConsolidatedHandler IDataConsolidator.DataConsolidated - { - add { _dataConsolidatedHandler += value; } - remove { _dataConsolidatedHandler -= value; } - } - /// /// Initializes a new instance of the class using the specified . /// @@ -125,7 +115,7 @@ public RenkoConsolidator(decimal barSize) /// Updates this consolidator with the specified data /// /// The new data for the consolidator - public void Update(IBaseData data) + public override void Update(IBaseData data) { var rate = data.Price; @@ -218,16 +208,16 @@ public void Update(IBaseData data) /// Scans this consolidator to see if it should emit a bar due to time passing /// /// The current time in the local time zone (same as ) - public void Scan(DateTime currentLocalTime) + public override void Scan(DateTime currentLocalTime) { } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// 2 - public void Dispose() + public override void Dispose() { DataConsolidated = null; - _dataConsolidatedHandler = null; + base.Dispose(); } /// @@ -256,8 +246,7 @@ protected void OnDataConsolidated(RenkoBar consolidated) { DataConsolidated?.Invoke(this, consolidated); _currentBar = consolidated; - _dataConsolidatedHandler?.Invoke(this, consolidated); - Consolidated = consolidated; + base.OnDataConsolidated(consolidated); } private void Rising(IBaseData data) diff --git a/Common/Data/Consolidators/SequentialConsolidator.cs b/Common/Data/Consolidators/SequentialConsolidator.cs index 056956d48a2f..3f068f78f951 100644 --- a/Common/Data/Consolidators/SequentialConsolidator.cs +++ b/Common/Data/Consolidators/SequentialConsolidator.cs @@ -23,7 +23,7 @@ namespace QuantConnect.Data.Consolidators /// such that data flows from the First to Second consolidator. It's output comes /// from the Second. /// - public class SequentialConsolidator : ConsolidatorBase, IDataConsolidator + public class SequentialConsolidator : ConsolidatorBase { /// /// Gets the first consolidator to receive data @@ -42,15 +42,10 @@ public IDataConsolidator Second get; private set; } - /// - /// Delegates the rolling window to the second (inner) consolidator to avoid duplication. - /// - public override RollingWindow Window => (Second as ConsolidatorBase)?.Window ?? base.Window; - /// /// Gets a clone of the data being currently consolidated /// - public IBaseData WorkingData + public override IBaseData WorkingData { get { return Second.WorkingData; } } @@ -58,7 +53,7 @@ public IBaseData WorkingData /// /// Gets the type consumed by this consolidator /// - public Type InputType + public override Type InputType { get { return First.InputType; } } @@ -66,7 +61,7 @@ public Type InputType /// /// Gets the type produced by this consolidator /// - public Type OutputType + public override Type OutputType { get { return Second.OutputType; } } @@ -75,7 +70,7 @@ public Type OutputType /// Updates this consolidator with the specified data /// /// The new data for the consolidator - public void Update(IBaseData data) + public override void Update(IBaseData data) { First.Update(data); } @@ -84,7 +79,7 @@ public void Update(IBaseData data) /// Scans this consolidator to see if it should emit a bar due to time passing /// /// The current time in the local time zone (same as ) - public void Scan(DateTime currentLocalTime) + public override void Scan(DateTime currentLocalTime) { First.Scan(currentLocalTime); } @@ -122,19 +117,15 @@ public SequentialConsolidator(IDataConsolidator first, IDataConsolidator second) /// by derived classes when they have consolidated a new piece of data. /// /// The newly consolidated data - protected virtual void OnDataConsolidated(IBaseData consolidated) + protected override void OnDataConsolidated(IBaseData consolidated) { - if (Second is not ConsolidatorBase) - { - Consolidated = consolidated; - } - var handler = DataConsolidated; - if (handler != null) handler(this, consolidated); + DataConsolidated?.Invoke(this, consolidated); + base.OnDataConsolidated(consolidated); } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// 2 - public void Dispose() + public override void Dispose() { First.Dispose(); Second.Dispose(); diff --git a/Common/Python/DataConsolidatorPythonWrapper.cs b/Common/Python/DataConsolidatorPythonWrapper.cs index 2dae5f3e9cc8..2ba4891b3e67 100644 --- a/Common/Python/DataConsolidatorPythonWrapper.cs +++ b/Common/Python/DataConsolidatorPythonWrapper.cs @@ -23,14 +23,14 @@ namespace QuantConnect.Python /// /// Provides an Data Consolidator that wraps a object that represents a custom Python consolidator /// - public class DataConsolidatorPythonWrapper : ConsolidatorBase, IDataConsolidator + public class DataConsolidatorPythonWrapper : ConsolidatorBase { private readonly BasePythonWrapper _pythonWrapper; /// /// Gets a clone of the data being currently consolidated /// - public IBaseData WorkingData + public override IBaseData WorkingData { get { return _pythonWrapper.GetProperty(nameof(WorkingData)); } } @@ -38,7 +38,7 @@ public IBaseData WorkingData /// /// Gets the type consumed by this consolidator /// - public Type InputType + public override Type InputType { get { return _pythonWrapper.GetProperty(nameof(InputType)); } } @@ -46,28 +46,11 @@ public Type InputType /// /// Gets the type produced by this consolidator /// - public Type OutputType + public override Type OutputType { get { return _pythonWrapper.GetProperty(nameof(OutputType)); } } - /// - /// Event handler that fires when a new piece of data is produced - /// - public event DataConsolidatedHandler DataConsolidated - { - add - { - var eventHandler = _pythonWrapper.GetEvent(nameof(DataConsolidated)); - eventHandler += value; - } - remove - { - var eventHandler = _pythonWrapper.GetEvent(nameof(DataConsolidated)); - eventHandler -= value; - } - } - /// /// Constructor for initialising the class with wrapped object /// @@ -75,14 +58,15 @@ public event DataConsolidatedHandler DataConsolidated public DataConsolidatorPythonWrapper(PyObject consolidator) { _pythonWrapper = new BasePythonWrapper(consolidator, true); - DataConsolidated += (_, bar) => Consolidated = bar; + var pythonEvent = _pythonWrapper.GetEvent("DataConsolidated"); + pythonEvent += new DataConsolidatedHandler((_, bar) => OnDataConsolidated(bar)); } /// /// Scans this consolidator to see if it should emit a bar due to time passing /// /// The current time in the local time zone (same as ) - public void Scan(DateTime currentLocalTime) + public override void Scan(DateTime currentLocalTime) { _pythonWrapper.InvokeMethod(nameof(Scan), currentLocalTime); } @@ -91,7 +75,7 @@ public void Scan(DateTime currentLocalTime) /// Updates this consolidator with the specified data /// /// The new data for the consolidator - public void Update(IBaseData data) + public override void Update(IBaseData data) { _pythonWrapper.InvokeMethod(nameof(Update), data); } @@ -108,7 +92,7 @@ public override void Reset() /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// - public void Dispose() + public override void Dispose() { _pythonWrapper.Dispose(); }