[OPIK-4891] [BE] Add data retention policy enforcement#5647
Conversation
CI results for commit 27a1623, compared against base commit c8c6dfc (renamed tests count towards both the removed and added counts; these bot comments are updated with the latest results):
- Backend Tests - Integration Group 5: 124 tests (+17), 124 ✅ (+17), 3m 2s (-29s); removes 10 and adds 27 tests.
- Backend Tests - Integration Group 6: 1 130 tests (+3), 1 129 ✅ (+2), 5m 37s (-3s); removes 8 and adds 11 tests.
- Backend Tests - Integration Group 7: 257 tests (+19), 257 ✅ (+19), 2m 21s (+7s); removes 12 and adds 31 tests.
- Backend Tests - Integration Group 8: 268 tests (+5), 268 ✅ (+6), 4m 31s (+22s); removes 13 and adds 18 tests.
- Backend Tests - Integration Group 9: 28 files (+2), 28 suites (+2), 8m 9s (+3m 25s); removes 21 and adds 36 tests.
- Backend Tests - Integration Group 10: 206 tests (±0), 206 ✅ (+2), 8m 47s (-43s); removes 63 and adds 63 tests.
- Backend Tests - Integration Group 11: 143 tests (-31), 140 ✅ (-32), 3m 11s (+13s); removes 162 and adds 131 tests; removes 2 skipped and adds 3 skipped tests.
- Backend Tests - Integration Group 12: 214 tests (+15), 212 ✅ (+13), 2m 47s (-46s); removes 141 and adds 156 tests.
- Backend Tests - Integration Group 13: 440 tests (+19), 438 ✅ (+17), 4m 21s (-2s); removes 67 and adds 86 tests.
- Backend Tests - Integration Group 14: 222 tests (-14), 222 ✅ (-14), 12m 5s (+2m 2s); removes 77 and adds 63 tests.
- Backend Tests - Integration Group 15: 15 files (+4), 15 suites (+4), 2m 2s (+50s); removes 76 and adds 125 tests.
- Backend Tests - Integration Group 16: 29 files (-10), 29 suites (-10), 3m 50s (-37s); removes 78 and adds 45 tests.
- Python SDK E2E Tests (Python 3.10): 244 tests (±0), 242 ✅ (±0), 12m 57s (+3m 54s); removes 1 and adds 1 test.
- Python SDK E2E Tests (Python 3.11): 244 tests (±0), 242 ✅ (±0), 8m 33s (±0s); removes 1 and adds 1 test.
- Python SDK E2E Tests (Python 3.12): 244 tests (±0), 242 ✅ (±0), 7m 55s (-5m 23s); removes 1 and adds 1 test.
- Python SDK E2E Tests (Python 3.13): 244 tests (±0), 242 ✅ (±0), 8m 35s (-4s); removes 1 and adds 1 test.
- Python SDK E2E Tests (Python 3.14): 244 tests (±0), 242 ✅ (±0), 8m 3s (-1m 28s); removes 1 and adds 1 test.
Resolved review threads (content collapsed):
- apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/jobs/RetentionPolicyJob.java (outdated)
- .../opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/RetentionRulesResource.java
- .../opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/RetentionRulesResource.java
- apps/opik-backend/src/main/java/com/comet/opik/domain/RetentionPolicyService.java
- apps/opik-backend/src/main/java/com/comet/opik/domain/retention/RetentionPolicyService.java (outdated)
- apps/opik-backend/src/main/java/com/comet/opik/domain/RetentionPolicyService.java (outdated)
- apps/opik-backend/src/main/java/com/comet/opik/domain/retention/RetentionRuleDAO.java (outdated)
Add deleteForRetention methods to CommentDAO, FeedbackScoreDAO, SpanDAO, and TraceDAO with proper log_comment SETTINGS for ClickHouse audit trail. Register RetentionConfig in OpikConfiguration.
- Fix computeCurrentFraction to use the tick index instead of minuteOfDay (avoids skipped/duplicate fractions when 1440 % executionsPerDay != 0)
- Fix Location header: remove the leading slash that replaced the full path
- Remove duplicate doOnError logging (the job layer is the single source of truth)
- Quote interpolated values in structured log messages
- Filter out rules with applyToPast=false in groupByRetention
Tests were failing because groupByRetention now filters out rules where applyToPast != true, but test builders didn't set the flag.
Use unique workspace IDs per test method instead of static constants to prevent data accumulation when surefire retries tests (rerunFailingTestsCount=3). Each retry was inserting into the same workspace, causing count assertions to fail.
…y for ClickHouse visibility
Feedback scores with an authenticated user go to authored_feedback_scores, not feedback_scores. Updated test assertions to query the correct table. Added an Awaitility-based awaitData helper for ClickHouse async insert visibility and unique workspace IDs per test for surefire retry isolation.
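As an illustration only, a minimal sketch of what such an Awaitility-based helper could look like (only the awaitData name comes from the commit; the signature, timeout, and polling interval are assumptions, not the actual test code):

```java
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.function.LongSupplier;

class AwaitDataSketch {

    // Poll the asynchronously inserted ClickHouse data until the expected
    // row count becomes visible, instead of asserting right after the insert.
    static void awaitData(LongSupplier rowCount, long expected) {
        await().atMost(Duration.ofSeconds(10))
                .pollInterval(Duration.ofMillis(200))
                .until(() -> rowCount.getAsLong() == expected);
    }
}
```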
…blocks, null-safe equality
- Chain unlock into the reactive pipeline instead of a fire-and-forget subscribe()
- Make DELETE idempotent: return 204 for non-existent/already-deactivated rules
- Replace COALESCE(project_id, '') with the null-safe <=> operator
- Convert SQL string concatenation to text blocks in RetentionRuleDAO
…visibility
- Move RetentionPolicyService, RetentionRuleService, and RetentionRuleDAO to com.comet.opik.domain
- Move RetentionUtils to com.comet.opik.utils
- Revert SpanDAO from public back to package-private (no longer needed since the service is in the same package)
- Restore TraceDAO interface to package-private (was accidentally made public)
- Restore deleted uuid_from_time/uuid_to_time query blocks in both DAOs
- Re-apply only our retention additions on top of the clean main state
…o post-rule data
When applyToPast is false, the retention job only deletes data created after the rule was created. This uses a minimum UUID v7 (IdGenerator.generateMinId) as a lower bound (minId) so pre-existing data is preserved.
- Add IdGenerator.generateMinId() for a deterministic minimum UUID v7 at a timestamp
- Refactor RetentionPolicyService to resolve per-rule cutoffId and optional minId
- Update all 4 DAOs (Trace, Span, FeedbackScore, Comment) with a conditional minId clause
- Add a test contrasting applyToPast=true vs false side-by-side

Co-Authored-By: Claude Opus 4.6 <[email protected]>
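For intuition, a hypothetical sketch of what a deterministic "minimum" UUID v7 for a given instant can look like (this assumes the RFC 9562 UUIDv7 layout with every random bit zeroed; it is not the actual IdGenerator implementation):

```java
import java.time.Instant;
import java.util.UUID;

public final class MinUuidV7Sketch {

    // Build the smallest possible UUIDv7 for a given instant:
    // 48-bit millisecond timestamp, version nibble 0x7, variant bits '10',
    // and all remaining (random) bits set to zero.
    public static UUID generateMinId(Instant timestamp) {
        long millis = timestamp.toEpochMilli();
        long msb = (millis << 16) | 0x7000L;  // timestamp + version
        long lsb = 0x8000000000000000L;       // variant only, rest zero
        return new UUID(msb, lsb);
    }

    public static void main(String[] args) {
        // Any UUIDv7 generated at or after this instant sorts at or above this id,
        // which is what makes it usable as a lower bound (minId) in the DELETEs.
        System.out.println(generateMinId(Instant.parse("2025-01-01T00:00:00Z")));
    }
}
```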
…cate generateMinId
…), server-only ID generation
…conditions
Instead of one query per applyToPast=false workspace, pack them into a single statement with per-workspace (workspace_id, min_id) OR clauses. applyToPast=true workspaces still use the simple IN (:workspace_ids) pattern. Normalize the cutoff to start-of-day UTC for deterministic batching across ticks.
426103f to d7147ce
| log.info("Retention policy job started: interval={}, executionsPerDay={}, fractions={}", | ||
| interval, config.getExecutionsPerDay(), config.getTotalFractions()); | ||
| } |
Startup log emits unquoted placeholders: log.info("Retention policy job started: interval={}, executionsPerDay={}, fractions={}", ...), but backend logging guidelines (apps/opik-backend/AGENTS.md) require interpolated values to be quoted. This makes the startup entry non-compliant with our structured logging; can we quote the placeholders (e.g., interval='{}') to match the existing tick '{}' pattern?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Prompt for AI Agents:
In apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/jobs/RetentionPolicyJob.java around lines 68 to 70, the log message in the start() method uses unquoted placeholders. Change the message to quote each interpolated value (for example, use "interval='{}', executionsPerDay='{}', fractions='{}'"), keeping the same argument order (interval, config.getExecutionsPerDay(), config.getTotalFractions()). This will make the startup log conform to the backend structured-logging guideline requiring quoted values.
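To make the suggested change concrete, a before/after sketch based on the quoted snippet (SLF4J-style placeholders assumed):

```java
// Before: unquoted placeholders
log.info("Retention policy job started: interval={}, executionsPerDay={}, fractions={}",
        interval, config.getExecutionsPerDay(), config.getTotalFractions());

// After: quoted placeholders, matching the existing tick '{}' pattern
log.info("Retention policy job started: interval='{}', executionsPerDay='{}', fractions='{}'",
        interval, config.getExecutionsPerDay(), config.getTotalFractions());
```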
Resolved review thread (collapsed): apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/jobs/RetentionPolicyJob.java
```java
return lockService.lockUsingToken(RUN_LOCK, Duration.ofSeconds(config.getLockTimeoutSeconds()))
        .flatMap(acquired -> {
            if (!acquired) {
                log.info("Retention policy: could not acquire lock, another instance is running");
```
```java
int computeCurrentFraction(long tick) {
    return (int) (tick % config.getTotalFractions());
}
```
The tick counter from Flux.interval resets to 0 on every app restart, which means that if the service restarts mid-day, early fractions are reprocessed while later fractions are skipped until the next full cycle completes. This is harmless since the DELETEs are idempotent, but consider persisting the last processed fraction (e.g., in Redis) to resume where the previous run left off and ensure more even coverage across restarts.
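For context, a minimal standalone sketch of how a Flux.interval tick index maps to a fraction and why it restarts at 0 after a redeploy (the interval and fraction count here are illustrative, not the job's actual wiring):

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

public class FractionTickSketch {

    static final int TOTAL_FRACTIONS = 48; // e.g. executionsPerDay = 48

    static int computeCurrentFraction(long tick) {
        // The tick index starts at 0 on every application start, so after a
        // mid-day restart the sequence begins again at fraction 0.
        return (int) (tick % TOTAL_FRACTIONS);
    }

    public static void main(String[] args) throws InterruptedException {
        Flux.interval(Duration.ofMinutes(30))
                .map(FractionTickSketch::computeCurrentFraction)
                .subscribe(fraction -> System.out.println("processing fraction " + fraction));
        Thread.sleep(Long.MAX_VALUE); // keep the demo alive; the real job runs inside the server
    }
}
```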
```java
private final @NonNull FeedbackScoreDAO feedbackScoreDAO;
private final @NonNull CommentDAO commentDAO;
private final @NonNull InstantToUUIDMapper uuidMapper;
private final @NonNull @Config("retention") RetentionConfig config;
```
@Config doesn't work well with Lombok for injection; either inject OpikConfiguration or you will need a manual constructor.
Commit 27a1623 addressed this comment by replacing the Lombok @RequiredArgsConstructor with an explicit @Inject constructor that takes @Config("retention") RetentionConfig, ensuring configuration injection works correctly.
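A rough sketch of that pattern (field names come from the quoted snippet above; the @Inject/@Config wiring shown here is an assumption about the Guice-style setup, not the exact code in the commit):

```java
// Instead of Lombok's @RequiredArgsConstructor, declare the constructor explicitly
// so the @Config("retention") qualifier actually lands on the injected parameter.
public class RetentionPolicyService {

    private final FeedbackScoreDAO feedbackScoreDAO;
    private final CommentDAO commentDAO;
    private final InstantToUUIDMapper uuidMapper;
    private final RetentionConfig config;

    @Inject
    public RetentionPolicyService(FeedbackScoreDAO feedbackScoreDAO,
                                  CommentDAO commentDAO,
                                  InstantToUUIDMapper uuidMapper,
                                  @Config("retention") RetentionConfig config) {
        this.feedbackScoreDAO = feedbackScoreDAO;
        this.commentDAO = commentDAO;
        this.uuidMapper = uuidMapper;
        this.config = config;
    }
}
```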
```java
.concatMap(batch -> Flux.concat(
        feedbackScoreDAO.deleteForRetention(batch, cutoffId)
                .onErrorResume(e -> logAndSkip("feedback_scores", batch.size(), e)),
        commentDAO.deleteForRetention(batch, cutoffId)
                .onErrorResume(e -> logAndSkip("comments", batch.size(), e)),
        spanDAO.deleteForRetention(batch, cutoffId)
                .onErrorResume(e -> logAndSkip("spans", batch.size(), e)),
        traceDAO.deleteForRetention(batch, cutoffId)
                .onErrorResume(e -> logAndSkip("traces", batch.size(), e))));
```
What happens if we delete all traces of an experiment?
Resolved review thread (outdated, collapsed): apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java
```java
private static final String DELETE_FOR_RETENTION = """
        DELETE FROM <table_name>
        WHERE workspace_id IN :workspace_ids
        AND entity_id \\< :cutoff_id
```
[Correctness — Medium] entity_id < :cutoff_id is correct for trace-type entities (entity_id = trace UUID). However, for span-type feedback scores (entity_type = 'span'), entity_id is the span UUID. If the span's id is newer than the cutoff (which can happen for late-arriving spans), those scores will survive even though the parent trace is gone.
This is a cascading effect from the span orphan issue — once spans are deleted by trace_id instead of id, this becomes less of a concern. But it's worth noting that this query doesn't filter by entity_type, so it catches both.
Also, entity_id is the 4th component of the ORDER BY key (workspace_id, project_id, entity_type, entity_id, name) — ClickHouse can't use the sort key efficiently since project_id and entity_type are skipped.
This is not a big problem.
First, we actually want the deletions to go through all entity types and all projects, so that's not a problem: ClickHouse can navigate through the selected workspaces, then check all branches in the next two levels, and finally narrow in on the cutoff_id.
Second, for the span type we might leave a couple of feedback scores to be deleted on the next day, which is fine; they will be orphaned for a day.
A lot of the effort here is a best-effort approach to deleting the user content without triggering secondary queries to check extra things.
Resolved review thread (collapsed): apps/opik-backend/src/main/java/com/comet/opik/domain/RetentionPolicyService.java
```java
.concatMap(batch -> Flux.concat(
        feedbackScoreDAO.deleteForRetention(batch, cutoffId)
                .onErrorResume(e -> logAndSkip("feedback_scores", batch.size(), e)),
        commentDAO.deleteForRetention(batch, cutoffId)
                .onErrorResume(e -> logAndSkip("comments", batch.size(), e)),
        spanDAO.deleteForRetention(batch, cutoffId)
                .onErrorResume(e -> logAndSkip("spans", batch.size(), e)),
        traceDAO.deleteForRetention(batch, cutoffId)
                .onErrorResume(e -> logAndSkip("traces", batch.size(), e))));
```
[Correctness — Medium] onErrorResume per table means a transient ClickHouse error on e.g. feedback_scores is swallowed — we log and move on to comments, spans, traces. The failed table won't be retried until the same fraction is processed again, which happens once per day (since each fraction runs only once in a 24h cycle).
This means orphaned feedback scores could exist for up to 24 hours. Consider either:
- Keeping a "failed table + workspace" set in Redis and retrying on the next tick (any fraction), or
- At minimum, emitting a metric/alert when a table delete fails so ops can monitor it
I believe the best-effort execution without retries is actually better performance-wise. If something happens with a deletion today, tomorrow's execution for the same range will tackle it (and as it's navigating the sort keys anyway, it doesn't make one extra pass for them), so in the worst-case scenario the oldest data in the workspace will have some inconsistency for a day.
```java
return Mono.fromCallable(() -> template.inTransaction(READ_ONLY, handle -> {
    var dao = handle.attach(RetentionRuleDAO.class);
    return dao.findActiveWorkspaceRulesInRange(range[0], range[1]);
```
[Performance — Low] findActiveWorkspaceRulesInRange loads all matching rules into memory without pagination. If thousands of workspaces have active retention rules, this could be a large result set. Not critical for initial rollout, but worth keeping in mind as adoption grows.
I'm not particularly worried about this, tbh. For 1M workspaces, with the standard 48 ranges, that's only ~20k strings per range. We can either keep 20k small strings in memory or convert this into going to ClickHouse 20 times (assuming 1k batches). I choose the first.
Resolved review threads (collapsed):
- apps/opik-backend/src/main/java/com/comet/opik/infrastructure/RetentionConfig.java (outdated)
- apps/opik-backend/src/main/java/com/comet/opik/infrastructure/RetentionConfig.java
```java
public static String[] computeWorkspaceRange(int fraction, int totalFractions) {
    long maxVal = 1L << 32;
    long rangeSize = maxVal / totalFractions;

    long start = fraction * rangeSize;
    long end = (fraction == totalFractions - 1) ? maxVal : (fraction + 1) * rangeSize;

    String rangeStart = String.format("%08x", start);
    String rangeEnd = (end >= maxVal)
            ? "~" // ASCII 126, sorts after all alphanumeric chars (some workspace_ids are not hex UUIDs)
            : String.format("%08x", end);

    return new String[]{rangeStart, rangeEnd};
```
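For intuition, a small usage sketch of the quoted method (fraction boundaries computed by hand, using totalFractions = 4 purely for readability):

```java
// computeWorkspaceRange splits the 32-bit hex prefix space into equal slices:
//   fraction 0 of 4 -> ["00000000", "40000000")
//   fraction 1 of 4 -> ["40000000", "80000000")
//   fraction 2 of 4 -> ["80000000", "c0000000")
//   fraction 3 of 4 -> ["c0000000", "~")   // '~' sentinel also covers non-hex workspace ids
String[] range = RetentionUtils.computeWorkspaceRange(3, 4);
System.out.println(range[0] + " .. " + range[1]); // prints: c0000000 .. ~
```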
[Medium] Workspace IDs starting with non-hex characters (g–z) will always sort above ffffffff and land in the last fraction only (via the ~ sentinel). If non-UUID workspace IDs exist in production, this concentrates load unevenly — the last fraction does all the work for those workspaces while other fractions skip them.
If this is a real scenario, consider hash-based partitioning (e.g. MD5(workspace_id).substring(0,8)) for more even distribution.
The non-UUID workspaces do exist, but they are ~0.01% of the total workspaces, so I considered the hash solution to be overengineering that would also make the fetch query slower.
[Performance — Low] findActiveWorkspaceRulesInRange filters on enabled = true AND retention != 'unlimited' AND project_id IS NULL AND workspace_id >= ? AND workspace_id < ?. The existing index idx_active_workspace (enabled, workspace_id) only partially covers this.
However, note that retention != 'unlimited' is a not-equal condition — MySQL cannot efficiently use a B-tree index for != and will stop traversing the index after that column. So retention should be excluded from the index.
A better covering index would be:
INDEX idx_retention_job (enabled, project_id, workspace_id)
This gives:
- enabled = true — equality ✓
- project_id IS NULL — equality (IS NULL works with a B-tree) ✓
- workspace_id >= ... AND < ... — range (must be last) ✓
The retention != 'unlimited' filter is applied as a post-filter after the index narrows the rows.
Not critical at low scale, but helpful as the rules table grows.
…eight_deletes_sync=0, config docs
- Change log.error to log.warn for tick failures (recoverable, retried on the next interval)
- Add a manual constructor for RetentionPolicyService (@Config doesn't work with Lombok)
- Remove @Valid from primitive fields in RetentionConfig
- Add lightweight_deletes_sync=0 to all retention DELETE queries (async mutation)
- Document recommended executionsPerDay divisor values in config.yml
Details
Adds the data retention policy enforcement system for ClickHouse data (traces, spans, feedback scores, comments).
How it works
Rules are stored in MySQL (retention_rules table). Each rule targets a scope: workspace-level or org-level. Rules define a pre-defined retention period (short_14d, base_60d, extended_400d, or unlimited) after which data is eligible for deletion. Only one active rule per scope is allowed — creating a new rule auto-deactivates the previous one (soft delete for audit trail).

The job (RetentionPolicyJob) runs on a Flux.interval() schedule, controlled by executionsPerDay (default 48 = every 30 min). Each tick processes a different 1/N fraction of the workspace ID hex-space, so all workspaces are covered exactly once per day. A distributed Redis lock prevents concurrent execution across instances.

The service (RetentionPolicyService) resolves the most specific active rule per workspace (workspace rule > org rule), computes a cutoff UUID from start_of_today - retention_period, and deletes in referential order: feedback_scores (+ authored_feedback_scores) → comments → spans → traces. Each DAO's deleteForRetention issues a DELETE FROM <table> WHERE workspace_id IN (...) AND <id_column> < cutoff_id. When applyToPast=false, an additional lower bound (id >= min_id derived from the rule's createdAt) ensures only data created after the rule was established gets deleted.

Workspace partitioning (RetentionUtils) splits the UUID hex-space into N equal fractions using the leading hex characters of workspace_id. This avoids scanning all workspaces on every tick and distributes load evenly.

Batching by cutoff — since retention periods are pre-defined enum values (not arbitrary durations), workspaces sharing the same cutoff are grouped to minimize queries. The cutoff is normalized to start-of-day (UTC) so all ticks within the same day produce identical cutoffs for the same period. Two query patterns are used:
- applyToPast=true: a simple WHERE workspace_id IN (...) AND id < :cutoff — all workspaces packed into one IN clause
- applyToPast=false: per-workspace OR conditions WHERE id < :cutoff AND ((workspace_id = :w1 AND id >= :min1) OR (workspace_id = :w2 AND id >= :min2) OR ...) — different minIds packed into a single statement

Sequential execution — all deletion batches and table-level deletes run sequentially (concatMap / Flux.concat), not in parallel. Retention deletes can be very large, and parallel mutations would risk saturating ClickHouse connections and causing excessive merge pressure.
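To make the two query shapes above concrete, a hedged sketch of the statement templates as Java text blocks (the table name and placeholder names are illustrative; the actual DAO templates differ):

```java
// applyToPast = true: all workspaces sharing the same normalized cutoff in one IN clause.
String deleteApplyToPastTrue = """
        DELETE FROM traces
        WHERE workspace_id IN :workspace_ids
          AND id < :cutoff_id
        """;

// applyToPast = false: per-workspace (workspace_id, min_id) OR conditions packed into one
// statement, so pre-existing data (ids below each workspace's min_id) is preserved.
String deleteApplyToPastFalse = """
        DELETE FROM traces
        WHERE id < :cutoff_id
          AND ((workspace_id = :workspace_id_0 AND id >= :min_id_0)
            OR (workspace_id = :workspace_id_1 AND id >= :min_id_1))
        """;
```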
Key design decisions
- The feature ships disabled by default (RETENTION_ENABLED=false in config.yml). No Helm changes needed; toggle via env var at deployment time.
- Rules are soft-deleted (deactivation sets enabled=false), preserving the audit trail.
- applyToPast defaults to true — unless explicitly set to false, retention rules apply to all existing data. This keeps most workspaces in the efficient batch path.
- ClickHouse lightweight DELETE marks rows as deleted (so they're no longer accessible in queries), and the actual mutation is applied asynchronously by ClickHouse's merge process.
- Existing DAOs only gain new deleteForRetention / deleteForRetentionWithBounds methods — no changes to existing methods.

Not in scope for this version
- Project-level rules: the project_id column exists in the rules table and uniqueness is enforced, but filtering in the deletion path is not yet implemented. Will be addressed in an upcoming PR.

All will be part of the codebase before prod activation.
File guide
- RetentionPolicyJob
- RetentionPolicyService
- RetentionRuleService
- RetentionRuleDAO (retention_rules table)
- RetentionUtils
- RetentionConfig
- RetentionRule / RetentionLevel / RetentionPeriod
- RetentionRulesResource
- 000056_create_retention_rules_table.sql
- CommentDAO / FeedbackScoreDAO / SpanDAO / TraceDAO (deleteForRetention + deleteForRetentionWithBounds methods)
- config.yml

Change checklist
Issues
Testing
- RetentionUtilsTest — workspace range partitioning, sentinel coverage for non-hex workspace IDs
- RetentionPolicyServiceTest — actual ClickHouse deletion verification, rule priority resolution (workspace > org), multi-workspace isolation, disabled/unlimited rule handling, children-before-parents ordering, applyToPast=false preserves pre-existing data across all 4 entity types (same retention period, both patterns exercised in one cycle)
- RetentionRulesResourceTest — CRUD operations, validation, workspace scoping, idempotent deactivation, auto-deactivation of the previous rule

Documentation