Skip to content

feat: align streaming architecture with v2 (memory-first persistence)#347

Open
chengcheng84 wants to merge 12 commits intoCherryHQ:mainfrom
chengcheng84:feat/streaming-memory-update
Open

feat: align streaming architecture with v2 (memory-first persistence)#347
chengcheng84 wants to merge 12 commits intoCherryHQ:mainfrom
chengcheng84:feat/streaming-memory-update

Conversation

@chengcheng84
Copy link
Copy Markdown
Contributor

@chengcheng84 chengcheng84 commented Mar 7, 2026

总结

将流式处理从实时数据库写入改为两段式落库

修改细节

  • 用户消息:发送后就立即写入 SQLite,见 src/services/MessagesService.ts#L138
  • assistant 占位消息:开始生成前也会先创建一条 pending 的空消息到 SQLite,见src/services/MessagesService.ts#L148-L150
  • 流式生成中的增量内容:不会边吐 token 边写 SQLite;这阶段只在内存/cache 里更新,见 src/services/messageStreaming/BlockManager.ts#L119、src/services/messageStreaming/BlockManager.ts#L96、src/services/messageStreaming/StreamingService.ts#L228
  • 生成完成/报错/暂停时:才通过 finalize() 一次性写入,把最终 data.blocksstatusstats 等写回 SQLite,见 src/services/messageStreaming/StreamingService.ts#L158-L160src/services/messageStreaming/callbacks/baseCallbacks.tssrc/services/messageStreaming/callbacks/baseCallbacks.ts#L217

流式输出架构重构

Before

throttledBlockUpdate(blockId, changes) 

Now

streamingService.updateBlock(blockId, changes)
streamingService.finalize(messageId, status) 

Question

  • SQLite 里消息内容存在 message.data 这个 JSON 字段里,不是单独 message_blocks 表,见 src/main/
    data/db/schemas/message.ts:16。

这个是要之后改吧,还是这个PR直接改

一个直接后果

  • 如果应用在流式中途崩掉,SQLite 里通常会留下:
    • 用户消息
    • 一条 pending 的 assistant 记录
  • 但中途已经显示出来的那部分 assistant 文本,可能还没进 SQLite,因为还没走 finalize()。

这个是不考虑的吗

Deepseek的diff总结

根据你提供的 git diff 文件内容,这个 PR 涉及一次大规模的重构和优化,主要集中在消息流式处理架构的重构引入缓存服务以及UI 交互的优化

核心变更总结

本次重构的核心目标是将消息流式处理从实时数据库写入模式,改为内存缓存 + 最终持久化的两阶段提交模式,以提升性能和数据一致性。

主要变更点

1. 架构与核心服务重构

  • 引入 CacheService
    • 新增了一个内存缓存服务 (src/services/CacheService.ts),作为基础设施。
    • 它提供了基于 Map 的键值存储、TTL(生存时间)过期、自动垃圾回收以及订阅/通知机制
    • 这为 StreamingService 提供了内存存储支持,并允许UI组件实时响应缓存变化。
  • 引入 StreamingService
    • 这是一个全新的服务 (src/services/messageStreaming/StreamingService.ts),专门管理消息流式生成的生命周期。
    • 两阶段提交:流式过程中,消息和块数据仅存储在 CacheService 的内存中;流式结束时,通过 finalize() 方法一次性批量写入 SQLite 数据库。
    • 它维护了 blockIdmessageId 的映射,并提供了 subscribeToMessage 方法供UI层监听消息状态变化。

2. 业务逻辑层重构

  • 简化 MessagesService.ts
    • 删除了大量用于管理数据库批量更新的复杂代码(如 pendingBlockUpdatesflushTimer 等节流逻辑)。
    • 创建助手消息时,现在会调用 streamingService.startTask() 启动一个流式任务。
    • 简化了 fetchAndProcessAssistantResponseImplfetchTranslateThunk 函数,移除了对旧版数据库保存函数的依赖。
  • 精简 BlockManager
    • BlockManager 的功能被简化,不再直接操作数据库或 Redux。
    • 它现在主要负责跟踪当前活跃的块类型 (_activeBlockInfo),并通过依赖注入的 throttledBlockUpdatecancelThrottledBlockUpdate 方法,间接调用 streamingService 来更新内存中的块数据。
    • 移除了之前所有直接调用 messageBlockDatabase 的代码。

3. 回调处理器更新

  • 所有回调模块(baseCallbackstextCallbacksthinkingCallbacks 等)
    • 移除了对旧版 saveUpdatesToDB 和数据库直接操作的依赖。
    • 全部改为通过 streamingServiceupdateBlockupdateMessageaddBlock 等方法来更新内存状态。
    • onErroronComplete 等生命周期回调现在通过调用 streamingService.finalize() 来触发最终的数据库持久化。

4. UI/数据层适配

  • 新增 useCache Hook
    • 提供了一个 React Hook (src/hooks/useCache.ts),用于让UI组件订阅 CacheService 中特定键的变化,实现实时更新。
  • 优化 useMessagesuseMessageBlocks
    • 这两个 Hook 现在不仅从 SQLite(通过 useLiveQuery)读取数据,还会通过 streamingServicecacheService 订阅流式消息的变化。
    • 它们会合并数据库中的持久化数据与内存缓存中的流式数据,确保UI能实时展示流式内容,同时在流式结束后无缝过渡到数据库数据。
  • 消息渲染优化
    • MessageBlockRenderer 组件现在会检查消息是否正在流式处理。如果是,则优先从 streamingService 获取最新的块数据并渲染,从而实现了实时更新。

5. 其他小优化

  • 日志级别调整:将部分 logger.infologger.silly 调整为 logger.debug,减少了生产环境的日志输出。
  • 代码清理:移除了大量的注释代码和不再使用的导入。
  • UI微调:调整了部分 UI 组件的样式和性能(如移除了不必要的 memo,调整了 recycleItems 行为)。

总结

这是一个基础性、高价值的重构 PR。它将消息流式处理的架构从“实时写库”模式转变为“内存缓存 + 最终一致性”模式。这带来了以下好处:

  1. 性能提升:减少了流式过程中的高频数据库写入操作,降低了 I/O 压力。
  2. 数据一致性finalize 机制确保了数据要么全部成功写入,要么失败回滚(或由 TTL 清理),避免了部分数据落库导致的脏数据问题。
  3. 架构清晰:引入了 CacheService 作为专门的内存管理层,StreamingService 作为业务编排层,职责划分更明确。
  4. 实时性更好:UI 通过订阅机制直接响应内存中的变化,流式渲染更加流畅。
graph TB
    subgraph Input[输入层]
        A[流式Chunk] --> B[BlockManager]
    end

    subgraph Memory[内存层]
        B --> C[StreamingService]
        C --> D[CacheService<br/>TTL+订阅]
    end

    subgraph Storage[存储层]
        C -->|finalize| E[(SQLite)]
    end

    subgraph UI[UI层]
        D -.->|订阅| F[useMessages]
        D -.->|订阅| G[useMessageBlocks]
        E -->|useLiveQuery| F
        E -->|useLiveQuery| G
        F --> H[实时渲染]
        G --> H
    end

    style Memory fill:#e1f5ff
    style Storage fill:#fff4e1
    
Loading

…esService

Other files include fully copying the V2 design and making some necessary fixes
…e subscription mechanism

- Add cache service subscription feature to support real-time monitoring of message and block changes
- Refactor message components by removing React.memo optimization, relying instead on streaming updates
- Implement real-time merging and display of messages and blocks, supporting synchronization between database and streaming data
- Add message state change listeners to ensure UI responds promptly to streaming updates
- Optimize message grouping and rendering logic to improve streaming message processing performance
When the thinking details page is visible, subscribe to the message stream service to update the thinking block status in real time. Remove unnecessary global state comments.
@eeee0717
Copy link
Copy Markdown
Collaborator

还是有点问题,这个里面东西有点复杂,还是我后面找时间弄吧

@chengcheng84
Copy link
Copy Markdown
Contributor Author

OK

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants