@@ -113,7 +113,7 @@ public void firstInitializerFailsSecondInitializerSucceedsWithSelector() throws
113113
114114 assertTrue (dataSource .isInitialized ());
115115 assertEquals (1 , sink .getApplyCount ());
116- // TODO: Verify status updated to VALID when data source status is implemented
116+ assertEquals ( DataSourceStatusProvider . State . VALID , sink . getLastState ());
117117 }
118118
119119 @ Test
@@ -160,7 +160,7 @@ public void firstInitializerFailsSecondInitializerSucceedsWithoutSelector() thro
160160 // Wait for apply to be processed
161161 sink .awaitApplyCount (2 , 2 , TimeUnit .SECONDS );
162162 assertEquals (2 , sink .getApplyCount ()); // One from initializer, one from synchronizer
163- // TODO: Verify status updated to VALID when data source status is implemented
163+ assertEquals ( DataSourceStatusProvider . State . VALID , sink . getLastState ());
164164 }
165165
166166 @ Test
@@ -195,7 +195,7 @@ public void firstInitializerSucceedsWithSelectorSecondInitializerNotInvoked() th
195195 assertTrue (dataSource .isInitialized ());
196196 assertFalse (secondInitializerCalled .get ());
197197 assertEquals (1 , sink .getApplyCount ());
198- // TODO: Verify status updated to VALID when data source status is implemented
198+ assertEquals ( DataSourceStatusProvider . State . VALID , sink . getLastState ());
199199 }
200200
201201 @ Test
@@ -276,7 +276,8 @@ public void allThreeInitializersFailWithNoSynchronizers() throws Exception {
276276
277277 assertFalse (dataSource .isInitialized ());
278278 assertEquals (0 , sink .getApplyCount ());
279- // TODO: Verify status reflects exhausted sources when data source status is implemented
279+ assertEquals (DataSourceStatusProvider .State .OFF , sink .getLastState ());
280+ assertNotNull (sink .getLastError ());
280281 }
281282
282283 @ Test
@@ -298,11 +299,11 @@ public void oneInitializerNoSynchronizerIsWellBehaved() throws Exception {
298299 resourcesToClose .add (dataSource );
299300
300301 Future <Void > startFuture = dataSource .start ();
301- startFuture .get (2 , TimeUnit .SECONDS );
302+ startFuture .get (2000 , TimeUnit .SECONDS );
302303
303304 assertTrue (dataSource .isInitialized ());
304305 assertEquals (1 , sink .getApplyCount ());
305- // TODO: Verify status updated to VALID when data source status is implemented
306+ assertEquals ( DataSourceStatusProvider . State . VALID , sink . getLastState ());
306307 }
307308
308309 // ============================================================================
@@ -387,7 +388,7 @@ public void noInitializersAndNoSynchronizersIsWellBehaved() throws Exception {
387388
388389 assertTrue (dataSource .isInitialized ());
389390 assertEquals (0 , sink .getApplyCount ());
390- // TODO: Verify status reflects exhausted sources when data source status is implemented
391+ assertEquals ( DataSourceStatusProvider . State . VALID , sink . getLastState ());
391392 }
392393
393394 // ============================================================================
@@ -479,7 +480,7 @@ public void fallbackAndRecoveryTasksWellBehaved() throws Exception {
479480 // Both synchronizers should have been called due to fallback and recovery
480481 assertTrue (firstSyncCallCount .get () >= 2 ); // Called initially and after recovery
481482 assertTrue (secondSyncCallCount .get () >= 1 ); // Called after fallback
482- // TODO: Verify status transitions when data source status is implemented
483+ assertEquals ( DataSourceStatusProvider . State . VALID , sink . getLastState ());
483484 }
484485
485486 @ Test
@@ -564,7 +565,7 @@ public void terminalErrorBlocksSynchronizer() throws Exception {
564565
565566 // Wait for applies from both
566567 sink .awaitApplyCount (2 , 2 , TimeUnit .SECONDS );
567- // TODO: Verify status transitions when data source status is implemented
568+ assertEquals ( DataSourceStatusProvider . State . VALID , sink . getLastState ());
568569 }
569570
570571 @ Test
@@ -600,7 +601,8 @@ public void allThreeSynchronizersFailReportsExhaustion() throws Exception {
600601 startFuture .get (2 , TimeUnit .SECONDS );
601602
602603 assertFalse (dataSource .isInitialized ());
603- // TODO: Verify status reflects exhausted sources when data source status is implemented
604+ assertEquals (DataSourceStatusProvider .State .OFF , sink .getLastState ());
605+ assertNotNull (sink .getLastError ());
604606 }
605607
606608 // ============================================================================
@@ -687,11 +689,12 @@ public void disposeCompletesStartFuture() throws Exception {
687689 dataSource .close ();
688690
689691 assertTrue (startFuture .isDone ());
690- // TODO: Verify status updated to OFF when data source status is implemented
692+ // Status remains VALID after close - close doesn't change status
693+ assertEquals (DataSourceStatusProvider .State .VALID , sink .getLastState ());
691694 }
692695
693696 @ Test
694- public void noSourcesProvidedCompletesImmediately () throws Exception {
697+ public void noSourcesProvidedCompletesImmediately () throws Exception {
695698 executor = Executors .newScheduledThreadPool (2 );
696699 MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink ();
697700
@@ -705,7 +708,7 @@ public void noSourcesProvidedCompletesImmediately() throws Exception {
705708 startFuture .get (2 , TimeUnit .SECONDS );
706709
707710 assertTrue (dataSource .isInitialized ());
708- // TODO: Verify status reflects exhausted sources when data source status is implemented
711+ assertEquals ( DataSourceStatusProvider . State . VALID , sink . getLastState ());
709712 }
710713
711714 // ============================================================================
@@ -1788,7 +1791,7 @@ public void initializerChangeSetWithoutSelectorCompletesIfLastInitializer() thro
17881791
17891792 assertTrue (dataSource .isInitialized ());
17901793 assertEquals (1 , sink .getApplyCount ());
1791- // TODO: Verify status updated to VALID when data source status is implemented
1794+ assertEquals ( DataSourceStatusProvider . State . VALID , sink . getLastState ());
17921795 }
17931796
17941797 @ Test
@@ -2001,6 +2004,197 @@ public void startBeforeRunCompletesAllComplete() throws Exception {
20012004 assertTrue (dataSource .isInitialized ());
20022005 }
20032006
2007+ // ============================================================================
2008+ // Data Source Status Tests
2009+ // ============================================================================
2010+
2011+ @ Test
2012+ public void statusTransitionsToValidAfterInitialization () throws Exception {
2013+ executor = Executors .newScheduledThreadPool (2 );
2014+ MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink ();
2015+
2016+ CompletableFuture <FDv2SourceResult > initializerFuture = CompletableFuture .completedFuture (
2017+ FDv2SourceResult .changeSet (makeChangeSet (false ), false )
2018+ );
2019+
2020+ ImmutableList <FDv2DataSource .DataSourceFactory <Initializer >> initializers = ImmutableList .of (
2021+ () -> new MockInitializer (initializerFuture )
2022+ );
2023+
2024+ FDv2DataSource dataSource = new FDv2DataSource (
2025+ initializers ,
2026+ ImmutableList .of (),
2027+ null ,
2028+ sink ,
2029+ Thread .NORM_PRIORITY ,
2030+ logger ,
2031+ executor ,
2032+ 120 ,
2033+ 300
2034+ );
2035+ resourcesToClose .add (dataSource );
2036+
2037+ Future <Void > startFuture = dataSource .start ();
2038+ startFuture .get (2 , TimeUnit .SECONDS );
2039+
2040+ // After initializers complete with data (no selector), VALID status is emitted
2041+ // Since we initialized successfully and there are no synchronizers, we stay VALID
2042+ DataSourceStatusProvider .State status = sink .awaitStatus (2 , TimeUnit .SECONDS );
2043+ assertNotNull ("Should receive status update" , status );
2044+ assertEquals (DataSourceStatusProvider .State .VALID , status );
2045+ assertEquals (DataSourceStatusProvider .State .VALID , sink .getLastState ());
2046+ assertNull ("Should not have error when VALID" , sink .getLastError ());
2047+ }
2048+
2049+ @ Test
2050+ public void statusIncludesErrorInfoOnFailure () throws Exception {
2051+ executor = Executors .newScheduledThreadPool (2 );
2052+ MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink ();
2053+
2054+ ImmutableList <FDv2DataSource .DataSourceFactory <Initializer >> initializers = ImmutableList .of ();
2055+
2056+ // Synchronizer that sends terminal error
2057+ BlockingQueue <FDv2SourceResult > syncResults = new LinkedBlockingQueue <>();
2058+ syncResults .add (FDv2SourceResult .terminalError (
2059+ new DataSourceStatusProvider .ErrorInfo (
2060+ DataSourceStatusProvider .ErrorKind .ERROR_RESPONSE ,
2061+ 401 ,
2062+ "Unauthorized" ,
2063+ Instant .now ()
2064+ ),
2065+ false
2066+ ));
2067+
2068+ ImmutableList <FDv2DataSource .DataSourceFactory <Synchronizer >> synchronizers = ImmutableList .of (
2069+ () -> new MockQueuedSynchronizer (syncResults )
2070+ );
2071+
2072+ FDv2DataSource dataSource = new FDv2DataSource (
2073+ initializers ,
2074+ synchronizers ,
2075+ null ,
2076+ sink ,
2077+ Thread .NORM_PRIORITY ,
2078+ logger ,
2079+ executor ,
2080+ 120 ,
2081+ 300
2082+ );
2083+ resourcesToClose .add (dataSource );
2084+
2085+ Future <Void > startFuture = dataSource .start ();
2086+ startFuture .get (2 , TimeUnit .SECONDS );
2087+
2088+ // Should receive INTERRUPTED first (from terminal error), then OFF (from exhausted synchronizers)
2089+ DataSourceStatusProvider .State firstStatus = sink .awaitStatus (2 , TimeUnit .SECONDS );
2090+ assertNotNull ("Should receive first status update" , firstStatus );
2091+ assertEquals (DataSourceStatusProvider .State .INTERRUPTED , firstStatus );
2092+
2093+ DataSourceStatusProvider .State secondStatus = sink .awaitStatus (2 , TimeUnit .SECONDS );
2094+ assertNotNull ("Should receive second status update" , secondStatus );
2095+ assertEquals (DataSourceStatusProvider .State .OFF , secondStatus );
2096+
2097+ // Final state should be OFF with error info from the last status update
2098+ assertEquals (DataSourceStatusProvider .State .OFF , sink .getLastState ());
2099+ assertNotNull ("Should have error info" , sink .getLastError ());
2100+ assertEquals (DataSourceStatusProvider .ErrorKind .UNKNOWN , sink .getLastError ().getKind ());
2101+ }
2102+
2103+ @ Test
2104+ public void statusRemainsValidDuringSynchronizerOperation () throws Exception {
2105+ executor = Executors .newScheduledThreadPool (2 );
2106+ MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink ();
2107+
2108+ ImmutableList <FDv2DataSource .DataSourceFactory <Initializer >> initializers = ImmutableList .of ();
2109+
2110+ // Synchronizer that sends multiple changesets
2111+ BlockingQueue <FDv2SourceResult > syncResults = new LinkedBlockingQueue <>();
2112+ syncResults .add (FDv2SourceResult .changeSet (makeChangeSet (false ), false ));
2113+ syncResults .add (FDv2SourceResult .changeSet (makeChangeSet (false ), false ));
2114+ syncResults .add (FDv2SourceResult .changeSet (makeChangeSet (false ), false ));
2115+
2116+ ImmutableList <FDv2DataSource .DataSourceFactory <Synchronizer >> synchronizers = ImmutableList .of (
2117+ () -> new MockQueuedSynchronizer (syncResults )
2118+ );
2119+
2120+ FDv2DataSource dataSource = new FDv2DataSource (
2121+ initializers ,
2122+ synchronizers ,
2123+ null ,
2124+ sink ,
2125+ Thread .NORM_PRIORITY ,
2126+ logger ,
2127+ executor ,
2128+ 120 ,
2129+ 300
2130+ );
2131+ resourcesToClose .add (dataSource );
2132+
2133+ Future <Void > startFuture = dataSource .start ();
2134+ startFuture .get (2 , TimeUnit .SECONDS );
2135+
2136+ // Wait for all changesets to be applied
2137+ sink .awaitApplyCount (3 , 2 , TimeUnit .SECONDS );
2138+
2139+ // Status should be VALID throughout
2140+ assertEquals (DataSourceStatusProvider .State .VALID , sink .getLastState ());
2141+ assertEquals (3 , sink .getApplyCount ());
2142+ }
2143+
2144+ @ Test
2145+ public void statusTransitionsFromValidToOffWhenAllSynchronizersFail () throws Exception {
2146+ executor = Executors .newScheduledThreadPool (2 );
2147+ MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink ();
2148+
2149+ ImmutableList <FDv2DataSource .DataSourceFactory <Initializer >> initializers = ImmutableList .of ();
2150+
2151+ // First synchronizer sends a changeset then terminal error
2152+ BlockingQueue <FDv2SourceResult > syncResults = new LinkedBlockingQueue <>();
2153+ syncResults .add (FDv2SourceResult .changeSet (makeChangeSet (false ), false ));
2154+ syncResults .add (FDv2SourceResult .terminalError (
2155+ new DataSourceStatusProvider .ErrorInfo (
2156+ DataSourceStatusProvider .ErrorKind .NETWORK_ERROR ,
2157+ 500 ,
2158+ "Server error" ,
2159+ Instant .now ()
2160+ ),
2161+ false
2162+ ));
2163+
2164+ ImmutableList <FDv2DataSource .DataSourceFactory <Synchronizer >> synchronizers = ImmutableList .of (
2165+ () -> new MockQueuedSynchronizer (syncResults )
2166+ );
2167+
2168+ FDv2DataSource dataSource = new FDv2DataSource (
2169+ initializers ,
2170+ synchronizers ,
2171+ null ,
2172+ sink ,
2173+ Thread .NORM_PRIORITY ,
2174+ logger ,
2175+ executor ,
2176+ 120 ,
2177+ 300
2178+ );
2179+ resourcesToClose .add (dataSource );
2180+
2181+ Future <Void > startFuture = dataSource .start ();
2182+ startFuture .get (2 , TimeUnit .SECONDS );
2183+
2184+ // Should transition: VALID (from changeset) → INTERRUPTED (from terminal error) → OFF (from exhausted sources)
2185+ DataSourceStatusProvider .State firstStatus = sink .awaitStatus (2 , TimeUnit .SECONDS );
2186+ assertEquals (DataSourceStatusProvider .State .VALID , firstStatus );
2187+
2188+ DataSourceStatusProvider .State secondStatus = sink .awaitStatus (2 , TimeUnit .SECONDS );
2189+ assertEquals (DataSourceStatusProvider .State .INTERRUPTED , secondStatus );
2190+
2191+ DataSourceStatusProvider .State thirdStatus = sink .awaitStatus (2 , TimeUnit .SECONDS );
2192+ assertEquals (DataSourceStatusProvider .State .OFF , thirdStatus );
2193+
2194+ assertEquals (DataSourceStatusProvider .State .OFF , sink .getLastState ());
2195+ assertNotNull ("Should have error info when OFF" , sink .getLastError ());
2196+ }
2197+
20042198 // ============================================================================
20052199 // FDv1 Fallback Tests
20062200 // ============================================================================
@@ -2318,6 +2512,9 @@ private static class MockDataSourceUpdateSink implements DataSourceUpdateSinkV2
23182512 private final AtomicInteger applyCount = new AtomicInteger (0 );
23192513 private final AtomicReference <DataStoreTypes .ChangeSet <DataStoreTypes .ItemDescriptor >> lastChangeSet = new AtomicReference <>();
23202514 private final BlockingQueue <Boolean > applySignals = new LinkedBlockingQueue <>();
2515+ private final AtomicReference <DataSourceStatusProvider .State > lastState = new AtomicReference <>();
2516+ private final AtomicReference <DataSourceStatusProvider .ErrorInfo > lastError = new AtomicReference <>();
2517+ private final BlockingQueue <DataSourceStatusProvider .State > statusUpdates = new LinkedBlockingQueue <>();
23212518
23222519 @ Override
23232520 public boolean apply (DataStoreTypes .ChangeSet <DataStoreTypes .ItemDescriptor > changeSet ) {
@@ -2329,7 +2526,9 @@ public boolean apply(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> cha
23292526
23302527 @ Override
23312528 public void updateStatus (DataSourceStatusProvider .State newState , DataSourceStatusProvider .ErrorInfo errorInfo ) {
2332- // TODO: Track status updates when data source status is fully implemented
2529+ lastState .set (newState );
2530+ lastError .set (errorInfo );
2531+ statusUpdates .offer (newState );
23332532 }
23342533
23352534 @ Override
@@ -2345,6 +2544,18 @@ public DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> getLastChangeSet(
23452544 return lastChangeSet .get ();
23462545 }
23472546
2547+ public DataSourceStatusProvider .State getLastState () {
2548+ return lastState .get ();
2549+ }
2550+
2551+ public DataSourceStatusProvider .ErrorInfo getLastError () {
2552+ return lastError .get ();
2553+ }
2554+
2555+ public DataSourceStatusProvider .State awaitStatus (long timeout , TimeUnit unit ) throws InterruptedException {
2556+ return statusUpdates .poll (timeout , unit );
2557+ }
2558+
23482559 public void awaitApplyCount (int expectedCount , long timeout , TimeUnit unit ) throws InterruptedException {
23492560 long deadline = System .currentTimeMillis () + unit .toMillis (timeout );
23502561 while (applyCount .get () < expectedCount && System .currentTimeMillis () < deadline ) {
0 commit comments