Skip to content

Commit 7a35888

Browse files
sunitaprajapati89abueideclaude
authored
feat: Waiting plugin addition (#1110)
* feat: Add WaitingPlugin support for pausing event processing Implements WaitingPlugin functionality that allows plugins to pause event processing until async operations complete (e.g., permissions, SDK initialization). Key features: - WaitingPlugin base class with pause/resume methods - Automatic event buffering when paused - Support for multiple waiting plugins - 30-second timeout to prevent permanent blocking - Works with both root-level and destination plugins Implementation details: - Add running state to storage (separate from enabled) - Events queued when running=false - Process pending events when all waiting plugins resume - QueueFlushingPlugin respects running state Includes comprehensive tests and documentation with real-world examples (IDFA permissions, native SDK initialization, remote config loading). Co-Authored-By: Claude <[email protected]> * format * fix: Resolve linting errors in WaitingPlugin - Fix strict-boolean-expressions error in QueueFlushingPlugin.ts - Replace 'any' type with proper ClientWithInternals type in tests - All linting errors resolved * address wenxi's comments * fix: Cap pending events queue and add WaitingPlugin test coverage Fix inverted condition in process() that was routing events to the pipeline when paused and queuing them when running. Add 1000-event cap on pendingEvents to match Kotlin SDK's StartupQueue behavior. Add tests for event queuing, replay on resume, flush-while-paused, timeout cancellation, resume idempotency, plugin removal, and cap. Co-Authored-By: Claude Opus 4.6 <[email protected]> * format --------- Co-authored-by: Andrea Bueide <[email protected]> Co-authored-by: Claude <[email protected]>
1 parent 3f8c203 commit 7a35888

File tree

10 files changed

+951
-7
lines changed

10 files changed

+951
-7
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import {WaitingPlugin, PluginType} from '@segment/analytics-react-native';
2+
3+
import type {
4+
SegmentAPISettings,
5+
SegmentEvent,
6+
UpdateType,
7+
} from '@segment/analytics-react-native';
8+
9+
/**
10+
* Example WaitingPlugin that demonstrates how to pause event processing
11+
* until an async operation completes.
12+
*
13+
* Use cases:
14+
* - Waiting for IDFA/advertising ID permissions
15+
* - Initializing native SDKs or modules
16+
* - Loading required configuration from remote sources
17+
*
18+
* The plugin automatically pauses event processing when added to the client.
19+
* Call resume() when your async operation completes to start processing events.
20+
*/
21+
export class ExampleWaitingPlugin extends WaitingPlugin {
22+
type = PluginType.enrichment;
23+
tracked = false;
24+
25+
/**
26+
* Called when settings are updated from Segment.
27+
* For initial settings, we simulate an async operation and then resume.
28+
*/
29+
update(_settings: SegmentAPISettings, type: UpdateType) {
30+
if (type === UpdateType.initial) {
31+
// Simulate async work (e.g., requesting permissions, loading data)
32+
setTimeout(() => {
33+
// Resume event processing once async work is complete
34+
this.resume();
35+
}, 3000);
36+
}
37+
}
38+
39+
/**
40+
* Called for track events
41+
*/
42+
track(event: SegmentEvent) {
43+
this.tracked = true;
44+
return event;
45+
}
46+
}

packages/core/src/analytics.ts

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
workspaceDestinationFilterKey,
1111
defaultFlushInterval,
1212
defaultFlushAt,
13+
maxPendingEvents,
1314
} from './constants';
1415
import { getContext } from './context';
1516
import {
@@ -72,6 +73,7 @@ import {
7273
translateHTTPError,
7374
} from './errors';
7475
import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin';
76+
import { WaitingPlugin } from './plugin';
7577

7678
type OnPluginAddedCallback = (plugin: Plugin) => void;
7779

@@ -120,6 +122,10 @@ export class SegmentClient {
120122
* Access or subscribe to client enabled
121123
*/
122124
readonly enabled: Watchable<boolean> & Settable<boolean>;
125+
/**
126+
* Access or subscribe to running state (controls event processing)
127+
*/
128+
readonly running: Watchable<boolean> & Settable<boolean>;
123129
/**
124130
* Access or subscribe to client context
125131
*/
@@ -258,6 +264,12 @@ export class SegmentClient {
258264
onChange: this.store.enabled.onChange,
259265
};
260266

267+
this.running = {
268+
get: this.store.running.get,
269+
set: this.store.running.set,
270+
onChange: this.store.running.onChange,
271+
};
272+
261273
// add segment destination plugin unless
262274
// asked not to via configuration.
263275
if (this.config.autoAddSegmentDestination === true) {
@@ -295,7 +307,6 @@ export class SegmentClient {
295307
if ((await this.store.isReady.get(true)) === false) {
296308
await this.storageReady();
297309
}
298-
299310
// Get new settings from segment
300311
// It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins
301312
// which make use of the settings object to initialize
@@ -309,7 +320,8 @@ export class SegmentClient {
309320
]);
310321
await this.onReady();
311322
this.isReady.value = true;
312-
323+
// Set running to true to start event processing
324+
await this.store.running.set(true);
313325
// Process all pending events
314326
await this.processPendingEvents();
315327
// Trigger manual flush
@@ -465,7 +477,6 @@ export class SegmentClient {
465477
settings
466478
);
467479
}
468-
469480
if (!this.isReady.value) {
470481
this.pluginsToAdd.push(plugin);
471482
} else {
@@ -476,6 +487,11 @@ export class SegmentClient {
476487
private addPlugin(plugin: Plugin) {
477488
plugin.configure(this);
478489
this.timeline.add(plugin);
490+
//check for waiting plugin here
491+
if (plugin instanceof WaitingPlugin) {
492+
this.pauseEventProcessingForPlugin(plugin);
493+
}
494+
479495
this.triggerOnPluginLoaded(plugin);
480496
}
481497

@@ -494,10 +510,15 @@ export class SegmentClient {
494510
if (this.enabled.get() === false) {
495511
return;
496512
}
497-
if (this.isReady.value) {
513+
if (this.running.get() && this.isReady.value) {
498514
return this.startTimelineProcessing(event);
499515
} else {
500-
this.store.pendingEvents.add(event);
516+
this.store.pendingEvents.set((events) => {
517+
if (events.length >= maxPendingEvents) {
518+
return [...events.slice(1), event];
519+
}
520+
return [...events, event];
521+
});
501522
return event;
502523
}
503524
}
@@ -1027,4 +1048,82 @@ export class SegmentClient {
10271048

10281049
return totalEventsCount;
10291050
}
1051+
private resumeTimeoutId?: ReturnType<typeof setTimeout>;
1052+
private waitingPlugins = new Set<WaitingPlugin>();
1053+
1054+
/**
1055+
* Pause event processing for a specific WaitingPlugin.
1056+
* Events will be buffered until all waiting plugins resume.
1057+
*
1058+
* @param plugin - The WaitingPlugin requesting the pause
1059+
* @internal This is called automatically when a WaitingPlugin is added
1060+
*/
1061+
pauseEventProcessingForPlugin(plugin?: WaitingPlugin) {
1062+
if (plugin) {
1063+
this.waitingPlugins.add(plugin);
1064+
}
1065+
this.pauseEventProcessing();
1066+
}
1067+
1068+
/**
1069+
* Resume event processing for a specific WaitingPlugin.
1070+
* If all waiting plugins have resumed, buffered events will be processed.
1071+
*
1072+
* @param plugin - The WaitingPlugin that has completed its async work
1073+
* @internal This is called automatically when a WaitingPlugin calls resume()
1074+
*/
1075+
async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) {
1076+
if (plugin) {
1077+
this.waitingPlugins.delete(plugin);
1078+
}
1079+
if (this.waitingPlugins.size > 0) {
1080+
return; // still blocked by other waiting plugins
1081+
}
1082+
1083+
await this.resumeEventProcessing();
1084+
}
1085+
1086+
/**
1087+
* Pause event processing globally.
1088+
* New events will be buffered in memory until resumeEventProcessing() is called.
1089+
* Automatically resumes after the specified timeout to prevent permanent blocking.
1090+
*
1091+
* @param timeout - Milliseconds to wait before auto-resuming (default: 30000)
1092+
*/
1093+
pauseEventProcessing(timeout = 30000) {
1094+
// IMPORTANT: ignore repeated pauses
1095+
const running = this.store.running.get();
1096+
if (!running) {
1097+
return;
1098+
}
1099+
1100+
// Fire-and-forget: state is updated synchronously in-memory, persistence happens async
1101+
void this.store.running.set(false);
1102+
1103+
// Only set timeout if not already set (prevents multiple waiting plugins from overwriting)
1104+
if (!this.resumeTimeoutId) {
1105+
this.resumeTimeoutId = setTimeout(async () => {
1106+
await this.resumeEventProcessing();
1107+
}, timeout);
1108+
}
1109+
}
1110+
1111+
/**
1112+
* Resume event processing and process all buffered events.
1113+
* This is called automatically by WaitingPlugins when they complete,
1114+
* or after the timeout expires.
1115+
*/
1116+
async resumeEventProcessing() {
1117+
const running = this.store.running.get();
1118+
if (running) {
1119+
return;
1120+
}
1121+
1122+
if (this.resumeTimeoutId) {
1123+
clearTimeout(this.resumeTimeoutId);
1124+
this.resumeTimeoutId = undefined;
1125+
}
1126+
await this.store.running.set(true);
1127+
await this.processPendingEvents();
1128+
}
10301129
}

packages/core/src/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ export const workspaceDestinationFilterKey = '';
1616

1717
export const defaultFlushAt = 20;
1818
export const defaultFlushInterval = 30;
19+
export const maxPendingEvents = 1000;

packages/core/src/plugin.ts

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,14 @@ export class DestinationPlugin extends EventPlugin {
115115
key = '';
116116

117117
timeline = new Timeline();
118+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
119+
store: any;
118120

119121
private hasSettings() {
120122
return this.analytics?.settings.get()?.[this.key] !== undefined;
121123
}
122124

123-
private isEnabled(event: SegmentEvent): boolean {
125+
protected isEnabled(event: SegmentEvent): boolean {
124126
let customerDisabled = false;
125127
if (event.integrations?.[this.key] === false) {
126128
customerDisabled = true;
@@ -140,6 +142,10 @@ export class DestinationPlugin extends EventPlugin {
140142
if (analytics) {
141143
plugin.configure(analytics);
142144
}
145+
146+
if (analytics && plugin instanceof WaitingPlugin) {
147+
analytics.pauseEventProcessingForPlugin(plugin);
148+
}
143149
this.timeline.add(plugin);
144150
return plugin;
145151
}
@@ -179,7 +185,6 @@ export class DestinationPlugin extends EventPlugin {
179185
type: PluginType.before,
180186
event,
181187
});
182-
183188
if (beforeResult === undefined) {
184189
return;
185190
}
@@ -210,3 +215,84 @@ export class UtilityPlugin extends EventPlugin {}
210215

211216
// For internal platform-specific bits
212217
export class PlatformPlugin extends Plugin {}
218+
219+
export { PluginType };
220+
221+
/**
222+
* WaitingPlugin - A base class for plugins that need to pause event processing
223+
* until an asynchronous operation completes.
224+
*
225+
* When a WaitingPlugin is added to the Analytics client, it automatically pauses
226+
* event processing. Events are buffered in memory until the plugin calls resume().
227+
* If resume() is not called within 30 seconds, event processing automatically resumes.
228+
*
229+
* @example
230+
* ```typescript
231+
* class IDFAPlugin extends WaitingPlugin {
232+
* type = PluginType.enrichment;
233+
*
234+
* configure(analytics: SegmentClient) {
235+
* super.configure(analytics);
236+
* // Request IDFA permission
237+
* requestTrackingPermission().then((status) => {
238+
* if (status === 'authorized') {
239+
* // Add IDFA to context
240+
* }
241+
* this.resume(); // Resume event processing
242+
* });
243+
* }
244+
*
245+
* track(event: SegmentEvent) {
246+
* // Enrich event with IDFA if available
247+
* return event;
248+
* }
249+
* }
250+
* ```
251+
*
252+
* Common use cases:
253+
* - Waiting for user permissions (IDFA, location, notifications)
254+
* - Initializing native SDKs that provide enrichment data
255+
* - Loading remote configuration required for event processing
256+
* - Waiting for authentication state before sending events
257+
*
258+
* @remarks
259+
* Multiple WaitingPlugins can be active simultaneously. Event processing
260+
* only resumes when ALL waiting plugins have called resume() or timed out.
261+
*
262+
* WaitingPlugins can be added at any plugin type (before, enrichment, destination).
263+
* They can also be added to DestinationPlugins to pause only that destination's
264+
* event processing.
265+
*/
266+
export class WaitingPlugin extends Plugin {
267+
constructor() {
268+
super();
269+
}
270+
271+
/**
272+
* Configure the plugin with the Analytics client.
273+
* Automatically pauses event processing when called.
274+
* Override this method to perform async initialization, then call resume().
275+
*
276+
* @param analytics - The Analytics client instance
277+
*/
278+
configure(analytics: SegmentClient) {
279+
super.configure(analytics);
280+
}
281+
282+
/**
283+
* Manually pause event processing.
284+
* Generally not needed as adding a WaitingPlugin automatically pauses processing.
285+
*/
286+
pause() {
287+
this.analytics?.pauseEventProcessingForPlugin(this);
288+
}
289+
290+
/**
291+
* Resume event processing for this plugin.
292+
* Call this method when your async operation completes.
293+
* If all WaitingPlugins have resumed, buffered events will be processed.
294+
*/
295+
async resume() {
296+
await this.analytics?.resumeEventProcessingForPlugin(this);
297+
}
298+
}

0 commit comments

Comments
 (0)