RLI support for Flink streaming #17452
Replies: 8 comments 26 replies
-
cc @vinothchandar for the review.
-
@HuangZhenQiu as well, along with @prashantwason @suryaprasanna.
-
1. Writing structure and missing details.
2. Precise answers to "What exactly happens during a Flink checkpoint? When exactly do we commit the write?" I don't fully follow this, and I am specifically interested in it. The key problem to solve here IMO is: when a Flink checkpoint happens, it can give us a list of data/MT files written, but for some of the data files returned, the corresponding write to RLI may not have propagated. IIUC, in the proposed approach, you are just writing the MT ahead of writing the data files (just like the Flink state backend approach rn), to keep this synchronized?
3. Caching parts of RLI / partitioned RLI. We are not addressing how we scale the cache, and what is expected to be hot. IMO we need a partitioned RLI, and the cache should be such that the "hot" partition paths' RLI entries are what get cached. Missing these details. I agree that we need some caching, but we need to flesh this out more. We can come back to the caching after we align on the overall approach.
4. Consider Secondary Index also in scope. The current design is not very extensible to secondary index updates; I'd like for that to be included in scope. In the case of SI, the main challenge is that the index entries cannot be produced until we actually perform the writing of the data files. The MT is updated in an operator downstream of the one that writes the data files.
-
My preferred high-level approach here is to decouple the Flink checkpoint from the Hudi commit boundary, i.e. every checkpoint we track the files written under
-
Would you please describe how the existing record index will be loaded into Flink state when an upsert Flink job initially starts and writes to a MOR dataset?
-
|
I am still only focusing on the RLI pieces and the write (not caching, compaction, or SI yet).
Are you basically saying instead of
So - this happens in the same operator that writes the data files?
Your diagram says "shuffled by record key", which is different. Can you clarify - is it
I see a basic conflict here.
For anything we propose around RLI writes, I want to understand how we will write 1 log file per RLI file group (shard) for each commit (we cannot have a lot of small files in RLI). I thought you would do something like (still does not work for SI)
When the job then checkpoints, you know what RLI files were written and what data files were written. You commit both respectively into MT, MT files, and DT. Note that the above does not work with positional updates/deletes, since we don't know the position ahead of time. I want to first understand your proposal; I am not very sure this is the direction we should go.
-
No. This design has to cover both RLI and SI in some form; else, I am happy to take a swing at the design. We should not push forward with an RLI-only approach, since that would render all the SI capability unusable, e.g. when reading from Trino or Spark.
-
@danny0405 @geserdugarov @HuangZhenQiu I am sketching an approach here to seed further discussion. Please take this forward. Assumption:
DAG: The main difference is, instead of special-casing the RLI write, we do it after
In terms of the comments on performance or the slowest operator etc., it's understandable that if there are a lot of SIs to be updated, it will take proportionally longer. But this design will still have similar perf if only RLI is enabled. Once we align on this, let's update the top-level discussion description. We can then move on to discussing the caching design for the BucketAssigner op.

-
Background
Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which is an overhead for users in production.
Another reason is multi-partition upserts: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.
Goals
Non Goals
[VC: what are these?]
The Design
The high-level ideas:
`FILES` partition is ready)
The Impl
The Write
The RLI Access
In the `BucketAssigner` operator, the RLI index metadata would be utilized as the index backend: the `BucketAssigner` operator will probe the RLI with the incoming record keys to figure out whether a message is an update, insert, or delete. In other words, the RLI index metadata will serve the same role as the `flink_state` index.
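As a rough sketch of this probe-and-classify step (the class and method names below are hypothetical, not the actual Hudi Flink operators; the probe is abstracted as a function returning the existing file-group location for a key, empty when the key was never indexed):

```java
import java.util.Optional;
import java.util.function.Function;

// Illustrative model of a BucketAssigner-style classification: delete
// messages pass through, otherwise an RLI probe decides insert vs. update.
class RecordClassifier {
    enum Kind { INSERT, UPDATE, DELETE }

    Kind classify(String recordKey, boolean isDeleteMsg,
                  Function<String, Optional<String>> rliProbe) {
        if (isDeleteMsg) {
            // Delete message from upstream: no index probe needed here.
            return Kind.DELETE;
        }
        // Probe the RLI (through whatever cache layers sit in front of it).
        return rliProbe.apply(recordKey).isPresent() ? Kind.UPDATE : Kind.INSERT;
    }
}
```

The probe function would be backed by the cache hierarchy described in the next subsection.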
The Cache of RLI Access
We need fast access in streaming to have high throughput (ideally per-record access should be < 10ms), thus a general hotspot cache is needed. We will build an in-memory LRU cache keyed by the active upsert record keys; cache items will be force-evicted based on a configured memory threshold.
We also need an in-memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are thus invisible.
The query will access the in-memory cache first, then the MDT RLI index:
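A minimal sketch of this lookup order, under some stated assumptions: the uncommitted-checkpoint mappings are consulted first, then the LRU cache, and only then the MDT RLI read (one plausible ordering; the design above does not fix it); eviction here is by entry count rather than the memory threshold the design calls for; all names are illustrative, not Hudi classes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Two-level in-memory lookup in front of the MDT RLI partition.
class RliLookupCache {
    // Bounded LRU cache (accessOrder = true) over record-key -> location.
    private final LinkedHashMap<String, String> lru;
    // Index mappings written in the current checkpoint, not yet committed.
    private final Map<String, String> uncommitted = new HashMap<>();

    RliLookupCache(final int maxEntries) {
        this.lru = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                // Entry-count stand-in for the configured memory threshold.
                return size() > maxEntries;
            }
        };
    }

    void putUncommitted(String recordKey, String location) {
        uncommitted.put(recordKey, location);
    }

    // Called once the Hudi commit makes the checkpoint's index visible.
    void onCommit() {
        lru.putAll(uncommitted);
        uncommitted.clear();
    }

    // Cache-first lookup; mdtLookup stands in for the slow MDT RLI read.
    Optional<String> lookup(String recordKey, Function<String, String> mdtLookup) {
        String loc = uncommitted.get(recordKey);
        if (loc == null) loc = lru.get(recordKey);
        if (loc == null) {
            loc = mdtLookup.apply(recordKey); // slow path: read MDT RLI
            if (loc != null) lru.put(recordKey, loc);
        }
        return Optional.ofNullable(loc);
    }
}
```

Keeping the uncommitted map separate from the LRU also makes rollback on a failed checkpoint a simple `clear()` of that map.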

The Shuffle of RLI Payloads
In the `StreamWrite` operator, the index items are inferred and sent to the `IndexWrite` operator in streaming style. The index records are shuffled by `hash(record_key) % num_rli_shards` (the same hashing algorithm as the MDT `RLI index` partitioner); this is critical to avoid N*M files being written to the MDT partition (N is the RLI partition bucket number, M is the number of data table buckets involved in the current write).
How do we ensure the data record and index record always belong to one commit/checkpoint? The barrier flows together with the records in Flink (see how-does-state-snapshotting-work): when the `StreamWrite` operator receives a record, it emits the corresponding index record in the same `#processElement` call, so we always keep the binding of the two; in other words, no barrier can end up between them.
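The shard routing above can be sketched as follows. `String.hashCode` stands in for the real partitioner's hash (which must match the MDT RLI partitioner exactly, since every engine has to agree on which shard holds a given key); the class name is illustrative.

```java
// Illustrative routing of index records to RLI shards: a stable hash of the
// record key modulo the number of RLI file groups, so each shard receives all
// records for its keys and one log file per shard per commit suffices.
class RliShardRouter {
    private final int numRliShards;

    RliShardRouter(int numRliShards) {
        this.numRliShards = numRliShards;
    }

    // hash(record_key) % num_rli_shards, kept non-negative via floorMod.
    int shardFor(String recordKey) {
        return Math.floorMod(recordKey.hashCode(), numRliShards);
    }
}
```

In Flink terms this would back a `keyBy`/custom partitioner on the index-record stream between `StreamWrite` and `IndexWrite`.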
The RLI Write
In the `IndexWrite` operator, the index items are buffered first and written to the MDT (triggered by the Flink checkpoint); the write status metadata is then sent to the `coordinator`. The metadata sent to the coordinator includes two parts:
The Commit of MDT (including RLI)
When committing to the data table, the MDT is committed first with the partial RLI/SI write metadata list (the MDT RLI/SI partition file handle info); the `RLI` and `SI` partition file handle info is committed altogether with the `FILES` partition.
On Flink checkpoint: each index/data writing task flushes all its records to RLI and data files respectively, so the RLI and data files are always consistent. We commit both, as we do now, from the coordinator into a single Hudi commit.
In order to keep exactly-once semantics for job recovery, the write status metadata also needs to be stored in the `StreamWrite` operator, the `IndexWrite` operator, and the `coordinator`, pretty much the same behavior as the current maintenance of the data table metadata.
The Compaction
In order not to take up too many task slots, we will reuse the current compaction sub-pipeline for scalable execution of the MDT compaction; it is applied automatically when RLI is enabled.

Open Questions
We need to benchmark the read perf of index items in the BucketAssign op, to see if we need to introduce layered cache strategies similar to RocksDB.
Appendix
SI support
Because SI needs to be figured out on the fly after data files are created, we generally need another SI write op to handle the SI writes; the op parallelism is the same as the SI partition bucket number, and the shuffle strategy should be in line with the MDT SI partitioner.
The partial commit metadata of SI is sent to the coordinator for the MDT commit.
This will increase the checkpoint time a lot and risks checkpoint timeouts and backpressure for high-volume workloads.
One solution is to build the SI async, before queries, but that is another story and not in the scope of this design.