Skip to content

Commit fad8e44

Browse files
committed
modfiy for powerjob batch sync
1 parent adb3bec commit fad8e44

File tree

30 files changed

+432
-1599
lines changed

30 files changed

+432
-1599
lines changed

tis-datax/executor/powerjob-worker-samples/docker-build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
tis_version="4.2.0"
1+
tis_version="4.3.0-SNAPSHOT"
22
docker rmi registry.cn-hangzhou.aliyuncs.com/tis/tis-datax-executor:$tis_version
33
docker rmi tis/tis-datax-executor:$tis_version
44
docker build . -t tis/tis-datax-executor:$tis_version
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
在 PowerJob 中,与 DAG 调度触发相关的核心代码主要集中在 **服务端(powerjob-server)** 模块,以下是关键的核心类及其职责:
2+
3+
---
4+
5+
### 1. **工作流调度引擎入口**
6+
**`WorkflowInstanceManager`**
7+
- 路径:`tech.powerjob.server.core.workflow.algorithm.WorkflowInstanceManager`
8+
- 核心职责:
9+
- 工作流实例的生命周期管理(创建、启动、停止)
10+
- 触发工作流实例的初始调度
11+
- 关键方法:
12+
```java
13+
// 创建并启动工作流实例
14+
public void createWorkflowInstance(WorkflowInstanceCreateReq req) {
15+
// 1. 持久化工作流实例
16+
// 2. 提交到调度队列 (workflowReadyQueue)
17+
}
18+
```
19+
20+
---
21+
22+
### 2. **DAG 调度核心处理器**
23+
**`WorkflowExecutor`**
24+
- 路径:`tech.powerjob.server.core.workflow.algorithm.WorkflowExecutor`
25+
- 核心职责:
26+
- **DAG 状态机驱动**:推进工作流实例的状态流转
27+
- **任务节点调度**:根据 DAG 依赖关系触发任务执行
28+
- **容错处理**:失败重试、超时处理等
29+
- 关键逻辑:
30+
```java
31+
public void executeWorkflowInstance(Long wfInstanceId) {
32+
// 1. 加载DAG拓扑结构
33+
WorkflowDAG dag = workflowService.fetchWorkflowDAG(wfInstanceId);
34+
35+
// 2. 计算可运行节点
36+
List<Node> runnableNodes = DAGUtils.calculateRunnableNodes(dag);
37+
38+
// 3. 提交任务到任务调度器
39+
for (Node node : runnableNodes) {
40+
taskDispatcher.dispatch(node.toTaskInstance(), node.getNodeParams());
41+
}
42+
}
43+
```
44+
45+
---
46+
47+
### 3. **DAG 图遍历与状态计算**
48+
**`DAGTraverser`**
49+
- 路径:`tech.powerjob.server.core.workflow.algorithm.DAGTraverser`
50+
- 核心职责:
51+
- **依赖解析**:基于 DAG 图计算可执行节点
52+
- **状态传播**:处理节点完成/失败后的状态传递
53+
- 关键算法:
54+
```java
55+
public static List<Node> calculateRunnableNodes(WorkflowDAG dag) {
56+
// 实现拓扑排序算法
57+
return topologicalSort(dag)
58+
.stream()
59+
.filter(node -> isRunnable(node, dag)) // 检查前置依赖是否完成
60+
.collect(Collectors.toList());
61+
}
62+
```
63+
64+
---
65+
66+
### 4. **任务触发执行器**
67+
**`TaskDispatcher`**
68+
- 路径:`tech.powerjob.server.core.scheduler.TaskDispatcher`
69+
- 核心职责:
70+
- 将 DAG 节点转化为具体任务实例
71+
- 提交任务到工作线程池执行
72+
- 关键代码:
73+
```java
74+
public void dispatch(TaskInstance task, Map<String, String> params) {
75+
// 1. 创建任务实例记录
76+
taskInstanceService.save(task);
77+
78+
// 2. 提交到执行器
79+
workerClusterManager.dispatchTask(
80+
task.getInstanceId(),
81+
task.getTaskId(),
82+
params
83+
);
84+
}
85+
```
86+
87+
---
88+
89+
### 5. **回调处理器(状态推进引擎)**
90+
**`WorkflowInstanceInfoService`**
91+
- 路径:`tech.powerjob.server.core.service.WorkflowInstanceInfoService`
92+
- 核心职责:
93+
- 监听**任务完成事件**
94+
- 触发工作流状态更新和后续节点调度
95+
- 关键逻辑:
96+
```java
97+
@EventListener
98+
public void onTaskInstanceStatusChange(TaskInstanceStatusChangeEvent event) {
99+
if (event.getStatus() == TaskResult.COMPLETED || event.getStatus() == TaskResult.FAILED) {
100+
// 更新DAG节点状态
101+
workflowService.updateNodeStatus(event.getTaskId(), event.getStatus());
102+
103+
// 重新调度工作流实例
104+
workflowInstanceManager.resumeWorkflowInstance(
105+
event.getWfInstanceId()
106+
);
107+
}
108+
}
109+
```
110+
111+
---
112+
113+
### 6. **调度器入口(定时触发)**
114+
**`WorkflowScheduleService`**
115+
- 路径:`tech.powerjob.server.core.scheduler.WorkflowScheduleService`
116+
- 核心职责:
117+
- 扫描需要调度的**定时工作流**
118+
- 触发工作流实例创建
119+
- 调度逻辑:
120+
```java
121+
@Scheduled(fixedRate = 10_000)
122+
public void scheduleTimedWorkflow() {
123+
List<WorkflowInfo> timedWorkflows = workflowService.fetchTimedWorkflow();
124+
for (WorkflowInfo wf : timedWorkflows) {
125+
if (shouldSchedule(wf)) {
126+
workflowInstanceManager.createWorkflowInstance(
127+
new WorkflowInstanceCreateReq(wf.getId())
128+
);
129+
}
130+
}
131+
}
132+
```
133+
134+
---
135+
136+
### 核心执行流程
137+
```mermaid
138+
sequenceDiagram
139+
participant ScheduleService
140+
participant InstanceManager
141+
participant WorkflowExecutor
142+
participant DAGTraverser
143+
participant TaskDispatcher
144+
145+
ScheduleService->>InstanceManager: 定时触发创建实例
146+
InstanceManager->>WorkflowExecutor: 提交实例
147+
WorkflowExecutor->>DAGTraverser: 请求可运行节点
148+
DAGTraverser-->>WorkflowExecutor: 返回节点列表
149+
WorkflowExecutor->>TaskDispatcher: 分发任务
150+
TaskDispatcher->>Worker: 执行任务
151+
Worker-->>InstanceManager: 任务完成事件
152+
InstanceManager->>WorkflowExecutor: 触发状态更新
153+
loop 直到DAG完成
154+
WorkflowExecutor->>DAGTraverser: 重新计算节点
155+
DAGTraverser-->>WorkflowExecutor: 新可运行节点
156+
WorkflowExecutor->>TaskDispatcher: 分发新任务
157+
end
158+
```
159+
160+
---
161+
162+
### 关键设计特点
163+
1. **事件驱动架构**:通过任务完成事件驱动 DAG 状态流转
164+
2. **拓扑排序算法**:使用 Kahn 算法实现高效依赖解析
165+
3. **异步执行模型**:所有调度操作异步化,避免阻塞主线程
166+
4. **状态机管理**:明确定义工作流实例的 6 种状态(WAITING/RUNNING/SUCCEED/FAILED/STOPPED)
167+
168+
建议重点关注以下包路径的源码:
169+
`powerjob-server/src/main/java/tech/powerjob/server/core/workflow/algorithm/`
170+
`powerjob-server/src/main/java/tech/powerjob/server/core/scheduler/`
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
好的,我为您梳理了 PowerJob 中与 Akka 相关的核心代码实现部分。Akka 主要用于 PowerJob 的**分布式通信层**,特别是在服务端(Server)和客户端(Worker)之间的高性能网络通信中。以下是关键代码模块分析:
2+
3+
---
4+
5+
### 核心模块路径
6+
```bash
7+
powerjob-remote/akka-remote/src/main/java/tech/powerjob/remote/framework
8+
```
9+
10+
---
11+
12+
### 核心实现类
13+
14+
#### 1. Akka 初始化与配置
15+
**`AkkaRemoteEngine.java`**
16+
- 位置:`tech.powerjob.remote.framework.engine.impl.AkkaRemoteEngine`
17+
- 核心职责:
18+
- 初始化 Akka ActorSystem
19+
- 加载 Akka 配置(`application.conf`
20+
- 创建根 Actor `ActorSystemHolder`
21+
- 关键代码:
22+
```java
23+
ActorSystem actorSystem = ActorSystem.create("powerjob-akka-system", akkaConfig);
24+
actorSystem.actorOf(Props.create(ActorSystemHolder.class, actorHandler), "holder");
25+
```
26+
27+
#### 2. Actor 系统容器
28+
**`ActorSystemHolder.java`**
29+
- 位置:`tech.powerjob.remote.framework.actor.ActorSystemHolder`
30+
- 核心职责:
31+
- 管理所有业务 Actor 的生命周期
32+
- 作为所有 Actor 的根监督者
33+
- 处理 Actor 的创建/销毁
34+
- 关键方法:
35+
```java
36+
private void initHandlers() {
37+
// 动态创建处理不同请求类型的Actor
38+
handlers.forEach(handlerInfo -> {
39+
String actorName = handlerInfo.getPath();
40+
getContext().actorOf(Props.create(HandlerActor.class, handlerInfo), actorName);
41+
});
42+
}
43+
```
44+
45+
#### 3. 请求处理 Actor
46+
**`HandlerActor.java`**
47+
- 位置:`tech.powerjob.remote.framework.actor.HandlerActor`
48+
- 核心职责:
49+
- 实际处理网络请求的 Akka Actor
50+
- 路由请求到对应的业务处理器
51+
- 实现异步非阻塞处理
52+
- 核心逻辑:
53+
```java
54+
@Override
55+
public Receive createReceive() {
56+
return receiveBuilder()
57+
.match(RemoteReq.class, this::onReceiveRemoteReq)
58+
.build();
59+
}
60+
61+
private void onReceiveRemoteReq(RemoteReq remoteReq) {
62+
// 1. 反序列化请求
63+
// 2. 路由到对应业务处理器
64+
// 3. 异步处理并返回响应
65+
}
66+
```
67+
68+
#### 4. 远程消息协议
69+
**`RemoteProtocol.java`**
70+
- 位置:`tech.powerjob.remote.framework.actor.HandlerActor`
71+
- 核心类:
72+
- `RemoteReq`:封装请求数据
73+
- `RemoteResp`:封装响应数据
74+
- 数据结构:
75+
```java
76+
public class RemoteReq implements Serializable {
77+
private String path; // 请求路径
78+
private byte[] payload; // 二进制负载
79+
private Map<String, String> metadata; // 元数据
80+
}
81+
```
82+
83+
#### 5. 序列化扩展
84+
**`PowerAkkaSerializer.java`**
85+
- 位置:`tech.powerjob.remote.framework.serializer.PowerAkkaSerializer`
86+
- 核心职责:
87+
- 实现 Akka 的 `Serializer` 接口
88+
- 使用 Hessian2 进行高效二进制序列化
89+
- 关键实现:
90+
```java
91+
@Override
92+
public byte[] toBinary(Object object) {
93+
return SerializerUtils.hes.serialize(object);
94+
}
95+
96+
@Override
97+
public Object fromBinary(byte[] bytes, Class<?> manifest) {
98+
return SerializerUtils.hes.deserialize(bytes);
99+
}
100+
```
101+
102+
---
103+
104+
### 配置资源文件
105+
**`application.conf`**
106+
- 路径:`src/main/resources/application.conf`
107+
- 核心配置:
108+
```conf
109+
akka {
110+
actor {
111+
provider = cluster
112+
serializers {
113+
powerjob = "tech.powerjob.remote.framework.serializer.PowerAkkaSerializer"
114+
}
115+
serialization-bindings {
116+
"java.io.Serializable" = powerjob
117+
}
118+
}
119+
remote.artery {
120+
transport = tcp
121+
canonical.hostname = "127.0.0.1"
122+
canonical.port = 10000
123+
}
124+
}
125+
```
126+
127+
---
128+
129+
### 执行流程
130+
1. **初始化阶段**
131+
`AkkaRemoteEngine` → 创建 `ActorSystem` → 初始化 `ActorSystemHolder`
132+
133+
2. **请求接收阶段**
134+
网络请求 → 序列化为 `RemoteReq` → 路由到 `HandlerActor`
135+
136+
3. **业务处理阶段**
137+
`HandlerActor` → 解析请求 → 调用业务处理器 → 生成 `RemoteResp`
138+
139+
4. **响应返回阶段**
140+
序列化响应 → 通过 Akka 网络层返回 → 客户端反序列化
141+
142+
---
143+
144+
### 设计特点
145+
1. **高性能传输**
146+
使用 Artery TCP 传输层,支持百万级 QPS
147+
2. **零拷贝优化**
148+
通过 ByteBuffer 实现二进制零拷贝传输
149+
3. **弹性扩展**
150+
基于 Actor 模型的横向扩展能力
151+
4. **协议无关性**
152+
业务处理器无需感知 Akka 实现细节
153+
154+
---
155+
156+
### 示例调用栈
157+
```mermaid
158+
sequenceDiagram
159+
participant Client
160+
participant ServerActor
161+
participant HandlerActor
162+
participant BusinessHandler
163+
164+
Client->>ServerActor: 发送RemoteReq
165+
ServerActor->>HandlerActor: 转发请求
166+
HandlerActor->>BusinessHandler: 调用process()
167+
BusinessHandler-->>HandlerActor: 返回结果
168+
HandlerActor->>ServerActor: 封装RemoteResp
169+
ServerActor->>Client: 返回响应
170+
```
171+
172+
建议结合源码查看以下关键包路径:
173+
`tech.powerjob.remote.framework.actor`
174+
`tech.powerjob.remote.framework.engine.impl`
175+
`tech.powerjob.remote.framework.serializer`
176+
177+
这些模块共同构成了 PowerJob 基于 Akka 的高性能分布式通信基础架构。

0 commit comments

Comments
 (0)