11---
22Description : " "
3- date : " 2025-01-20 "
3+ date : " 2025-01-23 "
44lastmod : " "
55tags : []
66title : Eino 流式编程要点
@@ -12,7 +12,7 @@ weight: 0
1212
1313## 编排流式概述
1414
15- ![ ] ( /img/eino/eino_component_runnable.png )
15+ < a href = " /img/eino/eino_component_runnable.png " target = " _blank " >< img src = " /img/eino/eino_component_runnable.png " /></ a >
1616
1717编排流式的 Graph 时,需要考虑的几个关键要素:
1818
@@ -41,9 +41,9 @@ Eino 是个 "component first" 的框架,组件可以独立使用。定组件
4141
4242``` go
4343type ChatModel interface {
44- Generate (ctx context.Context **, ** input []*schema.Message **, ** opts ...Option ) (*schema.Message **, ** error )
45- Stream (ctx context.Context **, ** input []*schema.Message **, ** opts ...Option ) (
46- *schema.StreamReader [*schema.Message ]**, ** error )
44+ Generate (ctx context.Context , input []*schema.Message , opts ...Option ) (*schema.Message , error )
45+ Stream (ctx context.Context , input []*schema.Message , opts ...Option ) (
46+ *schema.StreamReader [*schema.Message ], error )
4747 // other methods omitted...
4848}
4949```
@@ -52,7 +52,7 @@ type ChatModel interface {
5252
5353``` go
5454type Retriever interface {
55- Retrieve (ctx context.Context **, ** query string **, ** opts ...Option ) ([]*schema.Document **, ** error )
55+ Retrieve (ctx context.Context , query string , opts ...Option ) ([]*schema.Document , error )
5656}
5757```
5858
@@ -93,11 +93,11 @@ Collect 和 Transform 两种流式范式,目前只在编排场景有用到。
9393
9494但是,一个组件,一旦处在多个组件组合使用的“编排”场景中,它的入参和出参就没那么固定了,而是取决于这个组件在编排场景中的“上游输出”和“下游输入”。比如 React Agent 的典型编排示意图:
9595
96- ![ ] ( /img/eino/chatmodel_to_tool.png )
96+ < a href = " /img/eino/chatmodel_to_tool.png " target = " _blank " >< img src = " /img/eino/chatmodel_to_tool.png " /></ a >
9797
9898上图中,如果 Tool 是个 StreamableTool,也就是输出是 StreamReader[ Message] ,则 Tool -> ChatModel 就可能是流式的输出。但是 Chat Model 并没有接收流式输入的业务场景,也没有对应的接口。这时 Eino 框架会自动帮助 ChatModel 补足接收流式输入的能力:
9999
100- ![ ] ( /img/eino/chatmodel_tool_loop.png )
100+ < a href = " /img/eino/chatmodel_tool_loop.png " target = " _blank " >< img src = " /img/eino/chatmodel_tool_loop.png " /></ a >
101101
102102上面的 Concat message stream 是 Eino 框架自动提供的能力,即使不是 message,是任意的 T,只要满足特定的条件,Eino 框架都会自动去做这个 StreamReader[ T] 到 T 的转化,这个条件是:** 在编排中,当一个组件的上游输出是 StreamReader[ T] ,但是组件只提供了 T 作为输入的业务接口时,框架会自动将 StreamReader[ T] concat 成 T,再输入给这个组件。**
103103
@@ -106,23 +106,23 @@ Collect 和 Transform 两种流式范式,目前只在编排场景有用到。
106106
107107另一方面,考虑一个相反的例子。还是 React Agent,这次是一个更完整的编排示意图:
108108
109- ![ ] ( /img/eino/tool_model_react.png )
109+ < a href = " /img/eino/tool_model_react.png " target = " _blank " >< img src = " /img/eino/tool_model_react.png " /></ a >
110110
111111在上图中,branch 接收 chat model 输出的 message,并根据 message 中是否包含 tool call,来选择直接结束 agent 本次运行并将 message 输出,还是调用 Tool 并将调用结果再次给 Chat Model 循环处理。由于这个 Branch 可以通过 message stream 的首个帧就完成逻辑判断,因此我们给这个 Branch 定义的是 Collect 接口,即流式输入,非流式输出:
112112
113113``` go
114- compose.NewStreamGraphBranch (func (ctx context .Context **, ** sr *schema .StreamReader [*schema .Message ]) (endNode string **, ** err error ) {
115- msg**, ** err := sr.Recv ()
114+ compose.NewStreamGraphBranch (func (ctx context .Context , sr *schema .StreamReader [*schema .Message ]) (endNode string , err error ) {
115+ msg , err := sr.Recv ()
116116 if err != nil {
117- return " " **, ** err
117+ return " " , err
118118 }
119119 defer sr.Close ()
120120
121- if len (msg.ToolCalls ) == ** 0 ** {
122- return compose._END_ **, ** nil
121+ if len (msg.ToolCalls ) == 0 {
122+ return compose._END_ , nil
123123 }
124124
125- return nodeKeyTools**, ** nil
125+ return nodeKeyTools, nil
126126}
127127```
128128
@@ -165,11 +165,11 @@ ReactAgent 有两个接口,Generate 和 Stream,分别实现了 Invoke 和 St
165165- “没有”:整体而言,Graph,Chain 等编排产物,自身是没有业务属性的,只为抽象的编排服务的,因此也就没有符合业务场景的接口范式。同时,编排需要支持各种范式的业务场景。所以,Eino 中代表编排产物的 Runnable[ I, O] 接口,不做选择也无法选择,提供了所有流式范式的方法:
166166
167167``` go
168- type Runnable [I**, ** O any] interface {
169- Invoke (ctx context.Context **, ** input I **, ** opts ...Option ) (output O**, ** err error )
170- Stream (ctx context.Context **, ** input I **, ** opts ...Option ) (output *schema.StreamReader [O]**, ** err error )
171- Collect (ctx context.Context **, ** input *schema.StreamReader [I]**, ** opts ...Option ) (output O**, ** err error )
172- Transform (ctx context.Context **, ** input *schema.StreamReader [I]**, ** opts ...Option ) (output *schema.StreamReader [O]**, ** err error )
168+ type Runnable [I, O any] interface {
169+ Invoke (ctx context.Context , input I , opts ...Option ) (output O, err error )
170+ Stream (ctx context.Context , input I , opts ...Option ) (output *schema.StreamReader [O], err error )
171+ Collect (ctx context.Context , input *schema.StreamReader [I], opts ...Option ) (output O, err error )
172+ Transform (ctx context.Context , input *schema.StreamReader [I], opts ...Option ) (output *schema.StreamReader [O], err error )
173173}
174174```
175175
@@ -179,27 +179,33 @@ type Runnable[I**, **O any] interface {
179179
180180从另一个角度看,既然编排产物整体可以被看做“组件”,那“组件”必然有自己的内部实现,比如 ChatModel 的内部实现逻辑,可能是把入参的 [ ] Message 转化成各个模型的 API request,之后调用模型的 API,获取 response 后再转化成出参的 Message。那么类比的话,Graph 这个“组件”的内部实现是什么?是数据在 Graph 内部各个组件间以用户指定的流转方向和流式范式来流转。其中,“流转方向”不在当前讨论范围内,而各组件运行时的流式范式,则由 Graph 整体的触发方式决定,具体来说:
181181
182- - 如果用户通过 Invoke 来调用 Graph,则 Graph 内部所有组件都以 Invoke 范式来调用。如果某个组件,没有实现 Invoke 范式,则 Eino 框架自动根据组件实现了的流式范式,封装出 Invoke 调用范式,优先顺位如下:
183- - 若组件实现了 Stream,则通过 Stream 封装 Invoke,即自动 concat 输出流。
182+ 如果用户通过 ** Invoke** 来调用 Graph,则 Graph 内部所有组件都以 Invoke 范式来调用。如果某个组件,没有实现 Invoke 范式,则 Eino 框架自动根据组件实现了的流式范式,封装出 Invoke 调用范式,优先顺位如下:
183+
184+ - 若组件实现了 Stream,则通过 Stream 封装 Invoke,即自动 concat 输出流。
185+
186+ <a href =" /img/eino/invoke_outside_stream_inside.png " target =" _blank " ><img src =" /img/eino/invoke_outside_stream_inside.png " /></a >
184187
185- ![ ] ( /img/eino/invoke_outside_stream_inside.png )
186188- 否则,若组件实现了 Collect,则通过 Collect 封装 Invoke,即非流式入参转单帧流。
187189
188- ![ ] ( /img/eino/invoke_outside_collect_inside.png )
190+ <a href =" /img/eino/invoke_outside_collect_inside.png " target =" _blank " ><img src =" /img/eino/invoke_outside_collect_inside.png " /></a >
191+
189192- 如果都没实现,则必须实现 Transform,通过 Transform 封装 Invoke,即入参转单帧流,出参 concat。
190193
191- ![ ] ( /img/eino/invoke_outside_transform_inside.png )
194+ < a href = " /img/eino/invoke_outside_transform_inside.png " target = " _blank " >< img src = " /img/eino/invoke_outside_transform_inside.png " /></ a >
192195
193- - 如果用户通过 Stream/Collect/Transform 来调用 Graph,则 Graph 内部所有组件都以 Transform 范式来调用。如果某个组件,没有实现 Transform 范式,则 Eino 框架自动根据组件实现了的流式范式,封装出 Transform 调用范式,优先顺位如下:
194- - 若组件实现了 Stream,则通过 Stream 封装 Transform,即自动 concat 输入流。
196+ 如果用户通过 ** Stream/Collect/Transform** 来调用 Graph,则 Graph 内部所有组件都以 Transform 范式来调用。如果某个组件,没有实现 Transform 范式,则 Eino 框架自动根据组件实现了的流式范式,封装出 Transform 调用范式,优先顺位如下:
197+
198+ - 若组件实现了 Stream,则通过 Stream 封装 Transform,即自动 concat 输入流。
199+
200+ <a href =" /img/eino/transform_inside_stream_inside.png " target =" _blank " ><img src =" /img/eino/transform_inside_stream_inside.png " /></a >
195201
196- ![ ] ( /img/eino/transform_inside_stream_inside.png )
197202- 否则,若组件实现了 Collect,则通过 Collect 封装 Transform,即非流式出参转单帧流。
198203
199- ![ ] ( /img/eino/transform_outside_stream_inside.png )
204+ <a href =" /img/eino/transform_outside_stream_inside.png " target =" _blank " ><img src =" /img/eino/transform_outside_stream_inside.png " /></a >
205+
200206- 如果都没实现,则必须实现 Invoke,通过 Invoke 封装 Transform,即入参流 concat,出参转单帧流
201207
202- ![ ] ( /img/eino/transform_outside_invoke_inside.png )
208+ < a href = " /img/eino/transform_outside_invoke_inside.png " target = " _blank " >< img src = " /img/eino/transform_outside_invoke_inside.png " /></ a >
203209
204210结合上面穷举的各种案例,Eino 框架对 T 和 Stream[ T] 的自动转换,可以总结为:
205211
0 commit comments