Skip to content

Commit b6e9d8e

Browse files
Add data-exchange interface and async source support (#1335)
* Add data-exchange interface and async source support Introduce KNetConnectDataExchange for CLR/JVM object exchange and implement it across connectors, tasks, transformations and predicates. Enhance KNetSourceTask with async record push support (PushRecordAsync, CreateAndPushRecordAsync), UseOnlyAsync flag and poll logic to consume an internal queue. Refactor .NET bridge: add IKNetCommon, make remote ExecuteOnRemote/DataToExchange use a unified reflectedConnectorOrTask, expose internal Start/TaskConfigs/Stop entrypoints, add DeallocateTask, and introduce several IKNet* interfaces and API visibility changes to support the new async/out-of-sync behavior and cleaner interop. * Update documentation * Fix #1321 updating missing changes in #1325
1 parent 4321139 commit b6e9d8e

18 files changed

Lines changed: 1043 additions & 378 deletions

src/documentation/articles/connectSDK.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ This is only a quick start guide for KNet Connect SDK, other information related
1414

1515
The code for a connector based on KNet Connect SDK follows the same rules for both .NET and the JVM hosted runtimes.
1616

17+
> [!TIP]
18+
> Except for some specific method signature or additional properties, the rules used in a JVM developed connectors/transformer/predicate applies to KNet Connect SDK connectors/transformer/predicate.
19+
1720
### Source connector
1821

1922
A source connector shall be defined extending the following class:
@@ -35,13 +38,18 @@ The mandatory method to be implemented is:
3538
```c#
3639
public abstract System.Collections.Generic.IList<SourceRecord> Poll();
3740
```
38-
which returns a set of `SourceRecord`, each `SourceRecord` can be created directly or by using the `CreateRecord` helper methods available.
41+
which returns a set of `SourceRecord`, each `SourceRecord` can be created directly or by using the available `CreateRecord` helper methods.
3942

4043
> [!TIP]
4144
> Starting from KNet 3.0.3, beside the standard invocation where the `SourceRecord`s will be returned once at the end of the `Poll` invocation and then pushed to the JVM,
4245
by using the `CreateAndPushRecord` helper methods available each `SourceRecord` is created and directly pushed to the JVM:
4346
this new way can be used to take advantage of the idle time if the `KNetSourceTask<TTask>` is waiting to receive data, e.g. socket, disk access, etc.
4447

48+
> [!TIP]
49+
> KNet 3.1.2 introduces, beside the `CreateAndPushRecord` helper methods, some new `CreateAndPushRecordAsync` helper methods which can push records in the JVM outside the `Poll` invocation:
50+
these new helpers decouple `Poll` invocation from `SourceRecord` generation and can be useful if the .NET code is taking adavantage of async/await pattern or similar; if `UseOnlyAsync` is set to true
51+
the `Poll` method is never invoked and everything happens in async from the .NET point-of-view (JVM side continues to invoke the `poll` method, but the returned list was filled in the meantime).
52+
4553
### Sink connector
4654

4755
A sink connector shall be defined extending the following class:
@@ -146,7 +154,7 @@ knet.dotnet.classname=MyConnectorNamespace.MySourceConnector, MySourceConnectorA
146154
```
147155

148156
From KNet 3.1.2, the JVM side exposes two new methods `getAssemblyLocation` and `getClassName`.
149-
Overriding them in a child class of
157+
Overriding the methods in a child class of
150158
- `org.mases.knet.developed.connect.sink.KNetSinkConnector`
151159
- `org.mases.knet.developed.connect.source.KNetSourceConnector`
152160
- `org.mases.knet.developed.connect.transforms.KNetTransformation`
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2021-2025 MASES s.r.l.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Refer to LICENSE for more information.
17+
*/
18+
19+
package org.mases.knet.developed.connect;
20+
21+
/*
22+
* All KNet Connect SDK object implements this interface for data exchange between JVM and CLR
23+
*/
24+
public interface KNetConnectDataExchange {
25+
/* Invoked from CLR to retrieve the object stored from CLR before a method invocation
26+
* @return the object stored from JVM can be used from CLR
27+
*/
28+
Object getDataToExchange();
29+
30+
/* Set the object from CLR to be used from the JVM
31+
* @param dte the object set from CLR
32+
*/
33+
void setDataToExchange(Object dte);
34+
}

src/jvm/knet/src/main/java/org/mases/knet/developed/connect/KNetConnectInitializer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818

1919
package org.mases.knet.developed.connect;
2020

21+
/*
22+
* All KNet Connect SDK main object implements this interface to define mandatory information used from infrastructure
23+
*/
2124
public interface KNetConnectInitializer {
2225
/* Overrides in a derived class to define the Assembly location
2326
*
2427
* @returns the location where the assembly is available, return null to use the configuration property
2528
*/
2629
String getAssemblyLocation();
30+
2731
/* Overrides in a derived class to define the .NET class to be loaded
2832
*
2933
* @returns the location where the assembly is available, return null to use the configuration property

src/jvm/knet/src/main/java/org/mases/knet/developed/connect/sink/KNetSinkConnector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.kafka.connect.sink.SinkConnector;
2626
import org.apache.kafka.connect.sink.SinkConnectorContext;
2727
import org.mases.jcobridge.*;
28+
import org.mases.knet.developed.connect.KNetConnectDataExchange;
2829
import org.mases.knet.developed.connect.KNetConnectInitializer;
2930
import org.mases.knet.developed.connect.KNetConnectLogging;
3031
import org.mases.knet.developed.connect.KNetConnectProxy;
@@ -37,7 +38,7 @@
3738
import java.util.List;
3839
import java.util.Map;
3940

40-
public class KNetSinkConnector extends SinkConnector implements KNetConnectLogging, KNetConnectInitializer {
41+
public class KNetSinkConnector extends SinkConnector implements KNetConnectLogging, KNetConnectInitializer, KNetConnectDataExchange {
4142
private static final Logger log = LoggerFactory.getLogger(KNetSinkConnector.class);
4243

4344
private static final String registrationName = "KNetSinkConnector";

src/jvm/knet/src/main/java/org/mases/knet/developed/connect/sink/KNetSinkTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.connect.sink.SinkTask;
2424
import org.apache.kafka.connect.sink.SinkTaskContext;
2525
import org.mases.jcobridge.*;
26+
import org.mases.knet.developed.connect.KNetConnectDataExchange;
2627
import org.mases.knet.developed.connect.KNetConnectLogging;
2728
import org.mases.knet.developed.connect.KNetConnectProxy;
2829
import org.slf4j.Logger;
@@ -32,7 +33,7 @@
3233
import java.util.Collection;
3334
import java.util.Map;
3435

35-
public class KNetSinkTask extends SinkTask implements KNetConnectLogging {
36+
public class KNetSinkTask extends SinkTask implements KNetConnectLogging, KNetConnectDataExchange {
3637
private static final Logger log = LoggerFactory.getLogger(KNetSinkTask.class);
3738

3839
private static final String registrationName = "KNetSinkTask";

src/jvm/knet/src/main/java/org/mases/knet/developed/connect/source/KNetSourceConnector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.connect.errors.ConnectException;
2525
import org.apache.kafka.connect.source.*;
2626
import org.mases.jcobridge.*;
27+
import org.mases.knet.developed.connect.KNetConnectDataExchange;
2728
import org.mases.knet.developed.connect.KNetConnectInitializer;
2829
import org.mases.knet.developed.connect.KNetConnectLogging;
2930
import org.mases.knet.developed.connect.KNetConnectProxy;
@@ -36,7 +37,7 @@
3637
import java.util.List;
3738
import java.util.Map;
3839

39-
public class KNetSourceConnector extends SourceConnector implements KNetConnectLogging, KNetConnectInitializer {
40+
public class KNetSourceConnector extends SourceConnector implements KNetConnectLogging, KNetConnectInitializer, KNetConnectDataExchange {
4041
private static final Logger log = LoggerFactory.getLogger(KNetSourceConnector.class);
4142

4243
private static final String registrationName = "KNetSourceConnector";

src/jvm/knet/src/main/java/org/mases/knet/developed/connect/source/KNetSourceTask.java

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.connect.source.SourceTask;
2424
import org.apache.kafka.connect.source.SourceTaskContext;
2525
import org.mases.jcobridge.*;
26+
import org.mases.knet.developed.connect.KNetConnectDataExchange;
2627
import org.mases.knet.developed.connect.KNetConnectLogging;
2728
import org.mases.knet.developed.connect.KNetConnectProxy;
2829
import org.slf4j.Logger;
@@ -31,7 +32,7 @@
3132
import java.io.IOException;
3233
import java.util.*;
3334

34-
public class KNetSourceTask extends SourceTask implements KNetConnectLogging {
35+
public class KNetSourceTask extends SourceTask implements KNetConnectLogging, KNetConnectDataExchange {
3536
private static final Logger log = LoggerFactory.getLogger(KNetSourceTask.class);
3637

3738
private static final String registrationName = "KNetSourceTask";
@@ -44,6 +45,9 @@ public class KNetSourceTask extends SourceTask implements KNetConnectLogging {
4445

4546
Object dataToExchange = null;
4647

48+
boolean _useOnlyAsyncEnable = false;
49+
List<SourceRecord> _sourceList;
50+
4751
public Object getDataToExchange() {
4852
return dataToExchange;
4953
}
@@ -57,6 +61,7 @@ public KNetSourceTask() throws ConnectException, JCException, IOException {
5761
log.debug("Invoking ctor of KNetSourceTask");
5862
taskId = KNetConnectProxy.getNewTaskId();
5963
indexedRegistrationName = String.format("%s_%d", registrationName, taskId);
64+
_sourceList = new ArrayList<>();
6065
if (JCOBridge.isCLRHostingProcess()) {
6166
JCOBridge.RegisterJVMGlobal(indexedRegistrationName, this);
6267
JCObject source = KNetConnectProxy.getSourceConnector();
@@ -102,27 +107,59 @@ public void start(Map<String, String> map) {
102107
} finally {
103108
dataToExchange = null;
104109
}
110+
requestUseOnlyAsync();
105111
} catch (JCException | IOException jcne) {
106112
log.error("Failed Invoke of \"start\"", jcne);
107113
throw new ConnectException("Failed Invoke of \"start\"", jcne);
108114
}
109115
}
110116

117+
public void requestUseOnlyAsync() {
118+
try {
119+
try {
120+
dataToExchange = _useOnlyAsyncEnable;
121+
sourceTask.Invoke("UseOnlyAsyncInternal");
122+
_useOnlyAsyncEnable = (boolean) dataToExchange;
123+
} finally {
124+
dataToExchange = null;
125+
}
126+
} catch (JCNativeException jcne) {
127+
log.error("Failed Invoke of \"UseOnlyAsyncInternal\"", jcne);
128+
}
129+
}
130+
131+
public void pushRecordAsync(SourceRecord record) {
132+
synchronized (this) {
133+
_sourceList.add(record);
134+
}
135+
}
136+
111137
@Override
112138
public List<SourceRecord> poll() throws InterruptedException {
113139
log.debug("Invoking poll");
114-
try {
140+
synchronized (this) {
141+
int size = 0;
115142
try {
116-
dataToExchange = new ArrayList<SourceRecord>();
117-
sourceTask.Invoke("PollInternal");
118-
return (List<SourceRecord>) dataToExchange;
143+
if (_useOnlyAsyncEnable) {
144+
if (_sourceList.isEmpty()) return null;
145+
} else {
146+
try {
147+
dataToExchange = _sourceList;
148+
sourceTask.Invoke("PollInternal");
149+
} catch (JCNativeException jcne) {
150+
log.error("Failed Invoke of \"PollInternal\"", jcne);
151+
}
152+
}
153+
size = _sourceList.size();
154+
return (size != 0) ? _sourceList : null;
119155
} finally {
120156
dataToExchange = null;
157+
if (size != 0) {
158+
log.debug("Current size: {} - next list capacity will use this value.", size);
159+
_sourceList = new ArrayList<>(size);
160+
}
121161
}
122-
} catch (JCNativeException jcne) {
123-
log.error("Failed Invoke of \"poll\"", jcne);
124162
}
125-
return null;
126163
}
127164

128165
public <V> Map<String, Object> offsetAt(String key, V value) {

src/jvm/knet/src/main/java/org/mases/knet/developed/connect/transforms/KNetTransformation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.connect.components.Versioned;
2424
import org.apache.kafka.connect.connector.ConnectRecord;
2525
import org.apache.kafka.connect.transforms.Transformation;
26+
import org.mases.knet.developed.connect.KNetConnectDataExchange;
2627
import org.mases.knet.developed.connect.KNetConnectInitializer;
2728
import org.mases.knet.developed.connect.KNetConnectLogging;
2829
import org.mases.knet.developed.connect.KNetConnectProxy;
@@ -33,7 +34,7 @@
3334
import java.io.IOException;
3435
import java.util.Map;
3536

36-
public class KNetTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned, KNetConnectLogging, KNetConnectInitializer {
37+
public class KNetTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned, KNetConnectLogging, KNetConnectInitializer, KNetConnectDataExchange {
3738
private static final Logger log = LoggerFactory.getLogger(KNetTransformation.class);
3839

3940
private static final String registrationName = "KNetTransformation";

src/jvm/knet/src/main/java/org/mases/knet/developed/connect/transforms/predicates/KNetPredicate.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323
import org.apache.kafka.connect.components.Versioned;
2424
import org.apache.kafka.connect.connector.ConnectRecord;
2525
import org.apache.kafka.connect.transforms.predicates.Predicate;
26-
import org.mases.knet.developed.connect.KNetConnectInitializer;
27-
import org.mases.knet.developed.connect.KNetConnectLogging;
28-
import org.mases.knet.developed.connect.KNetConnectProxy;
26+
import org.mases.knet.developed.connect.*;
2927
import org.mases.jcobridge.*;
3028
import org.slf4j.Logger;
3129
import org.slf4j.LoggerFactory;
@@ -35,7 +33,7 @@
3533

3634
import org.mases.knet.developed.connect.KNetConnectInitializer;
3735

38-
public class KNetPredicate<R extends ConnectRecord<R>> implements Predicate<R>, Versioned, KNetConnectLogging, KNetConnectInitializer {
36+
public class KNetPredicate<R extends ConnectRecord<R>> implements Predicate<R>, Versioned, KNetConnectLogging, KNetConnectInitializer, KNetConnectDataExchange {
3937
private static final Logger log = LoggerFactory.getLogger(KNetPredicate.class);
4038

4139
private static final String registrationName = "KNetPredicate";
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.mases.knet.developed.connect.transforms.KNetTransformation

0 commit comments

Comments
 (0)