Skip to content

Commit d7deb90

Browse files
authored
[#45] add KeyBased Batcher for function mesh (#122)
* add BatchBuilder * yaml update * add tests
1 parent 50b3aef commit d7deb90

File tree

10 files changed

+200
-396
lines changed

10 files changed

+200
-396
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
name: Precommit - Test Function Kind with Key Based Batcher
2+
on:
3+
pull_request:
4+
branches:
5+
- '*'
6+
jobs:
7+
lint-test:
8+
runs-on: ubuntu-latest
9+
steps:
10+
- name: clean disk
11+
run: |
12+
sudo swapoff -a
13+
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
14+
sudo apt clean
15+
docker rmi $(docker images -q) -f
16+
df -h
17+
18+
- name: Checkout
19+
uses: actions/checkout@v2
20+
with:
21+
fetch-depth: 0
22+
ref: ${{ github.event.pull_request.head.sha }}
23+
24+
- name: Deploy k8s cluster env
25+
run: |
26+
.ci/deploy_pulsar_cluster.sh
27+
28+
- name: Build runner images
29+
run: |
30+
PULSAR_IMAGE_TAG=2.7.0 PULSAR_IMAGE=apachepulsar/pulsar-all KIND_PUSH=true images/build.sh
31+
32+
- name: Install operator-sdk
33+
run: |
34+
RELEASE_VERSION=v1.2.0
35+
curl -LO https://github.com/operator-framework/operator-sdk/releases/download/${RELEASE_VERSION}/operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu
36+
chmod +x operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu && sudo mkdir -p /usr/local/bin/ && sudo cp operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu /usr/local/bin/operator-sdk && rm operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu
37+
38+
# - name: Add CRD, controller or webhooks
39+
# run: |
40+
# operator-sdk create api --group cloud --version v1alpha1 --kind Function --resource=true --controller=true
41+
# operator-sdk create webhook --group compute.functionmesh.io --version v1alpha1 --kind Function --defaulting --programmatic-validation
42+
43+
- name: Deploy function mesh server
44+
run: |
45+
make generate
46+
make install
47+
nohup make run &
48+
49+
- name: Test Function kind
50+
run: |
51+
kubectl apply -f config/samples/compute_v1alpha1_function_key_based_batcher.yaml
52+
kubectl get all
53+
54+
- name: Verfy Function Mesh
55+
run: |
56+
.ci/verify_function_mesh.sh java-function-batcher-sample

api/v1alpha1/common.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ type ProducerConfig struct {
134134
MaxPendingMessagesAcrossPartitions int32 `json:"maxPendingMessagesAcrossPartitions,omitempty"`
135135
UseThreadLocalProducers bool `json:"useThreadLocalProducers,omitempty"`
136136
CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty"`
137+
BatchBuilder string `json:"batchBuilder,omitempty"`
137138
}
138139

139140
type CryptoConfig struct {

config/crd/bases/compute.functionmesh.io_functionmeshes.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ spec:
164164
type: object
165165
producerConf:
166166
properties:
167+
batchBuilder:
168+
type: string
167169
cryptoConfig:
168170
properties:
169171
consumerCryptoFailureAction:
@@ -5865,6 +5867,8 @@ spec:
58655867
type: object
58665868
producerConf:
58675869
properties:
5870+
batchBuilder:
5871+
type: string
58685872
cryptoConfig:
58695873
properties:
58705874
consumerCryptoFailureAction:

config/crd/bases/compute.functionmesh.io_functions.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ spec:
164164
type: object
165165
producerConf:
166166
properties:
167+
batchBuilder:
168+
type: string
167169
cryptoConfig:
168170
properties:
169171
consumerCryptoFailureAction:

config/crd/bases/compute.functionmesh.io_sources.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ spec:
7676
type: object
7777
producerConf:
7878
properties:
79+
batchBuilder:
80+
type: string
7981
cryptoConfig:
8082
properties:
8183
consumerCryptoFailureAction:
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
apiVersion: compute.functionmesh.io/v1alpha1
2+
kind: Function
3+
metadata:
4+
name: java-function-batcher-sample
5+
namespace: default
6+
spec:
7+
className: org.apache.pulsar.functions.api.examples.ExclamationFunction
8+
image: streamnative/pulsar-all:2.7.1
9+
sourceType: java.lang.String
10+
sinkType: java.lang.String
11+
forwardSourceMessageProperty: true
12+
MaxPendingAsyncRequests: 1000
13+
replicas: 1
14+
maxReplicas: 5
15+
logTopic: persistent://public/default/logging-function-logs
16+
input:
17+
topics:
18+
- persistent://public/default/java-function-input-topic
19+
output:
20+
topic: persistent://public/default/java-function-output-topic
21+
producerConf:
22+
batchBuilder: "KEY_BASED"
23+
resources:
24+
requests:
25+
cpu: "0.1"
26+
memory: 1G
27+
limits:
28+
cpu: "0.2"
29+
memory: 1.1G
30+
# each secret will be loaded ad an env variable from the `path` secret with the `key` in that secret in the name of `name`
31+
secretsMap:
32+
"name":
33+
path: "test-secret"
34+
key: "username"
35+
"pwd":
36+
path: "test-secret"
37+
key: "password"
38+
pulsar:
39+
pulsarConfig: "test-pulsar"
40+
#authConfig: "test-auth"
41+
volumeMounts:
42+
- mountPath: /cache
43+
name: cache-volume
44+
pod:
45+
labels:
46+
"locaction": "mtv"
47+
annotations:
48+
"managed-function": "true"
49+
volumes:
50+
- name: cache-volume
51+
emptyDir: {}
52+
imagePullSecrets:
53+
- name: regcred
54+
initContainers:
55+
- name: init-function
56+
image: busybox:1.28
57+
command: ['sh', '-c', 'echo The app is running! && sleep 30']
58+
sidecars:
59+
- name: sidecar-function
60+
image: busybox:1.28
61+
command: ['sh', '-c', 'echo The app is running! && sleep 30000']
62+
java:
63+
jar: /pulsar/examples/api-examples.jar
64+
jarLocation: ""
65+
# use package name:
66+
# jarLocation: function://public/default/nul-test-java-function@v1
67+
# to be delete & use admission hook
68+
clusterName: test-pulsar
69+
autoAck: true
70+
---
71+
apiVersion: v1
72+
kind: ConfigMap
73+
metadata:
74+
name: test-pulsar
75+
data:
76+
webServiceURL: http://test-pulsar-broker.default.svc.cluster.local:8080
77+
brokerServiceURL: pulsar://test-pulsar-broker.default.svc.cluster.local:6650
78+
#---
79+
#apiVersion: v1
80+
#kind: ConfigMap
81+
#metadata:
82+
# name: test-auth
83+
#data:
84+
# clientAuthenticationPlugin: "abc"
85+
# clientAuthenticationParameters: "xyz"
86+
# tlsTrustCertsFilePath: "uvw"
87+
# useTls: "true"
88+
# tlsAllowInsecureConnection: "false"
89+
# tlsHostnameVerificationEnable: "true"
90+
---
91+
apiVersion: v1
92+
data:
93+
username: YWRtaW4=
94+
password: MWYyZDFlMmU2N2Rm
95+
kind: Secret
96+
metadata:
97+
name: test-secret
98+
type: Opaque

controllers/function_controller_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,18 @@ var _ = Describe("Function Controller (E2E)", func() {
7070
})
7171
})
7272

73+
var _ = Describe("Function Controller (Batcher)", func() {
74+
Context("Function With Batcher Item", func() {
75+
configs := makeSamplePulsarConfig()
76+
function := makeFunctionSampleWithKeyBasedBatcher()
77+
78+
createFunctionConfigMap(configs)
79+
createFunction(function)
80+
deleteFunction(function)
81+
deleteFunctionConfigMap(configs)
82+
})
83+
})
84+
7385
func createFunction(function *v1alpha1.Function) {
7486
if function.Status.Conditions == nil {
7587
function.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)

controllers/spec/utils.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ func generateFunctionOutputSpec(function *v1alpha1.Function) *proto.SinkSpec {
151151
MaxPendingMessagesAcrossPartitions: function.Spec.Output.ProducerConf.MaxPendingMessagesAcrossPartitions,
152152
UseThreadLocalProducers: function.Spec.Output.ProducerConf.UseThreadLocalProducers,
153153
CryptoSpec: generateCryptoSpec(function.Spec.Output.ProducerConf.CryptoConfig),
154+
BatchBuilder: function.Spec.Output.ProducerConf.BatchBuilder,
154155
}
155156

156157
sinkSpec.ProducerSpec = producerConfig
@@ -201,6 +202,7 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec {
201202
MaxPendingMessagesAcrossPartitions: source.Spec.Output.ProducerConf.MaxPendingMessagesAcrossPartitions,
202203
UseThreadLocalProducers: source.Spec.Output.ProducerConf.UseThreadLocalProducers,
203204
CryptoSpec: cryptoSpec,
205+
BatchBuilder: source.Spec.Output.ProducerConf.BatchBuilder,
204206
}
205207
}
206208
return &proto.SinkSpec{

controllers/test_utils_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,17 @@ func makeFunctionSampleWithCryptoEnabled() *v1alpha1.Function {
136136
return function
137137
}
138138

139+
func makeFunctionSampleWithKeyBasedBatcher() *v1alpha1.Function {
140+
function := makeFunctionSample(TestFunctionName)
141+
function.Spec.Output = v1alpha1.OutputConf{
142+
Topic: "persistent://public/default/java-function-output-topic",
143+
ProducerConf: &v1alpha1.ProducerConfig{
144+
BatchBuilder: "KEY_BASED",
145+
},
146+
}
147+
return function
148+
}
149+
139150
func makeFunctionMeshSample() *v1alpha1.FunctionMesh {
140151
inputTopic := "persistent://public/default/functionmesh-input-topic"
141152
middleTopic := "persistent://public/default/mid-topic"

0 commit comments

Comments
 (0)