[feature](lineage) Support lineage SPI framework for data lineage collection#61004
seawinde wants to merge 5 commits into apache:master
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
FE UT Coverage Report: Increment line coverage
7463610 to 36125d9

run buildall
36125d9 to b958837

run buildall
b958837 to 954df99

run buildall
TPC-H: Total hot run time: 28741 ms
TPC-DS: Total hot run time: 184195 ms
FE Regression Coverage Report: Increment line coverage

run buildall
TPC-H: Total hot run time: 28891 ms
TPC-DS: Total hot run time: 183870 ms
FE UT Coverage Report: Increment line coverage
FE Regression Coverage Report: Increment line coverage

/review
Code Review Summary: [feature](lineage) Support lineage SPI framework for data lineage collection
This PR introduces a data lineage collection framework with SPI plugin architecture, lineage extraction from Nereids plans, and async event processing. The overall design is sound and aligns well with the existing fe-extension-spi/fe-extension-loader architecture. The test coverage for the extractor is thorough (25+ test cases).
Below are the issues found, organized by the critical checkpoints from our review guidelines.
Critical Checkpoint Conclusions
1. Goal and Correctness: The PR accomplishes its stated goal of providing a pluggable lineage SPI framework. The lineage extraction logic correctly handles JOINs, CTEs, UNIONs, window functions, and conditional expressions. However, there is a design gap where Config.activate_lineage_plugin is not enforced at the framework level (see Issue #1).
2. Modification Scope: The PR is focused and well-scoped. Changes to existing files are minimal and non-intrusive (PlannerHook signature change is safe since no existing implementations override the analyze hooks).
3. Concurrency: The Worker thread has a subtle race where it reads the plugin list BEFORE polling the queue, meaning events can be dispatched to a stale plugin list (see Issue #2). The worker thread also has no graceful shutdown mechanism.
4. Lifecycle Management: No shutdown/close mechanism for the worker thread or loaded plugins. The LineagePlugin extends Plugin which extends Closeable, but close() is never called on plugins.
5. Configuration Items: lineage_event_queue_size is marked mutable=true but the queue capacity is baked in at construction time, making runtime changes ineffective (see Issue #3).
6. Incompatible Changes: The PlannerHook interface changed beforeAnalyze(CascadesContext) to beforeAnalyze(NereidsPlanner). Verified safe: no existing implementations override the analyze hooks.
7. Parallel Code Paths: N/A for this feature.
8. Test Coverage: Unit tests are comprehensive for extraction and processor lifecycle. However, there are no regression tests under regression-test/ (see Issue #5). The LineageUtilsSkipTest is a good addition.
9. Observability: Debug logging is adequate. No metrics are added for the lineage path, which is acceptable for an initial implementation.
10. Transaction/Persistence: N/A - no EditLog or transaction changes.
11. Performance: isLineagePluginConfigured() is called on every DML execution path and iterates all plugins calling eventFilter(). This should use a cached flag rather than iterating plugins on every query (see Issue #4).
```java
List<LineagePlugin> plugins = new ArrayList<>();
for (Map.Entry<String, LineagePluginFactory> entry : factories.entrySet()) {
    String pluginName = entry.getKey();
    try {
```
Issue #1 (Medium): Config.activate_lineage_plugin is not enforced at framework level
All discovered plugin factories are instantiated unconditionally here. The activate_lineage_plugin config is documented as the mechanism to select which plugins are active, but it is never checked by the framework - filtering is entirely delegated to each plugin's eventFilter() implementation.
This means:
- Plugins are loaded and initialized even when not listed in activate_lineage_plugin
- A buggy plugin that returns true from eventFilter() regardless of config will receive events even when not configured
- There is no built-in way to disable a misbehaving plugin without removing its JAR
Suggestion: Either filter factories by activate_lineage_plugin before instantiation, or provide a default eventFilter() implementation in the framework that checks the config:
```java
// In discoverPlugins(), before creating the plugin:
Set<String> activeNames = new HashSet<>(Arrays.asList(Config.activate_lineage_plugin));
if (!activeNames.isEmpty() && !activeNames.contains(pluginName)) {
    LOG.info("Skip lineage plugin not in activate_lineage_plugin: {}", pluginName);
    continue;
}
```
Fixed: added the Config.activate_lineage_plugin filter.
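A minimal, self-contained sketch of the filtering this fix describes. LineagePlugin and LineagePluginFactory below are simplified stand-ins (the real SPI types extend fe-extension-spi's Plugin and PluginFactory), LineagePluginFilter is a hypothetical helper, and a plain String[] stands in for Config.activate_lineage_plugin. It assumes an empty list activates no plugin, matching the empty default described in the PR; the reviewer's snippet above instead loads every plugin when the list is empty, so the merged behavior may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-ins; the real SPI types extend fe-extension-spi's Plugin/PluginFactory.
interface LineagePlugin {
    boolean eventFilter();
}

interface LineagePluginFactory {
    String name();

    LineagePlugin create();
}

final class LineagePluginFilter {
    private LineagePluginFilter() {
    }

    /**
     * Creates plugins only for factories named in activeConfig
     * (a stand-in for Config.activate_lineage_plugin).
     * Assumption: an empty config activates nothing.
     */
    static List<LineagePlugin> createActivePlugins(
            Map<String, LineagePluginFactory> factories, String[] activeConfig) {
        Set<String> activeNames = new HashSet<>(Arrays.asList(activeConfig));
        List<LineagePlugin> plugins = new ArrayList<>();
        for (Map.Entry<String, LineagePluginFactory> entry : factories.entrySet()) {
            String pluginName = entry.getKey();
            if (!activeNames.contains(pluginName)) {
                // Skipped before create(), so inactive plugins are never initialized.
                continue;
            }
            try {
                plugins.add(entry.getValue().create());
            } catch (RuntimeException e) {
                // One failing factory should not prevent the others from loading.
                System.err.println("Failed to create lineage plugin " + pluginName + ": " + e);
            }
        }
        return plugins;
    }
}
```

Filtering before create() also gives operators a way to disable a misbehaving plugin by editing the config instead of removing its JAR.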
```java
while (true) {
    List<LineagePlugin> currentPlugins = lineagePlugins.get();

    try {
```
Issue #2 (Medium): Worker reads stale plugin list and has no graceful shutdown
Two issues here:
- Stale plugin list: currentPlugins is read at line 298 BEFORE eventQueue.poll() at line 301, which blocks for up to 5 seconds. If refreshPlugins() is called during that wait, the event will be dispatched to the old plugin list. The fix is to read lineagePlugins.get() AFTER successfully polling an event:
```java
while (true) {
    LineageInfo lineageInfo;
    try {
        lineageInfo = eventQueue.poll(EVENT_POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        if (lineageInfo == null) {
            continue;
        }
    } catch (InterruptedException e) {
        LOG.warn("...", e);
        continue;
    }
    List<LineagePlugin> currentPlugins = lineagePlugins.get(); // Move here
    for (LineagePlugin lineagePlugin : currentPlugins) {
        ...
    }
}
```
- No graceful shutdown: The while(true) loop has no exit condition. When FE shuts down, the daemon thread is killed abruptly. Consider adding a volatile boolean running flag and a stop() method that sets it to false and interrupts the worker thread, so plugins can be closed cleanly.
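A minimal sketch of the loop the reviewer suggests, combining both points: the plugin list is read only after poll() returns an event, and a volatile running flag plus stop() lets the thread exit. For self-containment, plugins are modelled as Consumer<String> and events as String; LineageWorker, refreshPlugins(), stop() and isAlive() are hypothetical names, not the PR's code.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class LineageWorker implements Runnable {
    private static final long EVENT_POLL_TIMEOUT_SECONDS = 5;

    private final BlockingQueue<String> eventQueue = new LinkedBlockingDeque<>();
    private final AtomicReference<List<Consumer<String>>> plugins;
    private volatile boolean running = true;
    private Thread thread;

    LineageWorker(List<Consumer<String>> initialPlugins) {
        plugins = new AtomicReference<>(List.copyOf(initialPlugins));
    }

    void start() {
        thread = new Thread(this, "lineage-worker");
        thread.setDaemon(true); // still a daemon, as in the PR
        thread.start();
    }

    boolean submit(String event) {
        return eventQueue.offer(event);
    }

    void refreshPlugins(List<Consumer<String>> newPlugins) {
        plugins.set(List.copyOf(newPlugins));
    }

    /** Sets the flag first, then interrupts so a blocked poll() returns promptly. */
    void stop() {
        running = false;
        thread.interrupt();
        try {
            thread.join(TimeUnit.SECONDS.toMillis(EVENT_POLL_TIMEOUT_SECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isAlive() {
        return thread != null && thread.isAlive();
    }

    @Override
    public void run() {
        while (running) {
            String event;
            try {
                event = eventQueue.poll(EVENT_POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                continue; // the loop condition re-checks running
            }
            if (event == null) {
                continue; // timeout: nothing to dispatch
            }
            // Read the list only after an event arrives, so a refreshPlugins()
            // call made while poll() was blocked is already visible.
            for (Consumer<String> plugin : plugins.get()) {
                try {
                    plugin.accept(event);
                } catch (RuntimeException e) {
                    System.err.println("Lineage plugin failed: " + e);
                }
            }
        }
    }
}
```

Because stop() sets the flag before interrupting, an interrupt delivered while a plugin is running makes the next poll() throw, and the loop then exits on its condition check. Events still queued at that point are dropped; a fuller version could drain them and call close() on each plugin after the loop ends.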
1. Agreed, will fix. Moving lineagePlugins.get() after the successful poll() is a zero-cost improvement for correctness.
2. The setDaemon(true) is intentional: it follows the same pattern as AuditEventProcessor in Doris FE, which also uses a daemon thread with while(true) and no explicit stop(). Since FE has no unified graceful shutdown framework today, adding one here alone would be inconsistent. Happy to revisit if FE introduces a global shutdown lifecycle later.
```java
@ConfField(mutable = true, description = {"血缘事件队列最大长度,超过长度事件会被舍弃",
        "Max size of lineage event queue, events will be discarded when exceeded"})
public static int lineage_event_queue_size = 50000;
```
Issue #3 (Low): lineage_event_queue_size marked mutable=true but is effectively immutable
The LinkedBlockingDeque in LineageEventProcessor is constructed with this config value at field initialization time (lines 70-71 of LineageEventProcessor.java). LinkedBlockingDeque has no setCapacity() method, so changing this config at runtime via SET GLOBAL has no effect on the actual queue capacity.
To make it truly mutable, follow the AuditEventProcessor pattern: use an unbounded LinkedBlockingDeque and check Config.lineage_event_queue_size at enqueue time in submitLineageEvent():
```java
if (eventQueue.size() >= Config.lineage_event_queue_size) {
    LOG.warn("...");
    return false;
}
eventQueue.add(lineageInfo);
```
Alternatively, remove mutable = true if runtime changes are not intended.
Good catch. Removed mutable = true from lineage_event_queue_size. The LinkedBlockingDeque capacity is fixed at construction time with no setCapacity() API, so runtime changes via SET GLOBAL would silently have no effect. Queue size is a low-frequency config — restarting FE to apply changes is acceptable.
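The approach chosen here (capacity fixed at construction, FE restart to change it) can be sketched as follows. offer() on a bounded LinkedBlockingDeque returns false immediately when the deque is full, so the submitting query thread never blocks and the event is dropped, matching the config description "events will be discarded when exceeded". BoundedLineageQueue and its drop counter are hypothetical, and events are plain Strings.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

final class BoundedLineageQueue {
    private final BlockingQueue<String> eventQueue;
    private final AtomicLong dropped = new AtomicLong();

    // Capacity is read once here; later config changes cannot resize the deque.
    BoundedLineageQueue(int capacity) {
        this.eventQueue = new LinkedBlockingDeque<>(capacity);
    }

    /** Non-blocking enqueue: returns false and counts the drop when the queue is full. */
    boolean submit(String event) {
        if (eventQueue.offer(event)) {
            return true;
        }
        dropped.incrementAndGet();
        return false;
    }

    long droppedCount() {
        return dropped.get();
    }

    int size() {
        return eventQueue.size();
    }
}
```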
```java
private static boolean isLineagePluginConfigured() {
    LineageEventProcessor processor = Env.getCurrentEnv().getLineageEventProcessor();
    return processor != null && processor.hasActivePlugins();
}
```
Issue #4 (Medium): isLineagePluginConfigured() iterates all plugins on every DML execution
hasActivePlugins() is called on every INSERT INTO SELECT, INSERT OVERWRITE, and CTAS execution. It iterates through all loaded plugins and calls eventFilter() on each one. For a system processing thousands of queries per second, this adds unnecessary overhead to the hot path.
Consider caching the result with a short TTL (e.g., 5-10 seconds) or using a simple volatile boolean flag that is updated when plugins are refreshed or config changes:
```java
private static boolean isLineagePluginConfigured() {
    LineageEventProcessor processor = Env.getCurrentEnv().getLineageEventProcessor();
    return processor != null && processor.hasActivePlugins(); // This iterates all plugins
}
```
Alternatively, since activate_lineage_plugin defaults to empty, a quick check of Config.activate_lineage_plugin.length > 0 before calling hasActivePlugins() would be a cheap fast-path exit for the common case of no lineage plugins.
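A sketch of the cached-flag idea, assuming the flag is recomputed whenever plugins are reloaded or the activation config changes. LineageActivation, the static activateLineagePlugin field (standing in for Config.activate_lineage_plugin) and the BooleanSupplier filters (standing in for each plugin's eventFilter()) are all hypothetical.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

final class LineageActivation {
    // Hypothetical stand-in for Config.activate_lineage_plugin (empty by default).
    static volatile String[] activateLineagePlugin = new String[0];

    private volatile boolean hasActivePlugins = false;

    /** Call on plugin reload and on activation-config change, not per query. */
    void refresh(List<BooleanSupplier> eventFilters) {
        boolean any = false;
        for (BooleanSupplier filter : eventFilters) {
            if (filter.getAsBoolean()) {
                any = true;
                break;
            }
        }
        hasActivePlugins = any;
    }

    /** Per-query check: an array-length test, then a single volatile read. */
    boolean isLineagePluginConfigured() {
        if (activateLineagePlugin.length == 0) {
            return false; // fast path for the common no-plugin case
        }
        return hasActivePlugins;
    }
}
```

The trade-off is staleness: if a plugin's eventFilter() result can change without a refresh, the cached flag lags until the next refresh(), whereas calling hasActivePlugins() per query always reflects the live result.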
The overhead is negligible in practice: hasActivePlugins() is a volatile read plus an iteration over typically 0-2 plugins.
```java
boolean eventFilter();

/**
 * Processes a lineage event.
```
Issue #5 (Low): SPI contract should document thread-safety requirement
eventFilter() and exec() are called from the single Worker thread, but eventFilter() is also called from arbitrary query threads (via hasActivePlugins() in isLineagePluginConfigured()). The SPI contract should document that:
- eventFilter() must be thread-safe (called from multiple threads concurrently)
- exec() is called from a single worker thread (but concurrently with eventFilter() calls)
This helps plugin implementors avoid subtle concurrency bugs.
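One way to write the requested contract, as a sketch: Javadoc on a simplified LineagePlugin interface plus an implementation that satisfies it. The real interface extends fe-extension-spi's Plugin; LineageInfo here is a hypothetical one-field stand-in, and CountingLineagePlugin is illustrative only.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the lineage event payload. */
final class LineageInfo {
    final String targetTable;

    LineageInfo(String targetTable) {
        this.targetTable = targetTable;
    }
}

/**
 * Lineage consumer SPI (simplified).
 *
 * <p>Threading contract:
 * <ul>
 *   <li>{@link #eventFilter()} may be called concurrently from many query threads;
 *       it must be thread-safe and cheap.</li>
 *   <li>{@link #exec(LineageInfo)} is called from a single worker thread, but may run
 *       concurrently with {@link #eventFilter()} calls.</li>
 * </ul>
 */
interface LineagePlugin {
    boolean eventFilter();

    void exec(LineageInfo info);
}

/** Example implementation that honours the contract. */
final class CountingLineagePlugin implements LineagePlugin {
    private volatile boolean enabled = true;            // read from many threads
    private final AtomicLong processed = new AtomicLong();

    @Override
    public boolean eventFilter() {
        return enabled; // a single volatile read is safe from any thread
    }

    @Override
    public void exec(LineageInfo info) {
        processed.incrementAndGet(); // written by the worker, safely readable elsewhere
    }

    void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    long processedCount() {
        return processed.get();
    }
}
```

Keeping eventFilter() to a single volatile read also keeps the per-query cost discussed in Issue #4 small.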
FE Regression Coverage Report: Increment line coverage

run buildall
TPC-H: Total hot run time: 27508 ms
TPC-DS: Total hot run time: 153126 ms
FE UT Coverage Report: Increment line coverage
…nd ensure producer-internal alias resolution.
034013e to 426b8b3

run buildall

PR approved by at least one committer and no changes requested.

PR approved by anyone and no changes requested.
TPC-H: Total hot run time: 27663 ms
TPC-DS: Total hot run time: 152588 ms
FE UT Coverage Report: Increment line coverage
FE Regression Coverage Report: Increment line coverage
What problem does this PR solve?
This PR introduces a data lineage collection framework for Apache Doris, enabling automatic extraction and reporting of table-level and column-level lineage from DML operations (INSERT INTO SELECT, INSERT OVERWRITE, CREATE TABLE AS SELECT).

The framework is designed as an SPI (Service Provider Interface) to allow pluggable lineage consumers, aligned with the existing fe-authentication SPI architecture using fe-extension-spi and fe-extension-loader.

Architecture Design
Key Components
1. Lineage Extraction (LineageInfoExtractor)

2. SPI Interfaces (aligned with fe-authentication)

| Interface | Extends | Methods |
|---|---|---|
| LineagePlugin | Plugin (fe-extension-spi) | eventFilter() + exec(LineageInfo) |
| LineagePluginFactory | PluginFactory (fe-extension-spi) | name() + create() |

3. Event Processing (LineageEventProcessor)

- … AuthenticationPluginManager):
  - ServiceLoader.load() for built-in classpath plugins
  - DirectoryPluginRuntimeManager.loadAll() for external directory plugins
  - … org.apache.doris.nereids.lineage.* to ensure SPI interface identity
- BlockingQueue (configurable via lineage_event_queue_size) + daemon worker thread

4. Integration Points

- NereidsPlanner → PlannerHook callback after plan optimization
- InsertIntoTableCommand / InsertOverwriteTableCommand / CreateTableCommand → trigger lineage extraction on successful execution
- ConnectContext → carries lineage metadata (query ID, user, database, timing)
- Env.java → lineageEventProcessor.start() at FE startup

Configuration
| Config | Default | Example |
|---|---|---|
| activate_lineage_plugin | {} (empty) | dataworks |
| lineage_event_queue_size | 50000 | |
| plugin_dir | $DORIS_HOME/plugins | |

How to Use
1. Implement LineagePlugin + LineagePluginFactory in an external module
2. Register the factory in META-INF/services/org.apache.doris.nereids.lineage.LineagePluginFactory
3. Deploy the plugin and its plugin.conf to $plugin_dir/lineage/{plugin_name}/
4. Set activate_lineage_plugin = {plugin_name} in fe.conf

Testing
- LineageEventProcessorTest: 11 tests covering plugin lifecycle, event dispatch, error handling, and the SPI factory contract
- LineageInfoExtractorTest: 25+ tests covering INSERT INTO, CTAS, INSERT OVERWRITE, JOINs, subqueries, expressions, and multi-catalog
- LineageUtilsSkipTest: tests for lineage skip conditions

Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)