feat: implement filter in cdc and incremental #756
…eat/filter-in-cdc&incremental
b75a508 to ab0848f
pkg/jdbc/jdbc.go
Outdated
// ---------- NULL handling ----------
if cond.Value == nil {
Earlier, this check was used to filter out null values (e.g. field != null). Will it work the same way now? Is it backward compatible?
I have tested with a null value in the filter as well; it worked fine.
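For reference, the null case discussed above could be handled along these lines. This is only a sketch: `buildNullCondition` is a hypothetical helper, not the function in pkg/jdbc/jdbc.go, and it assumes the column name is already quoted for the target driver.

```go
package main

import "fmt"

// buildNullCondition is a hypothetical helper (not the PR's code).
// It returns the SQL fragment for a condition whose value is nil.
// Only "=" and "!=" have a meaningful NULL form in SQL.
func buildNullCondition(column, operator string) (string, error) {
	switch operator {
	case "=":
		return fmt.Sprintf("%s IS NULL", column), nil
	case "!=":
		return fmt.Sprintf("%s IS NOT NULL", column), nil
	default:
		return "", fmt.Errorf("operator %q cannot be used with a null value", operator)
	}
}
```

With this shape, a legacy-style `field != null` filter would still translate to `field IS NOT NULL`, which is the backward-compatible behaviour asked about.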
if driverType == constants.Oracle && strings.Contains(v, "T") && (strings.Contains(v, "Z") || strings.Contains(v, "+") || (strings.Contains(v, "-") && len(v) > 19)) {
	if t, err := time.Parse(time.RFC3339, v); err == nil {
		timestampStr := t.Format("2006-01-02 15:04:05.000")
		valueSQL = fmt.Sprintf(
			"TO_TIMESTAMP('%s', 'YYYY-MM-DD HH24:MI:SS.FF')",
			timestampStr,
		)
	}
Earlier, the user had to enter the timestamp in the Oracle-specific timestamp format for the filter to work. Will this still work if the user enters the timestamp in that format?
I have tested with this format as well (01-JUL-22 03.30.00.000000 PM). It works in full load but not in CDC, because the ReformatValue function only supports (YYYY-MM-DD) when parsing a timestamp from a string.
So won't the timestamp filter work in CDC?
It will work only if the value is provided in this format (YYYY-MM-DD).
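To make the two cases in this thread concrete, here is a small sketch of the conversion path. `oracleTimestampExpr` is a hypothetical name, and the function only mirrors the RFC 3339 branch shown in the diff; it says nothing about how ReformatValue behaves.

```go
package main

import (
	"fmt"
	"time"
)

// oracleTimestampExpr is a hypothetical helper (not the PR's code).
// It converts an RFC 3339 value into a TO_TIMESTAMP(...) expression.
// The second return value is false when the input is not RFC 3339,
// for example an Oracle-native string, so the caller must pick a fallback.
func oracleTimestampExpr(v string) (string, bool) {
	t, err := time.Parse(time.RFC3339, v)
	if err != nil {
		return "", false
	}
	ts := t.Format("2006-01-02 15:04:05.000")
	return fmt.Sprintf("TO_TIMESTAMP('%s', 'YYYY-MM-DD HH24:MI:SS.FF')", ts), true
}
```

Calling it with `2022-07-01T15:30:00.000+00:00` takes the conversion branch, while `01-JUL-22 03.30.00.000000 PM` returns false and would have to be passed through as the user typed it.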
drivers/mysql/internal/mysql_test.go
Outdated
FilterInput: `{
	"logical_operator": "And",
	"conditions": [
		{
			"column": "price_double",
			"operator": "<",
			"value": 239834.89
		},
		{
			"column": "created_timestamp",
			"operator": ">=",
			"value": "2022-07-01T15:30:00.000+00:00"
		}
	]
Can we add filters for other data types as well?
That would make the integration test run longer; all data types are already covered in filter_test.go.
We can discuss this.
types/catalog.go
Outdated
// legacy filter input
Filter string `json:"filter,omitempty"`
// new filter input
FilterInput *FilterInput `json:"filter_input,omitempty"`
Should we also add this change to the clear-destination check, i.e. to the function GetStreamsDelta()?
Do you mean if the user changes the filter?
Yes. filter is there, but filter_config is not.
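One way the suggested check could look, as a sketch only: the types below are simplified stand-ins for the catalog types, `filterChanged` is a hypothetical helper, and how GetStreamsDelta() would call it is not shown in this PR.

```go
package main

import "reflect"

// Simplified, hypothetical stand-ins for the catalog types.
type filterCondition struct {
	Column   string
	Operator string
	Value    any
}

type filterInput struct {
	LogicalOperator string
	Conditions      []filterCondition
}

type streamMetadata struct {
	Filter      string       // legacy filter input
	FilterInput *filterInput // new filter input
}

// filterChanged reports whether either the legacy filter string or the
// structured filter differs between two versions of a stream's metadata.
func filterChanged(prev, next streamMetadata) bool {
	if prev.Filter != next.Filter {
		return true
	}
	// DeepEqual treats two nil pointers as equal and compares pointees otherwise.
	return !reflect.DeepEqual(prev.FilterInput, next.FilterInput)
}
```

Comparing both fields means a stream that moves from the legacy string to the structured form is also reported as changed.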
drivers/mongodb/internal/backfill.go
Outdated
}

- func buildMongoCondition(cond types.Condition) bson.D {
+ func buildMongoCondition(cond types.FilterCondition) bson.D {
The SQLFilter now supports multiple filter conditions, but here in MongoDB we are restricting it to 2. Shouldn't it be consistent?
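A rough sketch of what an unrestricted version could look like follows. It uses plain maps instead of bson.D to stay dependency-free, and `buildMongoFilter`, `condition`, and `mongoOps` are hypothetical names rather than the code in backfill.go. The `$and`, `$or`, and comparison operators are standard MongoDB query operators.

```go
package main

import (
	"fmt"
	"strings"
)

// condition is a simplified, hypothetical stand-in for types.FilterCondition.
type condition struct {
	Column   string
	Operator string
	Value    any
}

// mongoOps maps the filter operators to MongoDB comparison operators.
var mongoOps = map[string]string{
	"=": "$eq", "!=": "$ne", ">": "$gt", ">=": "$gte", "<": "$lt", "<=": "$lte",
}

// buildMongoFilter combines any number of conditions under $and or $or.
// An empty condition list yields an empty filter, which matches everything.
func buildMongoFilter(logicalOperator string, conds []condition) (map[string]any, error) {
	if len(conds) == 0 {
		return map[string]any{}, nil
	}
	clauses := make([]map[string]any, 0, len(conds))
	for _, c := range conds {
		op, ok := mongoOps[c.Operator]
		if !ok {
			return nil, fmt.Errorf("unsupported operator %q", c.Operator)
		}
		clauses = append(clauses, map[string]any{c.Column: map[string]any{op: c.Value}})
	}
	key := "$and"
	if strings.EqualFold(logicalOperator, "or") {
		key = "$or"
	}
	return map[string]any{key: clauses}, nil
}
```

Because the clauses are collected in a slice, the same code path serves two conditions or twenty, which would keep MongoDB in line with the SQL side.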
// FilterRecords applies filtering ONLY for new filters.
// For legacy filters, records are returned unchanged.
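The behaviour described in that comment can be sketched as below. This is an illustration under assumptions, not the PR's FilterRecords: the types are simplified, only numbers and strings are compared, and a nil filter stands in for the legacy case.

```go
package main

import "strings"

type condition struct {
	Column   string
	Operator string
	Value    any
}

type filterInput struct {
	LogicalOperator string
	Conditions      []condition
}

// toFloat widens the numeric types this sketch supports.
func toFloat(v any) (float64, bool) {
	switch n := v.(type) {
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	case float64:
		return n, true
	}
	return 0, false
}

// compare returns -1, 0, or 1; ok is false when the types are not comparable.
func compare(a, b any) (int, bool) {
	if x, ok := toFloat(a); ok {
		y, ok := toFloat(b)
		if !ok {
			return 0, false
		}
		switch {
		case x < y:
			return -1, true
		case x > y:
			return 1, true
		}
		return 0, true
	}
	x, okA := a.(string)
	y, okB := b.(string)
	if okA && okB {
		return strings.Compare(x, y), true
	}
	return 0, false
}

// matches evaluates one condition; a missing or nil field only matches "= null".
func matches(rec map[string]any, c condition) bool {
	v, present := rec[c.Column]
	recNull := !present || v == nil
	if c.Value == nil {
		return (c.Operator == "=" && recNull) || (c.Operator == "!=" && !recNull)
	}
	if recNull {
		return false
	}
	cmp, ok := compare(v, c.Value)
	if !ok {
		return false
	}
	switch c.Operator {
	case "=":
		return cmp == 0
	case "!=":
		return cmp != 0
	case "<":
		return cmp < 0
	case "<=":
		return cmp <= 0
	case ">":
		return cmp > 0
	case ">=":
		return cmp >= 0
	}
	return false
}

// filterRecords returns records unchanged when there is no structured filter.
func filterRecords(records []map[string]any, f *filterInput) []map[string]any {
	if f == nil || len(f.Conditions) == 0 {
		return records
	}
	useOr := strings.EqualFold(f.LogicalOperator, "or")
	var out []map[string]any
	for _, rec := range records {
		keep := !useOr // AND starts true, OR starts false
		for _, c := range f.Conditions {
			if m := matches(rec, c); m == useOr {
				keep = m
				break
			}
		}
		if keep {
			out = append(out, rec)
		}
	}
	return out
}
```

The inner loop short-circuits: under AND the first failing condition drops the record, and under OR the first passing condition keeps it.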
Description
This PR adds filtering capability for CDC and incremental sync modes, allowing users to selectively sync records based on configurable filter conditions.
Fixes # (issue)
Type of change
=, !=, <, >, <=, >=
How Has This Been Tested?
Comprehensive test coverage in destination/filter_test.go, including:
Legacy filter support
Multiple operators and data types
AND/OR logic
Null value handling
Type coercion
Edge cases
Scenario A
Scenario B
Screenshots or Recordings
Documentation
Related PR's (If Any):
New Filter Format
Filters can be specified in the stream configuration using the new FilterInput structure:
{
  "selected_streams": {
    "namespace": [
      {
        "stream_name": "table_1",
        "filter_input": {
          "logical_operator": "AND",
          "conditions": [
            { "column": "id", "operator": ">", "value": 100 },
            { "column": "status", "operator": "=", "value": "active" }
          ]
        }
      }
    ]
  }
}
Legacy Filter Format (Still Supported)
The legacy string-based filter format continues to work:
{ "filter": "id > 100 AND status = \"active\"" }
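As an illustration of how a single stream entry in either format might be decoded, here is a minimal sketch. The `filter` and `filter_input` JSON keys come from types/catalog.go as quoted in the review; the inner struct layout, `allowedOperators`, and `parseStreamConfig` are assumptions made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical mirror of the structured filter; field names follow the JSON above.
type filterCondition struct {
	Column   string `json:"column"`
	Operator string `json:"operator"`
	Value    any    `json:"value"`
}

type filterInput struct {
	LogicalOperator string            `json:"logical_operator"`
	Conditions      []filterCondition `json:"conditions"`
}

type streamConfig struct {
	StreamName  string       `json:"stream_name"`
	Filter      string       `json:"filter,omitempty"`       // legacy filter input
	FilterInput *filterInput `json:"filter_input,omitempty"` // new filter input
}

// allowedOperators lists the operators named in the PR description.
var allowedOperators = map[string]bool{
	"=": true, "!=": true, "<": true, ">": true, "<=": true, ">=": true,
}

// parseStreamConfig decodes one stream entry and rejects unknown operators.
func parseStreamConfig(data []byte) (streamConfig, error) {
	var cfg streamConfig
	if err := json.Unmarshal(data, &cfg); err != nil {
		return cfg, err
	}
	if cfg.FilterInput != nil {
		for _, c := range cfg.FilterInput.Conditions {
			if !allowedOperators[c.Operator] {
				return cfg, fmt.Errorf("unsupported operator %q on column %q", c.Operator, c.Column)
			}
		}
	}
	return cfg, nil
}
```

Note that encoding/json decodes JSON numbers into float64 when the target is `any`, so the `100` above arrives as `float64(100)`; any comparison logic has to account for that.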