Skip to content

feat: add sink.parallelism support for OceanBase JDBC connector#138

Open
yuanoOo wants to merge 5 commits intooceanbase:mainfrom
yuanoOo:feature/sink-parallelism
Open

feat: add sink.parallelism support for OceanBase JDBC connector#138
yuanoOo wants to merge 5 commits intooceanbase:mainfrom
yuanoOo:feature/sink-parallelism

Conversation

@yuanoOo
Copy link
Copy Markdown
Collaborator

@yuanoOo yuanoOo commented Apr 16, 2026

Summary

  • Add support for configuring sink parallelism via sink.parallelism option in SQL DDL or Table Hints
  • Users can now set a custom parallelism for the sink operator

Changes

  • Add SINK_PARALLELISM config option in ConnectorOptions
  • Modify SinkProvider to support parallelism via getParallelism() method
  • Register the option in OceanBaseDynamicTableSinkFactory
  • Pass parallelism parameter in OceanBaseDynamicTableSink
  • Add unit test for SinkProvider parallelism
  • Add integration test for sink.parallelism configuration

Usage

DDL Configuration

CREATE TABLE ob_sink (
    id INT,
    name STRING
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:mysql://localhost:2881/test',
    'username' = 'root',
    'password' = 'password',
    'schema-name' = 'test',
    'table-name' = 'sink_table',
    'sink.parallelism' = '4'
);

Table Hints

INSERT INTO ob_sink /*+ OPTIONS('sink.parallelism' = '8') */
SELECT * FROM source_table;

Test Plan

  • Unit test: AbstractDynamicTableSinkTest verifies getParallelism() returns correct value
  • Integration test: testSinkParallelism in OceanBaseMySQLConnectorITCase
  • GitHub CI passes

yuanoOo added 5 commits April 16, 2026 14:30
Add support for configuring sink parallelism via 'sink.parallelism' option.
This allows users to set a custom parallelism for the sink operator in SQL DDL
or Table Hints.

Changes:
- Add SINK_PARALLELISM config option in ConnectorOptions
- Modify SinkProvider to support parallelism via getParallelism() method
- Register the option in OceanBaseDynamicTableSinkFactory
- Pass parallelism parameter in OceanBaseDynamicTableSink
- Add unit test for SinkProvider parallelism
- Add integration test for sink.parallelism configuration
- Fix compilation error: pass parallelism to SinkProvider in OBKVHBaseDynamicTableSink
- Register SINK_PARALLELISM in OBKVHBaseDynamicTableSinkFactory
- Add validation that sink.parallelism must be positive
- Add unit test for invalid parallelism values
- Clean up integration test comments
Add verification of sink parallelism in integration test by checking
the JSON execution plan contains the expected parallelism value.
Print execution plan content to help diagnose parallelism verification
failure. Also add more flexible assertion patterns.
The parallelism was correctly parsed but not applied to the DataStreamSink.
Added setParallelism() call in consumeDataStream() to properly set the
sink parallelism.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant