feat: implement filter in cdc and incremental #756

Open
vikaxsh wants to merge 62 commits into staging from feat/filter-in-cdc&incremental

Collaborator

@vikaxsh vikaxsh commented Jan 14, 2026

Description

This PR adds filtering capability for CDC and incremental sync modes, allowing users to selectively sync records based on configurable filter conditions.

Fixes # (issue)

Key features

  • Multiple Conditions: Support for multiple filter conditions with AND/OR logical operators
  • Operators: Supports =, !=, <, >, <=, >=
  • Type-Aware Filtering: Automatic type conversion and validation based on schema

Type of change

  • New feature (non-breaking change which adds functionality)

How Has This Been Tested?

Comprehensive test coverage in destination/filter_test.go including:

  • Legacy filter support

  • Multiple operators and data types

  • AND/OR logic

  • Null value handling

  • Type coercion

  • Edge cases

  • Scenario A

  • Scenario B

Screenshots or Recordings

Documentation

  • Documentation Link: [link to README, olake.io/docs, or olake-docs]
  • N/A (bug fix, refactor, or test changes only)

Related PRs (If Any):

New Filter Format

Filters can be specified in the stream configuration using the new FilterInput structure:

{
  "selected_streams": {
    "namespace": [
      {
        "stream_name": "table_1",
        "filter_input": {
          "logical_operator": "AND",
          "conditions": [
            {
              "column": "id",
              "operator": ">",
              "value": 100
            },
            {
              "column": "status",
              "operator": "=",
              "value": "active"
            }
          ]
        }
      }
    ]
  }
}
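
For readers following along, here is a minimal Go sketch of how a filter_input object like the one above could be decoded and applied to a single record. It is not the PR's implementation: the struct fields are inferred from the JSON keys, and parseFilterInput, compare, evalCondition, and matches are hypothetical helper names. The sketch only handles JSON numbers and strings, treats the logical operator case-insensitively, and lets an empty condition list match everything (all assumptions).

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// FilterCondition mirrors one entry of "conditions" (fields inferred from the JSON keys).
type FilterCondition struct {
	Column   string `json:"column"`
	Operator string `json:"operator"`
	Value    any    `json:"value"`
}

// FilterInput mirrors the "filter_input" object.
type FilterInput struct {
	LogicalOperator string            `json:"logical_operator"`
	Conditions      []FilterCondition `json:"conditions"`
}

// parseFilterInput decodes a filter_input JSON object.
func parseFilterInput(raw string) (FilterInput, error) {
	var in FilterInput
	err := json.Unmarshal([]byte(raw), &in)
	return in, err
}

// compare returns -1, 0, or 1. Only float64 and string operands are supported here.
func compare(a, b any) (int, error) {
	switch av := a.(type) {
	case float64:
		bv, ok := b.(float64)
		if !ok {
			return 0, fmt.Errorf("cannot compare number with %T", b)
		}
		switch {
		case av < bv:
			return -1, nil
		case av > bv:
			return 1, nil
		}
		return 0, nil
	case string:
		bv, ok := b.(string)
		if !ok {
			return 0, fmt.Errorf("cannot compare string with %T", b)
		}
		return strings.Compare(av, bv), nil
	}
	return 0, fmt.Errorf("unsupported type %T", a)
}

// evalCondition applies one condition to a record; a missing column counts as null.
func evalCondition(rec map[string]any, c FilterCondition) (bool, error) {
	got := rec[c.Column]
	if got == nil || c.Value == nil {
		bothNil := got == nil && c.Value == nil
		switch c.Operator {
		case "=":
			return bothNil, nil
		case "!=":
			return !bothNil, nil
		}
		return false, nil // ordering comparisons against null never match
	}
	cmp, err := compare(got, c.Value)
	if err != nil {
		return false, err
	}
	switch c.Operator {
	case "=":
		return cmp == 0, nil
	case "!=":
		return cmp != 0, nil
	case "<":
		return cmp < 0, nil
	case ">":
		return cmp > 0, nil
	case "<=":
		return cmp <= 0, nil
	case ">=":
		return cmp >= 0, nil
	}
	return false, fmt.Errorf("unsupported operator %q", c.Operator)
}

// matches combines all conditions with AND (the default) or OR.
func matches(rec map[string]any, in FilterInput) (bool, error) {
	if len(in.Conditions) == 0 {
		return true, nil
	}
	useOr := strings.EqualFold(in.LogicalOperator, "OR")
	for _, c := range in.Conditions {
		ok, err := evalCondition(rec, c)
		if err != nil {
			return false, err
		}
		if useOr && ok {
			return true, nil
		}
		if !useOr && !ok {
			return false, nil
		}
	}
	return !useOr, nil // AND: every condition held; OR: none did
}
```

With the filter above, a record with id 150 and status "active" matches, while a record with id 50 and status "active" does not.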

Legacy Filter Format (Still Supported)

The legacy string-based filter format continues to work:

{
  "filter": "id > 100 AND status = \"active\""
}
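
As an illustration only, the sketch below shows one way a legacy filter string could be normalized into column/operator/value triples. How the PR actually parses the legacy format is not shown in this conversation, so parseLegacy, parseValue, the condition type, and both regular expressions are assumptions. The sketch rejects strings that mix AND and OR, and it does not cope with quoted values that themselves contain " AND " or " OR ".

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// condition is a hypothetical normalized form of one legacy clause.
type condition struct {
	Column   string
	Operator string
	Value    any
}

var (
	// clauseRe matches "<column> <operator> <value>"; two-character operators are tried first.
	clauseRe = regexp.MustCompile(`^\s*(\w+)\s*(>=|<=|!=|=|>|<)\s*(.+?)\s*$`)
	// joinRe matches the logical operator between clauses, case-insensitively.
	joinRe = regexp.MustCompile(`(?i)\s+(AND|OR)\s+`)
)

// parseValue turns a raw token into a string, nil, float64, or the raw text.
func parseValue(s string) any {
	if len(s) >= 2 && strings.HasPrefix(s, `"`) && strings.HasSuffix(s, `"`) {
		return s[1 : len(s)-1]
	}
	if strings.EqualFold(s, "null") {
		return nil
	}
	if f, err := strconv.ParseFloat(s, 64); err == nil {
		return f
	}
	return s
}

// parseLegacy splits a legacy filter string into its logical operator and conditions.
func parseLegacy(filter string) (string, []condition, error) {
	logical := ""
	for _, m := range joinRe.FindAllStringSubmatch(filter, -1) {
		op := strings.ToUpper(m[1])
		if logical != "" && logical != op {
			return "", nil, fmt.Errorf("mixing AND and OR is not supported in this sketch")
		}
		logical = op
	}
	if logical == "" {
		logical = "AND" // a single clause behaves the same under AND
	}
	var conds []condition
	for _, clause := range joinRe.Split(filter, -1) {
		m := clauseRe.FindStringSubmatch(clause)
		if m == nil {
			return "", nil, fmt.Errorf("cannot parse clause %q", clause)
		}
		conds = append(conds, condition{Column: m[1], Operator: m[2], Value: parseValue(m[3])})
	}
	return logical, conds, nil
}
```

Under these assumptions, the legacy string shown above would yield the logical operator AND and two conditions: id > 100 (as a number) and status = active (as a string).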

@vikaxsh vikaxsh force-pushed the feat/filter-in-cdc&incremental branch from b75a508 to ab0848f Compare January 20, 2026 10:26
@vikaxsh vikaxsh marked this pull request as ready for review January 21, 2026 08:21
pkg/jdbc/jdbc.go Outdated
Comment on lines 1116 to 1117
// ---------- NULL handling ----------
if cond.Value == nil {
Collaborator

Earlier, this check was used to filter out null values, e.g. field != null. Will it work the same way now as well? Is it backward compatible?

Collaborator Author

I have tested with a null value in the filter as well; it worked fine.
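
For context on the backward-compatibility question: in standard SQL, comparing a column with = NULL or != NULL never evaluates to true, so a null filter value is usually rendered as IS NULL or IS NOT NULL. A rough sketch of that mapping follows. nullClause is a hypothetical helper, not the code in pkg/jdbc/jdbc.go, and it skips identifier quoting.

```go
package main

import "fmt"

// nullClause renders a SQL predicate for a condition whose value is null.
// Column quoting is omitted; a real query builder would quote per driver.
func nullClause(column, operator string) (string, error) {
	switch operator {
	case "=":
		return column + " IS NULL", nil
	case "!=":
		return column + " IS NOT NULL", nil
	default:
		return "", fmt.Errorf("operator %q is not meaningful with a null value", operator)
	}
}
```

Under this mapping, a filter such as field != null becomes field IS NOT NULL, which is the behavior the earlier check was relied on for.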

Comment on lines +1135 to +1142
if driverType == constants.Oracle && strings.Contains(v, "T") && (strings.Contains(v, "Z") || strings.Contains(v, "+") || (strings.Contains(v, "-") && len(v) > 19)) {
	if t, err := time.Parse(time.RFC3339, v); err == nil {
		timestampStr := t.Format("2006-01-02 15:04:05.000")
		valueSQL = fmt.Sprintf(
			"TO_TIMESTAMP('%s', 'YYYY-MM-DD HH24:MI:SS.FF')",
			timestampStr,
		)
	}
Collaborator

Earlier, the user had to enter the timestamp in the Oracle-specific timestamp format for the filter to work. Will this still work if the user enters the timestamp in that format?

Collaborator Author

@vikaxsh vikaxsh Jan 27, 2026

I have tested with this format as well: 01-JUL-22 03.30.00.000000 PM. It works in full load but not in CDC, because the ReformatValue function only supports the YYYY-MM-DD format when parsing a timestamp from a string.

Collaborator

So won't the timestamp filter work in CDC?

Collaborator Author

It will work only if the value is provided in this format (YYYY-MM-DD).
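
To make the SQL side of this thread concrete, here is a standalone sketch of the RFC 3339 to TO_TIMESTAMP conversion discussed above. oracleTimestampLiteral is a hypothetical name. Unlike the snippet under review, which formats the parsed time as-is, this sketch normalizes to UTC first; that is an assumption, not something the PR is known to do. It does not address the CDC path or ReformatValue.

```go
package main

import (
	"fmt"
	"time"
)

// oracleTimestampLiteral converts an RFC 3339 timestamp string into an Oracle
// TO_TIMESTAMP(...) expression. The boolean is false when v is not RFC 3339,
// e.g. for Oracle-style input such as "01-JUL-22 03.30.00.000000 PM".
func oracleTimestampLiteral(v string) (string, bool) {
	t, err := time.Parse(time.RFC3339, v)
	if err != nil {
		return "", false
	}
	// Assumption: normalize to UTC so the offset is not silently dropped.
	ts := t.UTC().Format("2006-01-02 15:04:05.000")
	return fmt.Sprintf("TO_TIMESTAMP('%s', 'YYYY-MM-DD HH24:MI:SS.FF')", ts), true
}
```

Under these assumptions, 2022-07-01T15:30:00.000+00:00 and 2022-07-01T21:00:00+05:30 both produce the same expression, while the Oracle-style string is reported as not convertible so the caller can fall back to passing it through unchanged.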

Comment on lines 23 to 36
FilterInput: `{
  "logical_operator": "And",
  "conditions": [
    {
      "column": "price_double",
      "operator": "<",
      "value": 239834.89
    },
    {
      "column": "created_timestamp",
      "operator": ">=",
      "value": "2022-07-01T15:30:00.000+00:00"
    }
  ]
Collaborator

Can we add filters for other data types as well?

Collaborator Author

It would make the integration tests run longer; all data types are already covered in filter_test.go.

Collaborator

We can discuss this.

types/catalog.go Outdated
//legacy filter input
Filter string `json:"filter,omitempty"`
//new filter input
FilterInput *FilterInput `json:"filter_input,omitempty"`
Collaborator

Should we also add this change to the clear-destination check, i.e. to the GetStreamsDelta() function?

Collaborator Author

Do you mean if the user changes the filter?

Collaborator

Yes, filter is there, but filter_config is not there

}

- func buildMongoCondition(cond types.Condition) bson.D {
+ func buildMongoCondition(cond types.FilterCondition) bson.D {
Collaborator

The SQLFilter now supports multiple filter conditions, but here in MongoDB we are restricting it to two.

Shouldn't it be consistent?
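
One possible shape for a MongoDB filter that accepts any number of conditions is sketched below. To stay dependency-free it builds plain maps rather than bson.D values, and buildMongoFilter, mongoOps, and the condition type are hypothetical names, not the PR's code. The query operators themselves ($and, $or, $eq, $ne, $lt, $gt, $lte, $gte) are standard MongoDB operators.

```go
package main

import (
	"fmt"
	"strings"
)

// condition is a hypothetical stand-in for the PR's filter condition type.
type condition struct {
	Column   string
	Operator string
	Value    any
}

// mongoOps maps the filter operators to MongoDB comparison operators.
var mongoOps = map[string]string{
	"=":  "$eq",
	"!=": "$ne",
	"<":  "$lt",
	">":  "$gt",
	"<=": "$lte",
	">=": "$gte",
}

// buildMongoFilter combines every condition under a single $and or $or,
// so the number of conditions is not capped.
func buildMongoFilter(logical string, conds []condition) (map[string]any, error) {
	clauses := make([]map[string]any, 0, len(conds))
	for _, c := range conds {
		op, ok := mongoOps[c.Operator]
		if !ok {
			return nil, fmt.Errorf("unsupported operator %q", c.Operator)
		}
		clauses = append(clauses, map[string]any{c.Column: map[string]any{op: c.Value}})
	}
	if len(clauses) == 0 {
		return map[string]any{}, nil // an empty filter document matches everything
	}
	key := "$and"
	if strings.EqualFold(logical, "OR") {
		key = "$or"
	}
	return map[string]any{key: clauses}, nil
}
```

Because the clauses are collected in a slice, three or more conditions go through the same path as two, which would keep the MongoDB behavior in line with the SQL filter.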

Comment on lines 25 to 26
// FilterRecords applies filtering ONLY for new filters.
// For legacy filters, records are returned unchanged.
Collaborator

Redundant comment

@vikaxsh vikaxsh temporarily deployed to integration_tests February 26, 2026 15:12 — with GitHub Actions Inactive
4 participants