Skip to content

Commit 535b141

Browse files
committed
add ai agent for TIS
1 parent 1c26b12 commit 535b141

File tree

6 files changed

+662
-28
lines changed

6 files changed

+662
-28
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# 需求说明
2+
当TIS AI agent部署实时增量通道过程中,需要使用Standalone模式部署实时增量通道过程中,发现本地环境中还没有可用Standalone Flink Cluster
3+
,需要在Agent执行流程执行自动部署Standalone模式的Flink Cluster
4+
5+
# 执行流程说明
6+
7+
部署自动部署Standalone模式的Flink Cluster分为以下几个步骤
8+
1. 下载flink Standalone压缩包,
9+
2. 解压下载的flink Standalone压缩包,(是否可以让下载和解压直接在内存中实现?不需要将压缩包在磁盘中临时保存),下载前需要校验StandaloneFlinkDeployingAIAssistSupport.this.flinkDeployDir指定的目录是否存在及是否有权限
10+
3. 修改Flink的配置文件(flink-tis-1.20.1/conf/config.yaml,本地环境中我已经存放了一份,路径在/Users/mozhenghua/Downloads/flink-tis-1.20.1-bin (1)/conf/config.yaml),修改环境变量,需要修改的环境变量如下:
11+
1. 在jobmanager和taskmanager的启动java option配置中添加: -Ddata.dir=StandaloneFlinkDeployingAIAssistSupport.this.dataDir, "-D"+CenterResource.KEY_notFetchFromCenterRepository=true
12+
2. 修改taskmanager的启动`numberOfTaskSlots` 配置项为StandaloneFlinkDeployingAIAssistSupport.this.slot
13+
3. 修改taskmanager的启动的内存大小设置为,StandaloneFlinkDeployingAIAssistSupport.this.tmMemory ,需要加一个校验,当前机器节点是否有足够的内存,需要防止Flink 启动之后内存不足
14+
4. 修改Flink 暴露的的REST API Http 端口由默认的8081端口,设置成StandaloneFlinkDeployingAIAssistSupport.this.port
15+
16+
修改Flink的配置文件,需要实现实现即使多次重复操作最终文件内容也是相同的(实现幂等)
17+
4. 调用 Flink的启动脚本(flink-tis-1.20.1/bin/start-cluster.sh,路径在:/Users/mozhenghua/Downloads/flink-tis-1.20.1-bin (1)/bin/start-cluster.sh)启动Flink,如果不能正常启动需要报告错误信息
18+
19+
# 代码入口说明
20+
21+
已经写了一个骨架代码 StandaloneFlinkDeployingAIAssistSupport(路径为:tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/launch/clustertype/StandaloneFlinkDeployingAIAssistSupport.java)
22+
的startProcess() 函数

tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/common/FlinkCluster.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
import com.qlangtech.tis.extension.util.OverwriteProps;
3131
import com.qlangtech.tis.lang.TisException;
3232
import com.qlangtech.tis.plugin.IEndTypeGetter;
33+
import com.qlangtech.tis.plugin.IPluginStore;
3334
import com.qlangtech.tis.plugin.annotation.FormField;
3435
import com.qlangtech.tis.plugin.annotation.FormFieldType;
3536
import com.qlangtech.tis.plugin.annotation.Validator;
36-
import com.qlangtech.tis.realtime.utils.NetUtils;
3737
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
3838
import org.apache.flink.client.program.ClusterClient;
3939
import org.apache.flink.client.program.rest.RestClusterClient;
@@ -60,6 +60,7 @@ public class FlinkCluster extends ParamsConfig implements IFlinkCluster {
6060
private static final Logger logger = LoggerFactory.getLogger(FlinkCluster.class);
6161

6262
private static final String FLINK_DEFAULT_CLUSTER_ID = "default_cluster_id";
63+
public static final int FLINK_DEFAULT_RETRY_MAX_ATTEMPTS = 1;
6364

6465
public static void main(String[] args) {
6566
System.out.println(IFlinkCluster.class.isAssignableFrom(FlinkCluster.class));
@@ -78,6 +79,20 @@ public static void main(String[] args) {
7879
@FormField(ordinal = 3, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.integer, Validator.require})
7980
public Long retryDelay;
8081

82+
public static IPluginStore<FlinkCluster> getFlinkClusterStore() {
83+
84+
// return getTargetPluginStore(
85+
// UploadPluginMeta.TargetDesc.create(
86+
// UploadPluginMeta.parse( KEY_TARGET_PLUGIN_DESC, false)));
87+
88+
89+
return getTargetPluginStore(IFlinkCluster.KEY_DISPLAY_NAME
90+
// UploadPluginMeta.TargetDesc.create(
91+
// UploadPluginMeta.parse( KEY_TARGET_PLUGIN_DESC, false))
92+
);
93+
94+
}
95+
8196

8297
@Override
8398
public JobManagerAddress getJobManagerAddress() {
@@ -162,7 +177,7 @@ public DefaultDescriptor() {
162177
super(KEY_DISPLAY_NAME);
163178
// this.load();
164179
opts = FlinkPropAssist.createOpts(this);
165-
opts.addFieldDescriptor("maxRetry", RestOptions.RETRY_MAX_ATTEMPTS, OverwriteProps.dft(1));
180+
opts.addFieldDescriptor("maxRetry", RestOptions.RETRY_MAX_ATTEMPTS, OverwriteProps.dft(FLINK_DEFAULT_RETRY_MAX_ATTEMPTS));
166181
opts.addFieldDescriptor("retryDelay", RestOptions.RETRY_DELAY);
167182
}
168183

tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/launch/clustertype/Standalone.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@
3131
import com.qlangtech.tis.extension.Descriptor;
3232
import com.qlangtech.tis.extension.TISExtension;
3333
import com.qlangtech.tis.lang.TisException;
34+
import com.qlangtech.tis.plugin.IPluginStore;
3435
import com.qlangtech.tis.plugin.annotation.FormField;
3536
import com.qlangtech.tis.plugin.annotation.FormFieldType;
3637
import com.qlangtech.tis.plugin.annotation.Validator;
3738
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
3839
import com.qlangtech.tis.util.DescribableJSON;
40+
import org.apache.commons.lang.StringUtils;
3941
import org.apache.flink.client.program.ClusterClient;
4042

43+
import java.util.Objects;
4144
import java.util.Optional;
4245

4346
/**
@@ -106,9 +109,17 @@ protected boolean validateAll(IControlMsgHandler msgHandler, Context context, Po
106109

107110
@Override
108111
public Optional<DescribableJSON<ParamsConfig>> getAIAssistSupport() {
112+
IPluginStore<StandaloneFlinkDeployingAIAssistSupport> aiAssistSupport
113+
= StandaloneFlinkDeployingAIAssistSupport.getTargetPluginStore(StandaloneFlinkDeployingAIAssistSupport.KEY_IDENTITY_NAME);
109114
StandaloneFlinkDeployingAIAssistSupport flinkDeployingAIAssistSupport = new StandaloneFlinkDeployingAIAssistSupport();
115+
for (StandaloneFlinkDeployingAIAssistSupport assistSupport : aiAssistSupport.getPlugins()) {
116+
if (StringUtils.equals(assistSupport.name, StandaloneFlinkDeployingAIAssistSupport.KEY_IDENTITY_NAME)) {
117+
flinkDeployingAIAssistSupport = assistSupport;
118+
}
119+
}
110120
DescribableJSON<ParamsConfig> result = new DescribableJSON<>(
111-
flinkDeployingAIAssistSupport, new StandaloneFlinkDeployingAIAssistSupport.DefaultDesc());
121+
Objects.requireNonNull(flinkDeployingAIAssistSupport, "flinkDeployingAIAssistSupport can not be null")
122+
, new StandaloneFlinkDeployingAIAssistSupport.DefaultDesc());
112123
return Optional.of(result);
113124
}
114125
}

0 commit comments

Comments
 (0)