Skip to content

Commit 0380f9c

Browse files
committed
feat(grouper): dual-write to unified events/repetitions collections
- Add lib/db/unified for dual-write helpers (insertEventUnified, insertRepetitionUnified, ensureEventAndIncrementUnified) - Add UNIFIED_EVENTS_PROJECT_IDS env for project-specific dual-write (empty = disabled) - Grouper: writeEventToUnified for new events, writeRepetitionToUnified for repetitions - ensureEventAndIncrementUnified for existed events (insert if missing, then increment) - Add hawk_dual_write_failures_total metric - Update .env.sample with USE_UNIFIED_EVENTS_COLLECTIONS and UNIFIED_EVENTS_PROJECT_IDS Made-with: Cursor
1 parent ae01c55 commit 0380f9c

File tree

8 files changed

+262
-2
lines changed

8 files changed

+262
-2
lines changed

.env.sample

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ HAWK_CATCHER_TOKEN=
3636

3737
# Feature flags
3838

39+
## Dual-write to unified events/repetitions collections (events, repetitions with projectId)
40+
## USE_UNIFIED_EVENTS_COLLECTIONS=true enables the feature
41+
## UNIFIED_EVENTS_PROJECT_IDS - comma-separated project ObjectIds. If empty/not set, dual-write disabled
42+
USE_UNIFIED_EVENTS_COLLECTIONS=false
43+
# UNIFIED_EVENTS_PROJECT_IDS=69a85c0b28eb907f8013c130
44+
3945
## If true, Grouper worker will send messages about new events to Notifier worker
4046
IS_NOTIFIER_WORKER_ENABLED=false
4147

lib/db/unified/dualWrite.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/**
2+
* @file Dual-write helper for unified events/repetitions collections
3+
*
4+
* Writes to unified collections (events, repetitions) with projectId.
5+
* Used when USE_UNIFIED_EVENTS_COLLECTIONS=true and the project id is listed in UNIFIED_EVENTS_PROJECT_IDS.
6+
* Errors are counted in the hawk_dual_write_failures_total metric, not thrown — dual-write must not break main flow.
7+
*
8+
* @see docs/mongodb-unified-collections/
9+
*/
10+
11+
import { ObjectId } from 'mongodb';
12+
import type { Db } from 'mongodb';
13+
import { incrementDualWriteFailure } from '../../metrics/dualWrite';
14+
15+
const EVENTS_COLLECTION = 'events';
16+
const REPETITIONS_COLLECTION = 'repetitions';
17+
18+
/**
 * Memoized allow-list of project ids; stays null until the env variable is parsed for the first time.
 */
let _projectIdsCache: Set<string> | null = null;

/**
 * Returns the set of project ids listed in the UNIFIED_EVENTS_PROJECT_IDS env variable.
 *
 * The variable is read as a comma-separated list; every entry is trimmed and
 * empty entries are dropped. The result is parsed once and cached in the module,
 * so later changes of the env variable are not picked up.
 *
 * @returns set of project ids (empty when the variable is missing or blank)
 */
function getUnifiedProjectIds(): Set<string> {
  if (_projectIdsCache === null) {
    const parsedIds = new Set<string>();
    const rawList = process.env.UNIFIED_EVENTS_PROJECT_IDS ?? '';

    for (const chunk of rawList.split(',')) {
      const id = chunk.trim();

      if (id) {
        parsedIds.add(id);
      }
    }

    _projectIdsCache = parsedIds;
  }

  return _projectIdsCache;
}
27+
28+
/**
 * Tells whether dual-write should run for the given project.
 *
 * True only when USE_UNIFIED_EVENTS_COLLECTIONS is exactly 'true' and the project id
 * is present in the allow-list returned by getUnifiedProjectIds().
 *
 * @param projectId - project ObjectId as string
 * @returns true when the project is opted in to dual-write
 */
function isUnifiedEnabledForProject(projectId: string): boolean {
  const featureEnabled = process.env.USE_UNIFIED_EVENTS_COLLECTIONS === 'true';

  // The allow-list is consulted only when the flag is on; an empty list contains no id, so it disables dual-write
  return featureEnabled && getUnifiedProjectIds().has(projectId);
}
34+
35+
/**
 * Converts a project id string to a MongoDB ObjectId by passing it to the ObjectId constructor.
 *
 * @param projectId - project ObjectId as string
 * @returns ObjectId built from the given string
 */
function toObjectId(projectId: string): ObjectId {
  const projectObjectId = new ObjectId(projectId);

  return projectObjectId;
}
38+
39+
/**
 * Insert event into unified events collection with projectId
 *
 * Does nothing when dual-write is not enabled for the project. Never rejects:
 * any failure (malformed projectId, MongoDB error) increments the dual-write
 * failure metric and is logged, so the main flow is not interrupted.
 *
 * @param db - MongoDB Db instance that holds the unified events collection
 * @param projectId - project ObjectId as string
 * @param doc - event document (without projectId)
 * @param insertedId - _id from original insert (to keep consistency)
 */
export async function insertEventUnified(
  db: Db,
  projectId: string,
  doc: Record<string, unknown>,
  insertedId: ObjectId
): Promise<void> {
  if (!isUnifiedEnabledForProject(projectId)) return;

  try {
    // doc is spread first so the explicit projectId and _id always win over same-named fields of doc
    const unifiedDoc = {
      ...doc,
      projectId: toObjectId(projectId),
      _id: insertedId,
    };

    await db.collection(EVENTS_COLLECTION).insertOne(unifiedDoc);
  } catch (error: unknown) {
    incrementDualWriteFailure('events');

    // Keep the cause visible: the metric alone does not tell why the write failed
    console.error(
      `[dual-write] Failed to insert event ${String(insertedId)} of project ${projectId} into "${EVENTS_COLLECTION}"`,
      error
    );
  }
}
66+
67+
/**
 * Insert repetition into unified repetitions collection with projectId
 *
 * Does nothing when dual-write is not enabled for the project. Never rejects:
 * any failure (malformed projectId, MongoDB error) increments the dual-write
 * failure metric and is logged, so the main flow is not interrupted.
 *
 * @param db - MongoDB Db instance that holds the unified repetitions collection
 * @param projectId - project ObjectId as string
 * @param doc - repetition document (without projectId)
 * @param insertedId - _id from original insert (to keep consistency)
 */
export async function insertRepetitionUnified(
  db: Db,
  projectId: string,
  doc: Record<string, unknown>,
  insertedId: ObjectId
): Promise<void> {
  if (!isUnifiedEnabledForProject(projectId)) return;

  try {
    // doc is spread first so the explicit projectId and _id always win over same-named fields of doc
    const unifiedDoc = {
      ...doc,
      projectId: toObjectId(projectId),
      _id: insertedId,
    };

    await db.collection(REPETITIONS_COLLECTION).insertOne(unifiedDoc);
  } catch (error: unknown) {
    incrementDualWriteFailure('repetitions');

    // Keep the cause visible: the metric alone does not tell why the write failed
    console.error(
      `[dual-write] Failed to insert repetition ${String(insertedId)} of project ${projectId} into "${REPETITIONS_COLLECTION}"`,
      error
    );
  }
}
94+
95+
/**
 * Tells whether the given error carries the MongoDB duplicate key error code (11000).
 */
function isDuplicateKeyError(error: unknown): boolean {
  return typeof error === 'object'
    && error !== null
    && (error as { code?: unknown }).code === 11000;
}

/**
 * Ensure event exists in unified collection, then increment counter.
 * Used when processing repetitions for events created before dual-write was enabled.
 *
 * Does nothing when dual-write is not enabled for the project. Never rejects:
 * any failure (malformed projectId, MongoDB error) increments the dual-write
 * failure metric and is logged, so the main flow is not interrupted.
 *
 * @param db - MongoDB Db instance that holds the unified events collection
 * @param projectId - project ObjectId as string
 * @param groupHash - group hash used together with projectId to look the event up
 * @param eventDoc - event document to insert (with projectId added) when the event is missing
 * @param incrementAffected - when true, usersAffected is incremented along with totalCount
 */
export async function ensureEventAndIncrementUnified(
  db: Db,
  projectId: string,
  groupHash: string,
  eventDoc: Record<string, unknown>,
  incrementAffected: boolean
): Promise<void> {
  if (!isUnifiedEnabledForProject(projectId)) return;

  try {
    // Converted inside try: a malformed projectId must be counted as a failure, not thrown to the caller
    const projectIdObj = toObjectId(projectId);
    const collection = db.collection(EVENTS_COLLECTION);
    const filter = { projectId: projectIdObj, groupHash };

    // Only existence matters here, so do not fetch the whole event document
    const existing = await collection.findOne(filter, { projection: { _id: 1 } });

    if (!existing) {
      try {
        await collection.insertOne({ ...eventDoc, projectId: projectIdObj });
      } catch (insertError: unknown) {
        /**
         * Another writer may insert the same event between findOne and insertOne.
         * In that case the document exists now, so proceed to the increment
         * instead of losing it; any other error is a real failure.
         */
        if (!isDuplicateKeyError(insertError)) throw insertError;
      }
    }

    const updateQuery = incrementAffected
      ? { $inc: { totalCount: 1, usersAffected: 1 } }
      : { $inc: { totalCount: 1 } };

    await collection.updateOne(filter, updateQuery);
  } catch (error: unknown) {
    incrementDualWriteFailure('events');

    // Keep the cause visible: the metric alone does not tell why the write failed
    console.error(
      `[dual-write] Failed to ensure/increment event ${groupHash} of project ${projectId} in "${EVENTS_COLLECTION}"`,
      error
    );
  }
}

lib/db/unified/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/**
2+
* @file Re-export dual-write helpers for unified events/repetitions collections
3+
*/
4+
5+
export {
6+
insertEventUnified,
7+
insertRepetitionUnified,
8+
ensureEventAndIncrementUnified,
9+
} from './dualWrite';

lib/metrics/dualWrite.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/**
2+
* @file Prometheus metrics for dual-write to unified collections
3+
*
4+
* @see docs/mongodb-unified-collections/
5+
*/
6+
7+
import promClient from 'prom-client';
8+
9+
/**
 * Prometheus counter of failed dual-writes; the single `type` label tells
 * which kind of write failed.
 */
export const dualWriteFailuresTotal = new promClient.Counter({
  labelNames: ['type'],
  name: 'hawk_dual_write_failures_total',
  help: 'Counter of dual-write failures to unified collections by type',
});
14+
15+
/**
 * Registers one dual-write failure in the dualWriteFailuresTotal counter
 *
 * @param type - value of the `type` label: 'events' | 'repetitions'
 */
export function incrementDualWriteFailure(type: 'events' | 'repetitions'): void {
  const labeledCounter = dualWriteFailuresTotal.labels(type);

  labeledCounter.inc();
}

workers/env.d.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,16 @@ declare namespace NodeJS {
1919
* How long to store events (in days)
2020
*/
2121
MAX_DAYS_NUMBER: string;
22+
23+
/**
24+
* If true, dual-write to unified events/repetitions collections (events, repetitions with projectId)
25+
* @default 'false'
26+
*/
27+
USE_UNIFIED_EVENTS_COLLECTIONS?: string;
28+
29+
/**
30+
* Comma-separated project ObjectIds for dual-write. If empty/not set, dual-write disabled
31+
*/
32+
UNIFIED_EVENTS_PROJECT_IDS?: string;
2233
}
2334
}

workers/grouper/.env.sample

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
11
# Url for connecting to Redis
22
REDIS_URL=redis://redis:6379
3+
4+
# Dual-write to unified events/repetitions collections (events, repetitions with projectId)
5+
USE_UNIFIED_EVENTS_COLLECTIONS=false
6+
# Comma-separated project ObjectIds. If empty/not set, dual-write disabled
7+
# UNIFIED_EVENTS_PROJECT_IDS=69a85c0b28eb907f8013c130

workers/grouper/src/index.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ import type {
1818
import type { RepetitionDBScheme } from '../types/repetition';
1919
import { DatabaseReadWriteError, DiffCalculationError, ValidationError } from '../../../lib/workerErrors';
2020
import { decodeUnsafeFields, encodeUnsafeFields } from '../../../lib/utils/unsafeFields';
21+
import {
22+
writeEventToUnified,
23+
writeRepetitionToUnified,
24+
ensureEventAndIncrementUnified,
25+
} from './unified/dualWriteEvents';
2126
import { MS_IN_SEC } from '../../../lib/utils/consts';
2227
import TimeMs from '../../../lib/utils/time';
2328
import DataFilter from './data-filter';
@@ -200,14 +205,21 @@ export default class GrouperWorker extends Worker {
200205
/**
201206
* Insert new event
202207
*/
203-
await this.saveEvent(task.projectId, {
208+
const groupedEventData = {
204209
groupHash: uniqueEventHash,
205210
totalCount: 1,
206211
catcherType: task.catcherType,
207212
payload: task.payload,
208213
timestamp: task.timestamp,
209214
usersAffected: incrementAffectedUsers ? 1 : 0,
210-
} as GroupedEventDBScheme);
215+
} as GroupedEventDBScheme;
216+
const insertedId = await this.saveEvent(task.projectId, groupedEventData);
217+
await writeEventToUnified(
218+
this.eventsDb.getConnection(),
219+
task.projectId,
220+
groupedEventData as unknown as Record<string, unknown>,
221+
insertedId
222+
);
211223

212224
const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash);
213225

@@ -244,6 +256,13 @@ export default class GrouperWorker extends Worker {
244256
await this.incrementEventCounterAndAffectedUsers(task.projectId, {
245257
groupHash: uniqueEventHash,
246258
}, incrementAffectedUsers);
259+
await ensureEventAndIncrementUnified(
260+
this.eventsDb.getConnection(),
261+
task.projectId,
262+
uniqueEventHash,
263+
existedEvent as unknown as Record<string, unknown>,
264+
incrementAffectedUsers
265+
);
247266

248267
/**
249268
* Decode existed event to calculate diffs correctly
@@ -269,6 +288,12 @@ export default class GrouperWorker extends Worker {
269288
} as RepetitionDBScheme;
270289

271290
repetitionId = await this.saveRepetition(task.projectId, newRepetition);
291+
await writeRepetitionToUnified(
292+
this.eventsDb.getConnection(),
293+
task.projectId,
294+
newRepetition as unknown as Record<string, unknown>,
295+
repetitionId
296+
);
272297

273298
/**
274299
* Clear the large event payload references to allow garbage collection
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* @file Dual-write to unified events/repetitions collections (Grouper layer)
3+
*
4+
* Thin wrappers around lib/db/unified. Called after saveEvent, saveRepetition,
5+
* incrementEventCounterAndAffectedUsers when USE_UNIFIED_EVENTS_COLLECTIONS=true.
6+
* Errors are handled in lib layer (counted in the dual-write failure metric), don't throw.
7+
*
8+
* @see docs/mongodb-unified-collections/
9+
*/
10+
11+
import type { Db, ObjectId } from 'mongodb';
12+
import {
13+
insertEventUnified,
14+
insertRepetitionUnified,
15+
ensureEventAndIncrementUnified as ensureEventAndIncrementUnifiedLib,
16+
} from '../../../../lib/db/unified';
17+
18+
/**
 * Dual-write of a new event: forwards all arguments unchanged to
 * insertEventUnified from lib/db/unified and waits for it to finish.
 *
 * @param db - MongoDB Db instance passed through to the lib helper
 * @param projectId - project ObjectId as string
 * @param doc - event document
 * @param insertedId - _id of the already saved event
 */
export async function writeEventToUnified(
  db: Db,
  projectId: string,
  doc: Record<string, unknown>,
  insertedId: ObjectId
): Promise<void> {
  await insertEventUnified(
    db,
    projectId,
    doc,
    insertedId
  );
}
29+
30+
/**
 * Dual-write of a repetition: forwards all arguments unchanged to
 * insertRepetitionUnified from lib/db/unified and waits for it to finish.
 *
 * @param db - MongoDB Db instance passed through to the lib helper
 * @param projectId - project ObjectId as string
 * @param doc - repetition document
 * @param insertedId - _id of the already saved repetition
 */
export async function writeRepetitionToUnified(
  db: Db,
  projectId: string,
  doc: Record<string, unknown>,
  insertedId: ObjectId
): Promise<void> {
  await insertRepetitionUnified(
    db,
    projectId,
    doc,
    insertedId
  );
}
41+
42+
/**
 * Dual-write for an already existing event: forwards all arguments unchanged to
 * ensureEventAndIncrementUnified from lib/db/unified (insert if missing, then
 * increment counter) and waits for it to finish.
 * Used when processing repetitions for events created before dual-write was enabled.
 *
 * @param db - MongoDB Db instance passed through to the lib helper
 * @param projectId - project ObjectId as string
 * @param groupHash - group hash of the event
 * @param eventDoc - event document handed to the lib helper for the insert-if-missing step
 * @param incrementAffected - passed through; tells the lib helper whether to increment usersAffected too
 */
export async function ensureEventAndIncrementUnified(
  db: Db,
  projectId: string,
  groupHash: string,
  eventDoc: Record<string, unknown>,
  incrementAffected: boolean
): Promise<void> {
  await ensureEventAndIncrementUnifiedLib(
    db,
    projectId,
    groupHash,
    eventDoc,
    incrementAffected
  );
}

0 commit comments

Comments
 (0)