Appearance
SSE 流式响应实战:基于 WebFlux + Reactor 的大模型流式聊天完整实现
作者: 必码 | bima.cc
前言
在当今大语言模型(LLM)全面普及的技术浪潮中,流式响应已经成为AI聊天应用的标准交互方式。当用户向ChatGPT、Claude或本地部署的Ollama发送一条消息时,看到的不再是"等待中..."的空白页面,而是逐字逐句涌现的智能回复——这种"打字机"般的效果,正是通过SSE(Server-Sent Events)流式响应技术实现的。
对于Java后端开发者而言,实现流式响应面临着一系列技术挑战:传统的Spring MVC基于Servlet容器,采用"请求-响应"的同步阻塞模型,天然不适合处理长时间的流式数据推送;而Spring WebFlux基于Reactor和Netty的响应式编程模型,则为流式场景提供了优雅的解决方案。
本文基于 smart-scaffold-springboot 和 smart-scaffold-dubbo 两个实际开源项目中的AI聊天模块,从协议原理、框架基础、请求发送、响应解析、协议适配、事件构建、前端对接、异常处理到性能优化,完整地解析一条流式聊天消息从发送到渲染的全链路实现。文章中的所有代码示例均来自真实项目,并经过教学化简化处理,确保读者既能理解核心原理,又能直接应用于生产环境。
本文适合的读者群体:
- 具备Java基础,希望掌握响应式编程的进阶开发者
- 正在或即将接入大模型API的后端工程师
- 需要实现SSE流式推送功能的技术架构师
- 对Spring WebFlux和Project Reactor感兴趣的学习者
阅读本文前,建议读者具备以下知识储备:
- Java基础及Spring Boot开发经验
- HTTP协议基本概念
- JSON数据格式
- Maven/Gradle构建工具使用经验
一、SSE协议原理深度解析
1.1 Server-Sent Events规范概述
Server-Sent Events(SSE)是一种基于HTTP协议的服务器推送技术,最早由W3C在HTML5规范中提出,目前已被W3C正式标准化为独立的推荐规范。其核心思想非常简洁:客户端与服务器建立一条持久的HTTP连接,服务器通过这条连接持续向客户端推送数据,直到服务器主动关闭连接或客户端断开。
SSE的设计哲学可以概括为以下几个核心原则:
第一,协议极简。 SSE完全基于HTTP协议,不需要额外的握手过程或协议升级。客户端发送一个普通的HTTP请求,服务器返回的响应头中设置 Content-Type: text/event-stream,此后服务器就可以持续向客户端发送事件数据。这种设计使得SSE几乎可以在任何HTTP基础设施上运行,包括代理服务器、负载均衡器和CDN。
第二,单向通信。 与WebSocket的双向全双工通信不同,SSE是严格的服务器到客户端的单向通道。这个看似"受限"的设计,在AI聊天、实时通知、股票行情推送等场景中恰恰是最合适的——因为这些场景的核心需求就是服务器向客户端推送数据。
第三,自动重连。 SSE规范内置了断线重连机制。当连接意外断开时,浏览器原生支持的EventSource API会自动尝试重新建立连接,并且可以通过 Last-Event-ID 请求头恢复到断开前的位置继续接收数据。
第四,文本协议。 SSE传输的数据是纯文本格式,每个事件由若干个字段组成,字段之间以换行符分隔。这种设计使得SSE数据易于调试和监控,同时也便于与各种日志系统和消息中间件集成。
从协议演进的历史视角来看,SSE的出现填补了HTTP协议在服务器推送领域的一个重要空白。在SSE出现之前,实现服务器推送通常需要借助以下几种方案:
- 轮询(Polling): 客户端定时向服务器发送请求询问是否有新数据。这种方式简单但效率极低,大量无效请求浪费带宽和服务器资源。
- 长轮询(Long Polling): 客户端发送请求后,服务器保持连接不立即返回,直到有新数据或超时才响应。这种方式减少了无效请求,但每次响应后都需要重新建立连接。
- WebSocket: 提供全双工通信能力,但协议复杂度高,需要专门的WebSocket服务器支持,且在某些网络环境下可能被防火墙拦截。
SSE在简洁性和功能性之间取得了精妙的平衡:它比轮询和长轮询更高效,比WebSocket更简单,且天然兼容HTTP生态。
1.2 SSE与WebSocket的全面对比
在实际项目选型中,SSE和WebSocket是最常被拿来比较的两种实时通信技术。为了帮助读者做出正确的技术决策,我们从多个维度进行全面的对比分析。
通信方向
| 维度 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 单向(服务器→客户端) | 双向(全双工) |
| 服务器推送 | 原生支持 | 支持 |
| 客户端发送 | 不支持(需额外HTTP请求) | 支持 |
| 适用场景 | 通知推送、流式响应 | 聊天室、协同编辑 |
SSE的单向通信特性在大模型流式聊天场景中完全够用。用户发送消息是一次普通的HTTP POST请求,而模型的流式回复则是通过SSE通道推送。两者使用不同的HTTP连接,职责分明。
协议复杂度
| 维度 | SSE | WebSocket |
|---|---|---|
| 协议基础 | HTTP/1.1 | 独立协议(ws://、wss://) |
| 握手过程 | 无需额外握手 | 需要Upgrade握手 |
| 数据格式 | 纯文本 | 文本或二进制 |
| 连接管理 | 自动重连 | 需手动实现 |
| 浏览器API | EventSource | WebSocket |
SSE的协议复杂度远低于WebSocket。SSE不需要协议升级(Upgrade),不需要处理帧(Frame),不需要维护心跳(Ping/Pong)。在smart-scaffold项目中,我们选择SSE作为流式响应方案,正是看中了它的简洁性——更少的代码意味着更少的Bug,更简单的运维意味着更高的可靠性。
浏览器兼容性
| 维度 | SSE | WebSocket |
|---|---|---|
| 主流浏览器 | 全部支持(IE除外) | 全部支持 |
| 移动端浏览器 | iOS Safari 7+、Android Chrome | 全部支持 |
| 代理兼容性 | 完全兼容HTTP代理 | 部分代理不支持 |
| CDN兼容性 | 完全兼容 | 通常不兼容 |
SSE基于标准HTTP协议,因此可以无缝穿越各种HTTP代理、负载均衡器和CDN节点。WebSocket由于使用独立的协议,在某些企业网络环境中可能被防火墙拦截。对于需要通过公网提供服务的AI聊天应用来说,SSE的网络兼容性是一个显著优势。
自动重连
SSE规范内置了自动重连机制,这是它相对于WebSocket的一个重要优势。当SSE连接断开时,浏览器原生的EventSource API会自动尝试重新建立连接,默认重连间隔为3秒。开发者还可以通过服务器发送 retry: 字段来动态调整重连间隔。
WebSocket没有内置的重连机制,开发者需要手动实现心跳检测和断线重连逻辑。虽然社区中有许多WebSocket重连库(如reconnecting-websocket),但这增加了额外的依赖和复杂度。
选型建议
基于以上对比,我们给出以下选型建议:
- 选择SSE的场景: AI流式聊天、实时通知推送、股票行情、日志流、服务器状态监控等以服务器推送为主的场景。
- 选择WebSocket的场景: 多人在线聊天室、协同文档编辑、实时游戏、远程桌面等需要双向实时通信的场景。
- 两者结合的场景: 在一个应用中同时使用SSE处理服务器推送和WebSocket处理双向通信,各取所长。
在smart-scaffold项目中,AI流式聊天模块使用SSE,而如果未来需要实现多人实时对话功能,可以考虑引入WebSocket。两者并不冲突,可以在同一个Spring Boot应用中共存。
1.3 HTTP长连接
SSE流式响应的底层基础是HTTP长连接(HTTP Persistent Connection,也称为HTTP Keep-Alive)。理解HTTP长连接的工作原理,对于掌握SSE的性能特征和调优方向至关重要。
HTTP/1.1的持久连接
在HTTP/1.0时代,每个HTTP请求都需要建立一个新的TCP连接,请求完成后连接立即关闭。这种方式在频繁请求的场景下效率极低,因为TCP连接的建立(三次握手)和释放(四次挥手)都需要消耗时间和网络资源。
HTTP/1.1默认启用了持久连接(Keep-Alive),允许在一个TCP连接上发送多个HTTP请求和响应。SSE正是利用了这一特性:客户端发送一个HTTP请求后,服务器不立即关闭连接,而是保持连接打开,持续发送数据。
在SSE场景中,HTTP长连接的生命周期如下:
客户端 服务器
| |
|--- POST /ai/chat/stream HTTP/1.1 ------>| (1) 建立TCP连接
| Accept: text/event-stream |
| |
|<-- HTTP/1.1 200 OK --------------------| (2) 返回响应头
| Content-Type: text/event-stream |
| Cache-Control: no-cache |
| Connection: keep-alive |
| |
|<-- data: {"content":"你"} -------------| (3) 持续推送数据
| |
|<-- data: {"content":"好"} -------------|
| |
|<-- data: {"content":"!"} -------------|
| |
|<-- data: [DONE] -----------------------| (4) 流结束,关闭连接
| |HTTP/2的多路复用
HTTP/2引入了多路复用(Multiplexing)机制,允许在同一个TCP连接上同时发送多个HTTP请求和响应。这对SSE场景有重要的优化意义:
- 多个SSE流可以共享同一个TCP连接,减少了TCP连接数量。
- HTTP/2的头部压缩(HPACK)减少了每个请求的头部开销。
- HTTP/2的流优先级(Stream Priority)机制允许为SSE流分配更高的优先级。
在Spring Boot应用中,如果使用Netty作为服务器(WebFlux的默认选择),HTTP/2的支持是开箱即用的。只需要在配置中启用即可:
yaml
# application.yml - 教学简化版
server:
http2:
enabled: true连接保活策略
在SSE长连接中,如果服务器长时间不发送数据,中间的代理服务器或防火墙可能会认为连接已超时并主动断开连接。为了避免这种情况,需要实现连接保活策略:
策略一:发送注释事件。 SSE规范允许发送以冒号开头的注释行,这些注释不会被EventSource API解析为事件,但可以保持连接活跃。
: keepalive ping策略二:发送心跳事件。 服务器定期发送一个特殊的心跳事件,客户端收到后忽略即可。
event: heartbeat
data: {"type":"ping"}策略三:设置合理的超时时间。 在Nginx等反向代理中配置较长的超时时间,确保SSE连接不会被代理层提前关闭。
nginx
# nginx.conf - SSE连接超时配置
location /ai/chat/stream {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 300s; # 5分钟超时
}在smart-scaffold项目的实际部署中,我们综合使用了以上三种策略,确保SSE连接在各种网络环境下都能稳定运行。
1.4 text/event-stream MIME类型
text/event-stream 是SSE规范定义的标准MIME类型,它告诉客户端和中间件:这个HTTP响应不是一个普通的文档,而是一个持续的事件流。
响应头配置
一个标准的SSE响应头应该包含以下字段:
http
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no各字段的含义和作用如下:
- Content-Type: text/event-stream:标识响应内容类型为SSE事件流。浏览器看到这个MIME类型后,如果请求是通过
fetch或XMLHttpRequest发起的,会知道需要逐步读取响应体。 - Cache-Control: no-cache:禁止缓存SSE响应。这是必须的,因为SSE是实时数据流,任何缓存都会导致数据延迟或丢失。
- Connection: keep-alive:保持TCP连接不关闭。在HTTP/1.1中这是默认行为,但显式声明可以确保中间代理不会提前关闭连接。
- X-Accel-Buffering: no:这是Nginx特有的响应头,用于禁用Nginx的响应缓冲。如果不设置这个头,Nginx会等到缓冲区满或响应结束后才发送数据,导致SSE的实时性完全丧失。
在Spring WebFlux中,设置SSE响应头非常简单,只需要在Controller方法的 produces 属性中指定即可:
java
@PostMapping(value = "/chat/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public Flux<ServerSentEvent<String>> chatStream(
@RequestBody Map<String, Object> requestBody) {
// ...
}Spring框架会自动设置 Content-Type: text/event-stream 和 Cache-Control: no-cache 响应头。但 X-Accel-Buffering: no 需要手动添加,在smart-scaffold项目中,我们在WebClient构建时添加了这个头:
java
// 来自 smart-scaffold-springboot ChatClientFactory.java
webClientBuilder.defaultHeader("X-Accel-Buffering", "no");
webClientBuilder.defaultHeader("Cache-Control", "no-cache");
webClientBuilder.defaultHeader("Connection", "keep-alive");MIME类型与内容编码
text/event-stream 的字符集默认为UTF-8。SSE规范要求数据必须使用UTF-8编码,这意味着SSE天然支持中文、日文、韩文等多字节字符,非常适合国际化应用。
需要注意的是,SSE不支持内容压缩(Content-Encoding: gzip)。因为压缩需要缓冲一定量的数据才能达到较好的压缩比,这与SSE的实时推送特性相矛盾。如果中间代理对SSE响应启用了压缩,会导致数据延迟甚至连接超时。
1.5 SSE事件格式详解
SSE事件流由一系列文本行组成,每行代表一个字段。事件之间以空行(两个连续的换行符)分隔。SSE规范定义了四种标准字段:data:、event:、id: 和 retry:。
data字段
data: 字段携带事件的实际数据。一个事件可以包含多个 data: 字段,它们会被合并为一个数据块,字段之间用换行符连接。
data: 第一行数据
data: 第二行数据客户端收到上述事件后,event.data 的值为 "第一行数据\n第二行数据"。
如果 data: 后面没有内容,则表示一个空数据事件:
data:在实际的大模型流式聊天中,data: 字段通常携带JSON格式的数据:
data: {"type":"chunk","content":"你"}
data: {"type":"chunk","content":"好"}
data: {"type":"done"}event字段
event: 字段用于指定事件类型。如果不指定,默认类型为 "message"。
event: message
data: {"type":"chunk","content":"Hello"}
event: done
data: {"type":"done"}
event: error
data: {"type":"error","error":"模型服务不可用"}在前端,不同类型的事件可以通过不同的回调函数处理:
javascript
const eventSource = new EventSource('/ai/chat/stream');
eventSource.addEventListener('message', (event) => {
// 处理消息事件
const data = JSON.parse(event.data);
appendContent(data.content);
});
eventSource.addEventListener('done', (event) => {
// 处理完成事件
finishStreaming();
});
eventSource.addEventListener('error', (event) => {
// 处理错误事件
showError(event.data);
});在smart-scaffold项目中,我们使用了自定义的事件类型来区分消息块、完成信号和错误信息:
java
// 来自 smart-scaffold-dubbo consumer AIController.java
return ServerSentEvent.<String>builder()
.event("message")
.data("{\"type\":\"chunk\",\"content\":\"" + escapeJson(chunk) + "\"}")
.build();
// 完成事件
return ServerSentEvent.<String>builder()
.event("done")
.data("{\"type\":\"done\"}")
.build();
// 错误事件
return ServerSentEvent.<String>builder()
.event("error")
.data(errorJson)
.build();id字段
id: 字段用于为事件分配一个唯一标识符。浏览器会在内部记录最后一次收到的事件ID,当连接断开重连时,会通过 Last-Event-ID 请求头将这个ID发送给服务器,服务器可以据此从断开的位置继续发送数据。
id: 12345
event: message
data: {"content":"Hello"}
id: 12346
event: message
data: {"content":"World"}在AI聊天场景中,id 字段可以用于实现消息的断点续传。例如,当网络中断后重连,服务器可以根据 Last-Event-ID 找到上次发送到的位置,从下一个token开始继续推送,避免重复生成内容。
retry字段
retry: 字段用于告诉客户端在连接断开后,等待多少毫秒再尝试重连。这个值是一个整数,单位为毫秒。
retry: 5000
event: message
data: {"content":"Hello"}上述事件告诉客户端:如果连接断开,请等待5秒后再尝试重连。默认的重连间隔为3秒。
在smart-scaffold项目中,我们通过Spring的 ServerSentEvent.builder() 来设置这些字段,这将在第七章详细讲解。
完整事件示例
以下是一个完整的SSE事件流示例,模拟了大模型流式聊天的全过程:
retry: 3000
event: message
data: {"type":"chunk","content":"你"}
id: 1
event: message
data: {"type":"chunk","content":"好"}
id: 2
event: message
data: {"type":"chunk","content":","}
id: 3
event: message
data: {"type":"chunk","content":"我"}
id: 4
event: message
data: {"type":"chunk","content":"是"}
id: 5
event: message
data: {"type":"chunk","content":"AI"}
id: 6
event: message
data: {"type":"chunk","content":"助手"}
id: 7
event: done
data: {"type":"done"}
id: 8二、Spring WebFlux基础
2.1 spring-boot-starter-webflux引入与配置
Spring WebFlux是Spring Framework 5.0引入的全新响应式Web框架,它基于Project Reactor和Reactive Streams规范,为构建异步非阻塞的Web应用提供了完整的编程模型。
依赖引入
在Spring Boot项目中引入WebFlux非常简单,只需要添加一个starter依赖:
xml
<!-- pom.xml - 教学简化版 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>重要提示: spring-boot-starter-webflux 和 spring-boot-starter-web 不应该同时存在。如果两者同时存在,Spring Boot默认使用Servlet容器(Tomcat),WebFlux的响应式特性将无法生效。在smart-scaffold项目中,我们通过Dubbo consumer模块引入WebFlux依赖,而provider模块保持传统的Spring MVC,两者通过Dubbo RPC进行通信。
自动配置原理
Spring Boot对WebFlux的自动配置类是 WebFluxAutoConfiguration,它会自动完成以下工作:
- 配置Netty作为默认的嵌入式服务器(如果没有检测到Servlet容器)
- 注册
WebClient.BuilderBean - 配置
WebFluxConfigurer用于自定义WebFlux行为 - 设置
ReactiveAdapterRegistry用于适配各种响应式类型
核心配置项
yaml
# application.yml - WebFlux核心配置(教学简化版)
server:
port: 8080
netty:
connection-timeout: 60s # 连接超时时间
idle-timeout: 120s # 空闲超时时间
max-keep-alive-requests: 1000 # 最大Keep-Alive请求数
http2:
enabled: true # 启用HTTP/2
spring:
codec:
max-in-memory-size: 10MB # 最大内存缓冲区大小
webflux:
static-path-pattern: /static/** # 静态资源路径在smart-scaffold项目的Dubbo consumer模块中,WebFlux的配置如下:
yaml
# 来自 smart-scaffold-dubbo consumer application.yml
server:
port: 8080
dubbo:
registry:
address: zookeeper://localhost:2181consumer模块使用WebFlux提供SSE流式接口,同时通过Dubbo RPC调用provider模块的AI服务。这种架构设计将响应式接口层与业务逻辑层解耦,各层可以独立演进。
WebFlux与WebMVC的共存
在某些场景下,可能需要在同一个应用中同时使用WebFlux和WebMVC。Spring Boot支持这种混合模式,但需要注意以下限制:
- WebMVC的Controller返回的
Flux或Mono不会自动以响应式方式处理 - WebFlux的Controller不能使用
HttpServletRequest等Servlet API - 两者共享同一个ApplicationContext,但使用不同的Web服务器
在smart-scaffold项目中,我们采用了更清晰的架构:consumer模块使用WebFlux处理外部HTTP请求(包括SSE),provider模块使用WebMVC处理内部业务逻辑。两者通过Dubbo RPC通信,互不干扰。
2.2 Reactive编程模型
响应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。在传统的命令式编程中,开发者通过逐步执行语句来完成任务;而在响应式编程中,开发者定义数据流的处理管道,数据在管道中自动流动,每个处理步骤都是非阻塞的。
核心概念
响应式编程的核心概念包括:
数据流(Data Stream): 数据以序列的形式流动,类似于Java 8的Stream,但关键区别在于响应式流是异步的,且可以在数据到达时立即处理,而不需要等待所有数据就绪。
操作符(Operator): 对数据流进行转换、过滤、组合等操作。Reactor提供了丰富的操作符,如 map、filter、flatMap、concatWith、onErrorResume 等。
背压(Backpressure): 当数据生产者的速度超过消费者的处理能力时,消费者可以通过背压机制通知生产者降低发送速度。这是响应式编程区别于传统回调模式的关键特性。
调度器(Scheduler): 控制操作在哪个线程上执行。Reactor提供了多种调度器,如 Schedulers.boundedElastic()(适合I/O密集型操作)、Schedulers.parallel()(适合CPU密集型操作)等。
响应式编程 vs 传统编程
为了更直观地理解响应式编程的优势,我们通过一个简单的例子来对比传统编程和响应式编程的差异。
传统命令式编程(基于Servlet):
java
// 传统Servlet方式 - 同步阻塞
@PostMapping("/chat")
public String chat(@RequestBody ChatRequest request) {
// 1. 线程阻塞等待HTTP请求到AI服务完成
String response = httpClient.post(aiServiceUrl)
.body(request)
.retrieve()
.bodyToMono(String.class)
.block(); // 阻塞当前线程
// 2. 线程阻塞等待数据库写入完成
chatRepository.save(new ChatRecord(request, response));
return response;
}在这种模式下,每个请求都需要占用一个线程。如果AI服务的响应需要30秒,那么这个线程就会被阻塞30秒。在高并发场景下,线程池很快就会被耗尽,导致新请求被拒绝。
响应式编程(基于WebFlux):
java
// WebFlux方式 - 异步非阻塞
@PostMapping("/chat/stream")
public Flux<ServerSentEvent<String>> chatStream(
@RequestBody ChatRequest request) {
// 1. 异步发送请求到AI服务,不阻塞线程
Flux<String> responseFlux = webClient.post(aiServiceUrl)
.bodyValue(request)
.retrieve()
.bodyToFlux(String.class);
// 2. 异步保存聊天记录(不等待完成)
responseFlux.doOnComplete(() ->
chatRepository.save(new ChatRecord(request, accumulatedResponse))
);
// 3. 返回响应式流,由框架异步推送
return responseFlux.map(content ->
ServerSentEvent.<String>builder()
.event("message")
.data(content)
.build()
);
}在这种模式下,线程在等待AI服务响应时不会被阻塞,而是被释放去处理其他请求。当AI服务的数据到达时,Reactor框架会在适当的线程上继续处理。这意味着少量线程就可以处理大量并发请求。
Reactive Streams规范
Project Reactor实现了Reactive Streams规范,该规范定义了四个核心接口:
- Publisher: 数据发布者,负责产生数据。
Mono和Flux都是Publisher的实现。 - Subscriber: 数据订阅者,负责消费数据。Spring WebFlux的响应式Controller就是隐式的订阅者。
- Subscription: 订阅关系,用于控制数据请求量和取消订阅。
- Processor: 既是发布者又是订阅者,用于转换数据流。
Reactive Streams规范确保了不同响应式库之间的互操作性。例如,Reactor的 Flux 可以与RxJava的 Observable 互相转换。
2.3 Mono vs Flux深入理解
在Project Reactor中,Mono 和 Flux 是两个最核心的类型,它们分别表示包含0或1个元素的数据流和包含0到N个元素的数据流。
Mono:0或1个元素
Mono<T> 表示一个异步的数据容器,它最终会产生0个或1个 T 类型的值,或者产生一个错误。可以将其理解为 Optional<T> 的异步版本。
java
// 创建Mono的几种方式
Mono.empty() // 不产生任何值
Mono.just("Hello") // 产生一个值
Mono.fromCallable(() -> queryDb()) // 从回调创建
Mono.fromFuture(completableFuture) // 从Future创建
Mono.delay(Duration.ofSeconds(1)) // 延迟1秒后完成
// Mono的典型操作
Mono.just("Hello")
.map(String::toUpperCase) // 转换:HELLO
.flatMap(s -> Mono.just(s + "!")) // 扁平化映射:HELLO!
.defaultIfEmpty("default") // 空值默认值
.timeout(Duration.ofSeconds(5)) // 超时控制
.subscribe(System.out::println); // 订阅并消费在AI聊天场景中,Mono 适用于非流式的请求-响应模式。例如,发送一条消息并等待完整的响应:
java
// 非流式聊天 - 返回Mono
public Mono<String> chat(ChatRequest request) {
return webClient.post()
.uri("/v1/chat/completions")
.bodyValue(request)
.retrieve()
.bodyToMono(String.class)
.timeout(Duration.ofSeconds(60));
}Flux:0到N个元素
Flux<T> 表示一个异步的数据流,它可以产生0到N个 T 类型的值,以 onComplete 信号结束,或者以 onError 信号终止。可以将其理解为 Stream<T> 的异步版本。
java
// 创建Flux的几种方式
Flux.empty() // 不产生任何值
Flux.just("A", "B", "C") // 产生3个值
Flux.fromIterable(list) // 从集合创建
Flux.interval(Duration.ofMillis(100)) // 每100ms产生一个递增数字
Flux.range(1, 10) // 产生1到10的整数
// Flux的典型操作
Flux.just("Hello", "World")
.map(String::toUpperCase) // 转换
.filter(s -> s.length() > 3) // 过滤
.buffer(2) // 缓冲:[[HELLO], [WORLD]]
.flatMap(Flux::fromIterable) // 扁平化
.take(5) // 只取前5个
.subscribe(System.out::println); // 订阅并消费在AI流式聊天场景中,Flux 是核心类型。模型的响应被拆分为多个token,每个token作为一个元素在 Flux 中流动:
java
// 流式聊天 - 返回Flux
public Flux<String> chatStream(ChatRequest request) {
return webClient.post()
.uri("/v1/chat/completions")
.bodyValue(request)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.map(this::extractContent) // 提取content字段
.filter(content -> !content.isEmpty()); // 过滤空内容
}Mono与Flux的转换
在实际开发中,经常需要在 Mono 和 Flux 之间进行转换:
java
// Mono -> Flux
Mono.just("Hello").flux(); // Flux包含一个元素
Mono.just(list).flatMapMany(Flux::fromIterable); // 展开列表
// Flux -> Mono
Flux.just("A", "B", "C").collectList(); // Mono<List<String>>
Flux.just("A", "B", "C").next(); // Mono<String>,取第一个
Flux.just("A", "B", "C").last(); // Mono<String>,取最后一个
Flux.just("A", "B", "C").count(); // Mono<Long>,计数
Flux.just("A", "B", "C").reduce((a, b) -> a + b); // Mono<String>,聚合在smart-scaffold项目中,chatStream 方法返回 Flux<String>,而在流结束时,我们使用 concatWith 追加一个 Mono 来发送完成事件:
java
// 来自 smart-scaffold-dubbo consumer AIController.java
return webClient.post()
.uri("/api/ai/chat/stream")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.TEXT_EVENT_STREAM)
.body(BodyInserters.fromValue(requestBody))
.retrieve()
.bodyToFlux(String.class)
.map(content -> {
// 解析JSON,提取content
// ...
return ServerSentEvent.<String>builder()
.event("message")
.data(jsonString)
.build();
})
// Flux + Mono 的拼接:在流结束后追加一个done事件
.concatWith(Mono.just(
ServerSentEvent.<String>builder()
.event("done")
.data("{\"type\":\"done\"}")
.build()
));常用操作符详解
以下是AI流式聊天场景中最常用的Reactor操作符:
| 操作符 | 作用 | 示例 |
|---|---|---|
map | 同步转换每个元素 | .map(s -> s.toUpperCase()) |
flatMap | 异步转换每个元素 | .flatMap(s -> fetchDetail(s)) |
filter | 过滤元素 | .filter(s -> !s.isEmpty()) |
concatWith | 拼接另一个流 | .concatWith(Mono.just("done")) |
onErrorResume | 错误恢复 | .onErrorResume(e -> fallback()) |
timeout | 超时控制 | .timeout(Duration.ofSeconds(60)) |
doOnError | 错误日志 | .doOnError(e -> log.error(e)) |
doOnNext | 每个元素的处理钩子 | .doOnNext(s -> log.info(s)) |
take | 只取前N个元素 | .take(1000) |
buffer | 将元素分组 | .buffer(10) |
2.4 WebClient替代RestTemplate
Spring WebFlux提供了 WebClient 作为 RestTemplate 的替代品。WebClient 是一个非阻塞的HTTP客户端,完全基于Reactor构建,天然支持响应式流。
为什么需要替代RestTemplate
RestTemplate 是Spring框架中经典的HTTP客户端,它使用同步阻塞的I/O模型。在调用远程AI服务时,RestTemplate 会阻塞当前线程直到响应返回。对于流式响应场景,RestTemplate 完全无法胜任——它只能等待整个响应体接收完毕后才能处理,无法实现逐块推送。
WebClient 的优势在于:
- 非阻塞I/O: 基于Netty的Event Loop模型,少量线程即可处理大量并发连接。
- 流式支持: 原生支持
Flux<DataBuffer>和Flux<String>流式响应。 - 函数式API: 链式调用风格,代码更简洁、更易读。
- 响应式集成: 与Reactor无缝集成,支持背压、超时、重试等响应式特性。
WebClient基本用法
java
// 创建WebClient实例
WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:11434")
.defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer xxx")
.defaultHeader("X-Accel-Buffering", "no")
.build();
// 发送GET请求
Mono<String> response = webClient.get()
.uri("/api/tags")
.retrieve()
.bodyToMono(String.class);
// 发送POST请求(非流式)
Mono<String> response = webClient.post()
.uri("/v1/chat/completions")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(requestBody)
.retrieve()
.bodyToMono(String.class);
// 发送POST请求(流式)
Flux<String> responseFlux = webClient.post()
.uri("/v1/chat/completions")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.TEXT_EVENT_STREAM)
.bodyValue(requestBody)
.retrieve()
.bodyToFlux(String.class);WebClient配置最佳实践
在smart-scaffold项目中,WebClient 的配置遵循以下最佳实践:
java
// 教学简化版 - WebClient配置
@Configuration
class WebClientConfig {
@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(
HttpClient.create()
.responseTimeout(Duration.ofSeconds(300)) // 5分钟响应超时
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) // 10秒连接超时
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(300))
.addHandlerLast(new WriteTimeoutHandler(60))
)
));
}
}这个配置在consumer模块中已经存在:
java
// 来自 smart-scaffold-dubbo consumer AIController.java
@Configuration
class WebClientConfig {
@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
}实际项目中可以根据需要扩展这个基础配置,添加超时、连接池等高级配置。
2.5 背压(Backpressure)机制
背压是响应式编程中最重要也是最容易被忽视的概念。它解决了一个核心问题:当数据生产者(如AI模型服务)产生数据的速度超过数据消费者(如客户端网络)的处理能力时,如何避免系统崩溃?
背压的工作原理
在Reactive Streams规范中,背压通过 Subscription.request(n) 方法实现。消费者告诉生产者"我最多能处理n个元素",生产者发送n个元素后暂停,等待消费者的下一次请求。
消费者 生产者
| |
|--- request(10) ----------------->| "我可以处理10个元素"
| |
|<-- element 1 ---------------------|
|<-- element 2 ---------------------|
|<-- ... --------------------------|
|<-- element 10 --------------------|
| | 暂停发送,等待请求
| |
|--- request(5) ------------------>| "我再处理5个"
| |
|<-- element 11 --------------------|
|<-- ... --------------------------|
|<-- element 15 --------------------|WebClient中的背压
当使用 WebClient 接收SSE流式响应时,背压机制自动工作。bodyToFlux(String.class) 返回的 Flux 会根据下游消费者的处理速度自动调整从网络缓冲区读取数据的速度。
java
// WebClient自动处理背压
webClient.post()
.uri("/api/chat/stream")
.retrieve()
.bodyToFlux(String.class) // 内部使用request(n)控制读取速度
.map(this::parseContent)
.subscribe(content -> {
// 每处理完一个元素,Reactor自动请求下一个
appendToUI(content);
});背压策略
Reactor提供了几种内置的背压策略,可以通过 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等操作符来配置:
java
// 缓冲策略:将无法处理的数据缓存起来
flux.onBackpressureBuffer(1000); // 最多缓存1000个元素
// 丢弃策略:丢弃无法处理的数据
flux.onBackpressureDrop(); // 丢弃最新的数据
// 保留最新策略:只保留最新的数据
flux.onBackpressureLatest(); // 丢弃旧数据,保留最新
// 错误策略:当缓冲区满时抛出异常
flux.onBackpressureError();在AI流式聊天场景中,通常使用缓冲策略。因为每个token都是模型生成的完整语义单元,丢弃任何一个都会导致回复内容不完整。但需要注意设置合理的缓冲区大小,避免内存溢出。
实际场景中的背压考量
在smart-scaffold项目的实际部署中,背压主要在以下场景中发挥作用:
- 高并发场景: 当大量用户同时请求AI聊天时,服务器需要限制同时处理的流式连接数量,避免后端AI服务过载。
- 慢速客户端: 当客户端网络较慢时,服务器需要减慢数据推送速度,避免在服务器内存中积累过多未发送的数据。
- 大模型生成速度差异: 不同模型的生成速度差异很大(从每秒几个token到每秒上百个token),背压机制确保系统能够自适应不同的生成速度。
三、WebClient发送流式请求
3.1 WebClient配置与初始化
在smart-scaffold项目中,WebClient的配置分为两个层次:全局配置和请求级配置。
全局配置
全局配置在应用启动时创建,适用于所有通过该WebClient发送的请求:
java
// 教学简化版 - WebClient全局配置
@Configuration
public class WebClientConfig {
@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder()
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024) // 16MB缓冲区
);
}
}在consumer模块中,这个配置已经存在:
java
// 来自 smart-scaffold-dubbo consumer AIController.java
@Configuration
class WebClientConfig {
@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
}请求级配置
在每次发送请求时,根据目标服务的特性进行定制化配置:
java
// 来自 smart-scaffold-springboot ChatClientFactory.java(教学简化版)
public Flux<String> chatStream(Long userId, String message, Boolean isCustom) {
ModelPO config = modelService.createModelService(userId, isCustom);
// 请求级WebClient配置
WebClient.Builder webClientBuilder = WebClient.builder()
.baseUrl(config.getBaseUrl());
// 根据配置添加认证头
if (config.getApiKey() != null && !config.getApiKey().isEmpty()) {
webClientBuilder.defaultHeader(
HttpHeaders.AUTHORIZATION,
"Bearer " + config.getApiKey()
);
}
// 添加SSE相关的头信息
webClientBuilder.defaultHeader("X-Accel-Buffering", "no");
webClientBuilder.defaultHeader("Cache-Control", "no-cache");
webClientBuilder.defaultHeader("Connection", "keep-alive");
WebClient webClient = webClientBuilder.build();
// ...
}这里有几个关键的配置细节值得深入分析:
X-Accel-Buffering: no - 这个头信息用于禁用Nginx的响应缓冲。在生产环境中,SSE响应通常会经过Nginx反向代理。Nginx默认会对响应进行缓冲,等待缓冲区满或响应结束后才发送给客户端。对于SSE流式响应,这种行为会导致数据延迟甚至连接超时。设置 X-Accel-Buffering: no 后,Nginx会立即将收到的数据转发给客户端。
Cache-Control: no-cache - 禁止任何层级的缓存。SSE是实时数据流,任何缓存都会导致数据不一致。
Connection: keep-alive - 显式声明保持连接。虽然在HTTP/1.1中这是默认行为,但显式声明可以确保中间代理正确处理。
3.2 发送POST请求到Ollama/OpenAI
在AI流式聊天场景中,发送POST请求到模型服务是整个流程的起点。smart-scaffold项目需要支持多种模型服务,包括本地部署的Ollama、OpenAI官方API以及兼容OpenAI协议的第三方服务。
请求体构建
不同模型服务的请求体格式不同,ChatClientFactory 中的 buildRequestBody 方法负责根据API类型构建对应的请求体:
java
// 来自 smart-scaffold-springboot ChatClientFactory.java
private Map<String, Object> buildRequestBody(
ModelPO config, String model, String message, boolean stream) {
if (model == null) {
model = "gpt-3.5-turbo";
}
if (message == null) {
message = "";
}
// 根据API类型构建不同的请求体
if ("OLLAMA".equals(config.getApiType())) {
// OLLAMA API格式
return Map.of(
"model", model,
"prompt", message,
"stream", stream,
"options", Map.of(
"temperature",
config.getTemperature() != null
? config.getTemperature().doubleValue() : 0.7,
"max_tokens",
config.getMaxTokens() != null ? config.getMaxTokens() : 2000
)
);
} else {
// OpenAI兼容API格式
return Map.of(
"model", model,
"messages", List.of(
Map.of("role", "system",
"content", "你是一个智能助手,需要准确回答用户的问题。"),
Map.of("role", "user", "content", message)
),
"temperature",
config.getTemperature() != null
? config.getTemperature().doubleValue() : 0.7,
"max_tokens",
config.getMaxTokens() != null ? config.getMaxTokens() : 2000,
"stream", stream
);
}
}两种请求体格式的关键差异:
| 字段 | Ollama格式 | OpenAI格式 |
|---|---|---|
| 用户输入 | prompt | messages (数组) |
| 系统提示 | 无独立字段 | messages 中的 system 角色 |
| 模型参数 | options.temperature | temperature (顶层) |
| 流式控制 | stream: true | stream: true |
完整的请求发送流程
java
// 教学简化版 - 完整的流式请求发送
public Flux<String> chatStream(Long userId, String message, Boolean isCustom) {
// 1. 获取模型配置
ModelPO config = modelService.createModelService(userId, isCustom);
// 2. 构建WebClient
WebClient webClient = buildWebClient(config);
// 3. 构建请求体
Map<String, Object> requestBody = buildRequestBody(
config, config.getModelName(), message, true);
// 4. 发送流式请求
return webClient.post()
.uri(getApiPath(config.getApiType(), true))
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.TEXT_EVENT_STREAM)
.body(BodyInserters.fromValue(requestBody))
.retrieve()
.bodyToFlux(String.class)
.map(this::extractContent)
.filter(content -> content != null && !content.isEmpty());
}3.3 设置Stream参数
流式请求的核心是设置 stream: true 参数。这个参数告诉模型服务:不要等待生成完整响应后再返回,而是每生成一个token就立即发送。
Stream参数的作用
当 stream: true 时:
- Ollama: 每生成一个token,立即通过NDJSON格式发送一个JSON对象,包含
response字段和done标志。 - OpenAI: 每生成一个token,立即通过SSE格式发送一个事件,包含
choices[0].delta.content字段,最后发送data: [DONE]标记结束。
当 stream: false 时(默认值):
- 模型服务会等待生成完整响应后,一次性返回所有内容。
- 适用于不需要实时展示的场景,如后台批量处理。
在请求体中设置Stream参数
在smart-scaffold项目中,stream 参数通过 buildRequestBody 方法的最后一个参数传入:
java
// 流式请求
Map<String, Object> streamBody = buildRequestBody(config, model, message, true);
// 请求体中包含 "stream": true
// 非流式请求
Map<String, Object> normalBody = buildRequestBody(config, model, message, false);
// 请求体中包含 "stream": falseAccept头设置
除了在请求体中设置 stream: true,还需要在请求头中设置 Accept: text/event-stream,告诉服务器客户端期望接收SSE格式的响应:
java
// 来自 smart-scaffold-springboot ChatClientFactory.java
return webClient.post()
.uri(apiPath)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.TEXT_EVENT_STREAM) // 关键:声明接受SSE流
.body(BodyInserters.fromValue(requestBody))
.retrieve()
.bodyToFlux(String.class); // 以Flux形式接收响应accept(MediaType.TEXT_EVENT_STREAM) 和 bodyToFlux(String.class) 的组合是接收SSE流式响应的关键。前者告诉服务器客户端期望SSE格式,后者告诉WebClient以流式方式处理响应体。
3.4 接收Flux流式响应
当WebClient收到SSE响应后,通过 bodyToFlux(String.class) 将响应体转换为 Flux<String>。每个SSE事件的数据部分会被解析为一个String元素。
响应数据的处理管道
在smart-scaffold项目中,响应数据的处理遵循以下管道模式:
模型服务响应
|
v
bodyToFlux(String.class) -- 将SSE流解析为Flux<String>
|
v
map(line -> ...) -- 移除data:前缀,提取JSON
|
v
filter(line -> ...) -- 过滤空行和无效数据
|
v
map(json -> parseContent) -- 解析JSON,提取content字段
|
v
filter(content -> ...) -- 过滤空content
|
v
输出到前端具体实现
在springboot版本的 ChatClientFactory 中,数据处理管道的实现如下:
java
// 来自 smart-scaffold-springboot ChatClientFactory.java
return webClient.post()
.uri(apiPath)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.TEXT_EVENT_STREAM)
.body(BodyInserters.fromValue(requestBody))
.retrieve()
.bodyToFlux(String.class)
.map(line -> {
if (!line.trim().isEmpty()) {
// 提取JSON数据,移除所有可能的data:前缀
String jsonStr = line;
while (jsonStr.startsWith("data:")) {
jsonStr = jsonStr.substring("data:".length()).trim();
}
return jsonStr;
} else {
return null;
}
})
.filter(line -> line != null)
.filter(line -> {
// 过滤掉空响应
try {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> jsonMap = mapper.readValue(line, Map.class);
if (jsonMap.containsKey("response")) {
String response = (String) jsonMap.get("response");
return response != null && !response.isEmpty();
}
return true;
} catch (Exception e) {
return true;
}
});这段代码的处理逻辑值得仔细分析:
移除data:前缀: SSE规范中,每个事件的数据以
data:开头。WebClient在解析SSE时,可能将data:前缀保留在字符串中,因此需要手动移除。使用while循环确保移除所有可能的data:前缀(某些情况下可能存在多层前缀)。过滤空行: SSE事件之间用空行分隔,
bodyToFlux可能将空行作为元素发出,需要过滤掉。过滤空响应: Ollama在流结束时可能发送
{"response":""}这样的空响应,需要过滤掉以避免前端显示空白。
3.5 超时配置与连接管理
在流式聊天场景中,超时配置需要特别小心。大模型生成一个完整回复可能需要几十秒甚至几分钟,如果超时时间设置过短,会导致响应被截断。
多层超时配置
客户端浏览器
|
v (1) 客户端超时
Nginx反向代理
|
v (2) 代理超时
Spring WebFlux应用
|
v (3) WebClient读取超时
模型服务(Ollama/OpenAI)
|
v (4) 模型生成超时每一层都需要配置合理的超时时间,且上游的超时时间应该大于下游。
WebClient超时配置
java
// 教学简化版 - WebClient超时配置
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.time.Duration;
// 连接池配置
ConnectionProvider provider = ConnectionProvider.builder("ai-service")
.maxConnections(50) // 最大连接数
.maxIdleTime(Duration.ofSeconds(60)) // 最大空闲时间
.maxLifeTime(Duration.ofSeconds(300)) // 最大生命周期
.pendingAcquireTimeout(Duration.ofSeconds(30)) // 获取连接超时
.build();
// HTTP客户端配置
HttpClient httpClient = HttpClient.create(provider)
.responseTimeout(Duration.ofSeconds(300)) // 响应超时:5分钟
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) // 连接超时:10秒
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(300)) // 读取超时:5分钟
.addHandlerLast(new WriteTimeoutHandler(60)) // 写入超时:1分钟
);
// 创建WebClient
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();Reactor层面的超时控制
除了底层的HTTP超时,还可以在Reactor层面添加超时控制:
java
webClient.post()
.uri(apiPath)
.retrieve()
.bodyToFlux(String.class)
.timeout(Duration.ofSeconds(300)) // 整体超时
.fluxTimeout(Duration.ofSeconds(30), item -> {
// 每个元素的超时:如果30秒内没有收到新元素,则超时
log.warn("等待下一个token超时");
return Flux.error(new TimeoutException("模型生成超时"));
});timeout(Duration) 设置整个流的超时时间,而 fluxTimeout 设置相邻两个元素之间的超时时间。在AI流式聊天中,fluxTimeout 更有意义——如果模型在30秒内没有生成新的token,很可能出了问题,应该及时通知用户而不是无限等待。
四、Ollama流式响应解析
4.1 Ollama /api/generate 接口详解
Ollama是目前最流行的本地大模型运行框架之一,它提供了RESTful API来管理模型和进行推理。在smart-scaffold项目中,我们使用Ollama的 /api/generate 接口进行流式聊天。
接口规格
POST /api/generate
Content-Type: application/json请求体:
json
{
"model": "qwen2.5:7b-instruct-q4_k_m",
"prompt": "你好,请介绍一下自己",
"stream": true,
"options": {
"temperature": 0.7,
"max_tokens": 2000
}
}非流式响应(stream: false):
json
{
"model": "qwen2.5:7b-instruct-q4_k_m",
"response": "你好!我是一个AI助手...",
"done": true,
"context": [1, 2, 3, ...],
"total_duration": 5432000000,
"load_duration": 1234000000,
"prompt_eval_count": 15,
"prompt_eval_duration": 543000000,
"eval_count": 256,
"eval_duration": 3876000000
}流式响应(stream: true):
Ollama的流式响应使用NDJSON(Newline Delimited JSON)格式,每个JSON对象占一行:
{"model":"qwen2.5:7b-instruct-q4_k_m","response":"你","done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","response":"好","done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","response":"!","done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","response":"\n","done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","response":"我","done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","response":"是","done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","response":"AI","done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","response":"助手","done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","response":"","done":true,"context":[1,2,3,...],"total_duration":5432000000}Ollama /api/chat 接口
Ollama还提供了 /api/chat 接口,支持多轮对话。其请求体格式与OpenAI类似:
json
{
"model": "qwen2.5:7b-instruct-q4_k_m",
"messages": [
{"role": "system", "content": "你是一个智能助手"},
{"role": "user", "content": "你好"}
],
"stream": true
}流式响应格式使用 message.content 字段:
{"model":"qwen2.5:7b-instruct-q4_k_m","message":{"role":"assistant","content":"你"},"done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","message":{"role":"assistant","content":"好"},"done":false}
{"model":"qwen2.5:7b-instruct-q4_k_m","message":{"role":"assistant","content":"!"},"done":true}在smart-scaffold项目中,统一使用 /api/generate 接口,以确保响应格式的一致性。这在 getApiPath 方法中明确体现:
java
// 来自 smart-scaffold-springboot ChatClientFactory.java
private String getApiPath(String apiType, boolean stream) {
if ("OLLAMA".equals(apiType)) {
// 统一使用/api/generate端点,确保响应格式一致
return "/api/generate";
} else {
// 对于其他类型,使用OpenAI兼容接口
return "/v1/chat/completions";
}
}4.2 response格式解析
Ollama /api/generate 接口的流式响应中,每个JSON对象包含以下关键字段:
json
{
"model": "qwen2.5:7b-instruct-q4_k_m",
"response": "你",
"done": false
}| 字段 | 类型 | 说明 |
|---|---|---|
model | string | 使用的模型名称 |
response | string | 本次生成的文本片段 |
done | boolean | 是否生成完毕 |
context | number[] | 上下文向量(仅done为true时存在) |
total_duration | number | 总耗时(仅done为true时存在) |
eval_count | number | 生成的token数量(仅done为true时存在) |
在流式解析中,我们主要关注两个字段:
response: 包含本次生成的文本片段,可能是一个字、一个词或一个标点符号。done: 标识生成是否完成。当done为true时,表示模型已经完成了全部生成。
4.3 NDJSON解析策略
NDJSON(Newline Delimited JSON)是一种简单的数据格式,每行是一个独立的JSON对象,行之间以换行符(\n)分隔。Ollama的流式响应使用NDJSON格式。
NDJSON vs SSE
NDJSON和SSE都是流式数据格式,但有关键区别:
| 特性 | NDJSON | SSE |
|---|---|---|
| 分隔符 | 换行符 \n | 空行 \n\n |
| 字段前缀 | 无 | data: |
| 事件类型 | 无 | event: |
| 自动重连 | 无 | 有 |
| 浏览器API | 无原生支持 | EventSource |
Ollama的 /api/generate 接口返回的是NDJSON格式,但WebClient在设置 accept(MediaType.TEXT_EVENT_STREAM) 后,会自动将NDJSON行解析为SSE事件的data部分。
解析实现
在smart-scaffold项目中,NDJSON的解析通过WebClient的 bodyToFlux(String.class) 自动完成。WebClient会将响应体按行分割,每行作为一个String元素发出。然后通过 map 操作符提取JSON数据:
java
// 来自 smart-scaffold-springboot ChatClientFactory.java
.bodyToFlux(String.class)
.map(line -> {
if (!line.trim().isEmpty()) {
// 提取JSON数据,移除所有可能的data:前缀
String jsonStr = line;
while (jsonStr.startsWith("data:")) {
jsonStr = jsonStr.substring("data:".length()).trim();
}
return jsonStr;
} else {
return null;
}
})
.filter(line -> line != null)4.4 逐块提取content字段
从NDJSON行中提取 response 字段是Ollama流式解析的核心步骤。在consumer模块的AIController中,这一步通过JSON解析实现:
java
// 来自 smart-scaffold-dubbo consumer AIController.java(教学简化版)
Map<String, Object> json = mapper.readValue(cleanContent, Map.class);
if (json.containsKey("response")) {
chunk = (String) json.get("response");
}解析流程图
原始行: "data: {\"model\":\"qwen2.5\",\"response\":\"你\",\"done\":false}"
|
v 移除data:前缀
"{\"model\":\"qwen2.5\",\"response\":\"你\",\"done\":false}"
|
v JSON解析
Map {model: "qwen2.5", response: "你", done: false}
|
v 提取response字段
"你"
|
v 构建SSE事件
ServerSentEvent {event: "message", data: "{\"type\":\"chunk\",\"content\":\"你\"}"}边界情况处理
在实际解析中,需要注意以下边界情况:
- 空response: Ollama在流结束时可能发送
{"response":"","done":true},需要过滤掉空字符串。 - 换行符: response中可能包含
\n换行符,在构建SSE事件的data字段时需要正确转义。 - 特殊字符: response中可能包含引号、反斜杠等JSON特殊字符,需要正确转义。
- 多行response: 虽然罕见,但某些情况下response可能包含多行文本。
smart-scaffold项目中的 escapeJson 方法处理了特殊字符的转义:
java
// 来自 smart-scaffold-dubbo consumer AIController.java
private String escapeJson(String content) {
if (content == null) {
return "";
}
return content.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t");
}4.5 done标志判断结束
Ollama在流式响应的最后一条消息中设置 done: true,标志生成完成。在smart-scaffold项目中,done标志的处理方式取决于具体版本:
springboot版本: 通过过滤空response来间接处理done标志(done为true时response通常为空字符串):
java
// 来自 smart-scaffold-springboot ChatClientFactory.java
.filter(line -> {
try {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> jsonMap = mapper.readValue(line, Map.class);
if (jsonMap.containsKey("response")) {
String response = (String) jsonMap.get("response");
return response != null && !response.isEmpty();
}
return true;
} catch (Exception e) {
return true;
}
});dubbo consumer版本: 通过 concatWith 在流结束后追加一个显式的done事件:
java
// 来自 smart-scaffold-dubbo consumer AIController.java
.concatWith(Mono.just(
ServerSentEvent.<String>builder()
.event("done")
.data("{\"type\":\"done\"}")
.build()
));这种设计确保前端能够明确知道流已经结束,可以停止等待并执行后续操作(如显示"生成完毕"提示、启用输入框等)。
五、OpenAI流式响应解析
5.1 OpenAI /v1/chat/completions 接口
OpenAI的Chat Completions API是目前最广泛使用的大模型API接口,其流式响应遵循标准的SSE格式。
接口规格
POST /v1/chat/completions
Content-Type: application/json
Authorization: Bearer sk-xxx请求体:
json
{
"model": "gpt-4o",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"}
],
"temperature": 0.7,
"max_tokens": 2000,
"stream": true
}流式响应:
OpenAI使用标准的SSE格式,每个事件以 data: 开头,事件之间以空行分隔:
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1699000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1699000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1699000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1699000000,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]5.2 choices[0].delta.content格式
OpenAI流式响应的核心数据结构是 choices[0].delta,它包含本次增量生成的内容。
delta对象的结构
在流式响应中,delta 对象有以下几种形态:
第一条消息(角色声明):
json
{
"choices": [{
"index": 0,
"delta": {
"role": "assistant"
},
"finish_reason": null
}]
}内容生成中:
json
{
"choices": [{
"index": 0,
"delta": {
"content": "Hello"
},
"finish_reason": null
}]
}生成结束:
json
{
"choices": [{
"index": 0,
"delta": {},
"finish_reason": "stop"
}]
}解析实现
在smart-scaffold项目的consumer模块中,OpenAI格式的解析逻辑如下:
java
// 来自 smart-scaffold-dubbo consumer AIController.java(教学简化版)
Map<String, Object> json = mapper.readValue(cleanContent, Map.class);
if (json.containsKey("response")) {
// Ollama格式
chunk = (String) json.get("response");
} else if (json.containsKey("choices")) {
// OpenAI格式
List<Map<String, Object>> choices =
(List<Map<String, Object>>) json.get("choices");
if (!choices.isEmpty()) {
Map<String, Object> choice = choices.get(0);
if (choice.containsKey("delta")) {
Map<String, Object> delta =
(Map<String, Object>) choice.get("delta");
if (delta.containsKey("content")) {
chunk = (String) delta.get("content");
}
}
}
}解析路径为:json.choices[0].delta.content。需要注意 delta 可能为空对象(在结束信号中),因此需要先检查 delta.containsKey("content")。
5.3 SSE标准格式与data: [DONE]
OpenAI严格遵循SSE标准格式,有几个重要特征:
data: 前缀
每个事件的数据都以 data: 开头,后面跟一个空格,然后是JSON字符串:
data: {"id":"chatcmpl-abc123",...}WebClient在接收到这种格式时,bodyToFlux(String.class) 会将 data: 前缀保留在字符串中,需要手动移除:
java
// 移除data:前缀
String cleanContent = content;
while (cleanContent.startsWith("data:")) {
cleanContent = cleanContent.substring("data:".length()).trim();
}data: [DONE] 结束标记
OpenAI使用 data: [DONE] 作为流结束标记。这是一个特殊的SSE事件,data部分是字符串 [DONE] 而不是JSON对象。
data: [DONE]在解析时需要特别处理这个标记:
java
// 教学简化版 - 处理[DONE]标记
if ("[DONE]".equals(cleanContent)) {
// 流结束,不再处理后续数据
return null;
}finish_reason字段
OpenAI在最后一个数据事件中通过 finish_reason 字段指示生成结束的原因:
| finish_reason | 含义 |
|---|---|
stop | 模型正常结束生成 |
length | 达到max_tokens限制 |
content_filter | 内容被安全过滤器拦截 |
null | 仍在生成中 |
在AI聊天应用中,可以根据 finish_reason 向用户展示不同的提示信息:
java
// 教学简化版
String finishReason = (String) choice.get("finish_reason");
if ("stop".equals(finishReason)) {
// 正常结束
appendDoneEvent();
} else if ("length".equals(finishReason)) {
// Token超限
appendWarningEvent("回复被截断,请缩短问题或增加max_tokens");
} else if ("content_filter".equals(finishReason)) {
// 内容过滤
appendWarningEvent("回复被安全过滤器拦截");
}5.4 与Ollama格式的差异对比
以下是Ollama和OpenAI流式响应的全面对比:
| 维度 | Ollama | OpenAI |
|---|---|---|
| 接口路径 | /api/generate | /v1/chat/completions |
| 数据格式 | NDJSON | SSE(data: 前缀) |
| 内容字段 | response | choices[0].delta.content |
| 结束标志 | done: true | data: [DONE] + finish_reason |
| 角色信息 | 无 | delta.role |
| 请求体用户输入 | prompt | messages 数组 |
| 请求体模型参数 | options.temperature | temperature(顶层) |
| 上下文支持 | /api/chat 接口 | 原生多轮对话 |
| 认证方式 | 通常无 | Bearer Token |
这些差异是设计双协议适配器的核心驱动力。在smart-scaffold项目中,ChatClientFactory 通过 getApiPath 和 buildRequestBody 两个方法来屏蔽这些差异,对外提供统一的接口。
六、双协议适配器设计
6.1 ChatClientFactory中的协议判断
在smart-scaffold项目中,ChatClientFactory 是AI聊天功能的核心组件,它负责根据配置自动选择合适的协议与模型服务通信。这种设计使得上层业务代码不需要关心底层使用的是Ollama还是OpenAI。
协议判断逻辑
协议判断基于 ModelPO(或 ModelConfig)对象中的 apiType 字段:
java
// 来自 smart-scaffold-springboot ChatClientFactory.java
private String getApiPath(String apiType, boolean stream) {
if ("OLLAMA".equals(apiType)) {
return "/api/generate";
} else {
// OPENAI、COMPATIBLE_OPENAI等都使用OpenAI兼容接口
return "/v1/chat/completions";
}
}支持的API类型包括:
| apiType | 说明 | 接口路径 | 请求格式 |
|---|---|---|---|
OLLAMA | 本地Ollama服务 | /api/generate | Ollama格式 |
OPENAI | OpenAI官方API | /v1/chat/completions | OpenAI格式 |
COMPATIBLE_OPENAI | 兼容OpenAI的第三方服务 | /v1/chat/completions | OpenAI格式 |
配置来源
API类型通过 application.yml 配置:
yaml
# application.yml - 教学简化版
bima:
ai:
default-api-type: OLLAMA # 默认使用Ollama
# default-api-type: OPENAI # 切换到OpenAI
# default-api-type: COMPATIBLE_OPENAI # 切换到兼容OpenAI服务在 ModelService 中,根据配置创建对应的模型配置对象:
java
// 来自 smart-scaffold-springboot ModelService.java
private ModelPO getSystemModel() {
ModelPO modelPO = new ModelPO();
switch (defaultApiType) {
case "OPENAI":
modelPO.setApiType("OPENAI");
modelPO.setBaseUrl(openaiBaseUrl);
modelPO.setModelName(openaiModelName);
// ...
break;
case "COMPATIBLE_OPENAI":
modelPO.setApiType("COMPATIBLE_OPENAI");
modelPO.setBaseUrl(compatibleOpenaiBaseUrl);
modelPO.setModelName(compatibleOpenaiModelName);
// ...
break;
case "OLLAMA":
default:
modelPO.setApiType("OLLAMA");
modelPO.setBaseUrl(ollamaModelUrl);
modelPO.setModelName(ollamaModelName);
// ...
break;
}
return modelPO;
}6.2 根据api_type选择解析策略
在consumer模块的AIController中,需要根据响应格式选择不同的解析策略。由于WebClient接收到的原始数据可能是Ollama格式也可能是OpenAI格式,解析器需要能够自动识别:
java
// 来自 smart-scaffold-dubbo consumer AIController.java(教学简化版)
.map(content -> {
String chunk = "";
try {
ObjectMapper mapper = new ObjectMapper();
// 移除可能的data:前缀
String cleanContent = content;
while (cleanContent.startsWith("data:")) {
cleanContent = cleanContent.substring("data:".length()).trim();
}
Map<String, Object> json = mapper.readValue(cleanContent, Map.class);
// 策略1:尝试解析为OLLAMA格式
if (json.containsKey("response")) {
chunk = (String) json.get("response");
}
// 策略2:尝试解析为OpenAI格式
else if (json.containsKey("choices")) {
List<Map<String, Object>> choices =
(List<Map<String, Object>>) json.get("choices");
if (!choices.isEmpty()) {
Map<String, Object> choice = choices.get(0);
if (choice.containsKey("delta")) {
Map<String, Object> delta =
(Map<String, Object>) choice.get("delta");
if (delta.containsKey("content")) {
chunk = (String) delta.get("content");
}
}
}
}
} catch (Exception e) {
// 解析失败,直接使用原始内容
chunk = content;
}
return chunk;
})这种"先尝试Ollama格式,再尝试OpenAI格式"的策略简单有效,因为两种格式的JSON结构有明显的区分特征:
- Ollama格式:顶层包含
response字段 - OpenAI格式:顶层包含
choices字段
通过检查这两个字段的存在性,就可以可靠地判断响应格式。
6.3 统一的Flux输出
无论底层使用哪种协议,ChatClientFactory.chatStream() 方法对外统一返回 Flux<String>,每个String元素代表一个文本片段(token)。
接口定义
java
// 来自 smart-scaffold-dubbo IChatClientFactory.java
public interface IChatClientFactory {
String chat(Long userId, String message, Boolean isCustom);
Flux<String> chatStream(Long userId, String message, Boolean isCustom);
}统一输出的实现
在consumer模块中,统一的 Flux<String> 被转换为统一的 Flux<ServerSentEvent<String>>,前端只需要处理一种格式:
java
// 来自 smart-scaffold-dubbo consumer AIController.java
return webClient.post()
.uri("/api/ai/chat/stream")
// ...
.bodyToFlux(String.class)
.map(content -> {
// 统一解析:自动识别Ollama或OpenAI格式
String chunk = parseContent(content);
// 统一输出:构建标准SSE事件
return ServerSentEvent.<String>builder()
.event("message")
.data("{\"type\":\"chunk\",\"content\":\"" + escapeJson(chunk) + "\"}")
.build();
})
// 统一结束:追加done事件
.concatWith(Mono.just(
ServerSentEvent.<String>builder()
.event("done")
.data("{\"type\":\"done\"}")
.build()
));前端收到的数据格式始终是:
json
{"type":"chunk","content":"文本片段"}或:
json
{"type":"done"}6.4 错误处理统一
双协议适配器还需要统一错误处理。无论是Ollama服务不可用还是OpenAI API Key过期,都应该返回统一格式的错误信息。
错误处理实现
java
// 来自 smart-scaffold-dubbo consumer AIController.java
.onErrorResume(e -> {
log.error("AI流式聊天失败: {}", e.getMessage(), e);
String errorJson = "{\"type\":\"error\",\"error\":\""
+ e.getMessage().replace("\"", "\\\"") + "\"}";
return Mono.just(
ServerSentEvent.<String>builder()
.event("error")
.data(errorJson)
.build()
);
});错误类型分类
| 错误类型 | 可能原因 | 建议处理方式 |
|---|---|---|
ConnectException | 模型服务未启动 | 提示"模型服务不可用" |
TimeoutException | 模型生成超时 | 提示"生成超时,请重试" |
WebClientResponseException | API Key无效或配额用尽 | 提示"认证失败" |
JsonProcessingException | 响应格式异常 | 记录日志,返回原始内容 |
七、SSE事件构建
7.1 ServerSentEvent.builder()详解
Spring Framework提供了 ServerSentEvent<T> 类来构建SSE事件。它使用Builder模式,支持链式调用,API设计非常优雅。
基本用法
java
ServerSentEvent<String> event = ServerSentEvent.<String>builder()
.event("message") // 设置事件类型
.data("Hello World") // 设置数据
.id("12345") // 设置事件ID
.retry(3000) // 设置重连间隔
.build();Builder方法说明
| 方法 | 对应SSE字段 | 说明 |
|---|---|---|
event(String) | event: | 事件类型,默认为"message" |
data(T) | data: | 事件数据,泛型T支持任意类型 |
id(String) | id: | 事件ID,用于断点续传 |
retry(Duration) | retry: | 重连间隔,单位为毫秒 |
comment(String) | : | 注释,不会发送给客户端 |
build() | - | 构建ServerSentEvent对象 |
7.2 event()、data()、id()、retry()方法
在smart-scaffold项目中,我们使用了 event() 和 data() 两个方法来构建SSE事件。
event()方法 - 事件类型
java
// 消息事件
ServerSentEvent.<String>builder()
.event("message")
.data("{\"type\":\"chunk\",\"content\":\"Hello\"}")
.build();
// 完成事件
ServerSentEvent.<String>builder()
.event("done")
.data("{\"type\":\"done\"}")
.build();
// 错误事件
ServerSentEvent.<String>builder()
.event("error")
.data("{\"type\":\"error\",\"error\":\"模型服务不可用\"}")
.build();使用不同的事件类型可以让前端精确地处理不同类型的事件,而不是在 onmessage 回调中通过解析data字段来区分。
data()方法 - 事件数据
data() 方法接受一个泛型参数,Spring会使用配置的 HttpMessageWriter 将其序列化为字符串。对于 String 类型,直接输出;对于对象类型,默认使用Jackson序列化为JSON。
java
// 直接使用字符串
.data("{\"type\":\"chunk\",\"content\":\"Hello\"}")
// 使用对象(Spring会自动序列化为JSON)
.data(new ChatChunk("chunk", "Hello"))在smart-scaffold项目中,我们使用字符串形式构建JSON,这样可以精确控制JSON的格式和转义。
id()方法 - 事件ID
java
AtomicLong eventCounter = new AtomicLong(0);
return flux.map(content ->
ServerSentEvent.<String>builder()
.event("message")
.data(content)
.id(String.valueOf(eventCounter.incrementAndGet()))
.build()
);retry()方法 - 重连间隔
java
ServerSentEvent.<String>builder()
.event("message")
.data(content)
.retry(Duration.ofSeconds(5)) // 断线后5秒重连
.build();7.3 MediaType.TEXT_EVENT_STREAM_VALUE
MediaType.TEXT_EVENT_STREAM_VALUE 是Spring框架中定义的常量,值为 "text/event-stream"。在Controller方法上设置这个produces值,Spring会自动:
- 设置响应头
Content-Type: text/event-stream - 设置响应头
Cache-Control: no-cache - 使用
ServerSentEventHttpMessageWriter写入响应 - 禁用响应缓冲
java
// 来自 smart-scaffold-dubbo consumer AIController.java
@PostMapping(value = "/chat/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public Flux<ServerSentEvent<String>> chatStream(
@RequestBody Map<String, Object> requestBody) {
// ...
}7.4 Controller返回Flux<ServerSentEvent<String>>
当Controller方法返回 Flux<ServerSentEvent<String>> 时,Spring WebFlux会自动将每个 ServerSentEvent 对象序列化为SSE格式并发送给客户端。
完整的Controller实现
java
// 来自 smart-scaffold-dubbo consumer AIController.java(教学简化版)
@PostMapping(value = "/chat/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public Flux<ServerSentEvent<String>> chatStream(
@RequestBody Map<String, Object> requestBody) {
Long userId = Long.valueOf(requestBody.get("userId").toString());
String message = requestBody.get("message").toString();
// 使用WebClient调用provider的流式接口
WebClient webClient = webClientBuilder
.baseUrl(getProviderUrl()).build();
return webClient.post()
.uri("/api/ai/chat/stream")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.TEXT_EVENT_STREAM)
.body(BodyInserters.fromValue(requestBody))
.retrieve()
.bodyToFlux(String.class)
// 解析响应,构建SSE事件
.map(content -> {
String chunk = parseContent(content);
return ServerSentEvent.<String>builder()
.event("message")
.data("{\"type\":\"chunk\",\"content\":\""
+ escapeJson(chunk) + "\"}")
.build();
})
// 流结束后追加done事件
.concatWith(Mono.just(
ServerSentEvent.<String>builder()
.event("done")
.data("{\"type\":\"done\"}")
.build()
))
// 错误处理
.onErrorResume(e -> {
String errorJson = "{\"type\":\"error\",\"error\":\""
+ e.getMessage().replace("\"", "\\\"") + "\"}";
return Mono.just(
ServerSentEvent.<String>builder()
.event("error")
.data(errorJson)
.build()
);
});
}这个Controller方法的响应式管道非常清晰:
- 从provider获取
Flux<String>原始数据流 - 通过
map解析每个数据块,构建ServerSentEvent<String> - 通过
concatWith在流结束后追加done事件 - 通过
onErrorResume统一处理异常
八、前端EventSource对接
8.1 new EventSource(url)创建连接
浏览器原生的 EventSource API 是对接SSE服务端最简单的方式。它提供了自动重连、事件解析等内置功能。
基本用法
javascript
// 创建EventSource连接
const eventSource = new EventSource('/ai/chat/stream');注意: EventSource 只支持GET请求,而smart-scaffold项目的流式聊天接口使用POST请求。因此,在实际项目中,我们使用 fetch API 配合 ReadableStream 来处理POST方式的SSE:
javascript
// 使用fetch发送POST请求并处理SSE响应
async function streamChat(userId, message) {
const response = await fetch('/ai/chat/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ userId, message })
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 按SSE事件格式解析
const events = buffer.split('\n\n');
buffer = events.pop(); // 保留不完整的部分
for (const event of events) {
const lines = event.split('\n');
let eventType = 'message';
let data = '';
for (const line of lines) {
if (line.startsWith('event:')) {
eventType = line.substring(6).trim();
} else if (line.startsWith('data:')) {
data = line.substring(5).trim();
}
}
// 根据事件类型分发处理
handleEvent(eventType, data);
}
}
}使用EventSource的替代方案
如果服务端支持GET请求(例如通过URL参数传递消息),可以使用原生的 EventSource:
javascript
// GET方式的EventSource(需要服务端支持)
const eventSource = new EventSource(
'/ai/chat/stream?userId=1&message=' + encodeURIComponent('你好')
);
eventSource.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
if (data.type === 'chunk') {
appendContent(data.content);
}
});
eventSource.addEventListener('done', () => {
finishStreaming();
});
eventSource.addEventListener('error', (event) => {
console.error('SSE error:', event);
});8.2 onmessage回调处理
onmessage 是 EventSource 的默认事件处理器,当收到 event: message 类型的事件时触发。
javascript
eventSource.onmessage = function(event) {
try {
const data = JSON.parse(event.data);
switch (data.type) {
case 'chunk':
// 收到文本片段,追加到显示区域
appendToDisplay(data.content);
break;
case 'done':
// 生成完成
onGenerationComplete();
break;
case 'error':
// 错误处理
onError(data.error);
break;
}
} catch (e) {
console.error('解析SSE数据失败:', e);
}
};使用addEventListener处理自定义事件
在smart-scaffold项目中,我们使用了自定义的事件类型(message、done、error),可以通过 addEventListener 分别处理:
javascript
// 处理消息事件
eventSource.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
appendToDisplay(data.content);
});
// 处理完成事件
eventSource.addEventListener('done', (event) => {
onGenerationComplete();
eventSource.close(); // 主动关闭连接
});
// 处理错误事件
eventSource.addEventListener('error', (event) => {
if (event.data) {
const data = JSON.parse(event.data);
showError(data.error);
}
});8.3 onerror处理与自动重连
EventSource 的 onerror 回调在连接出错时触发,包括网络中断、服务器关闭连接等情况。
自动重连机制
浏览器原生的 EventSource 在连接断开后会自动尝试重连。重连行为如下:
- 连接断开后,等待
retry:字段指定的毫秒数(默认3000ms) - 发送新的HTTP请求,并携带
Last-Event-ID请求头 - 如果重连成功,继续接收事件
- 如果重连失败,继续等待并重试
javascript
eventSource.onerror = function(event) {
if (event.target.readyState === EventSource.CLOSED) {
// 连接被永久关闭(调用了eventSource.close())
console.log('SSE连接已关闭');
} else if (event.target.readyState === EventSource.CONNECTING) {
// 正在尝试重连
console.log('SSE连接断开,正在重连...');
showReconnectingIndicator();
}
};手动重连策略
在某些场景下,可能需要实现更精细的重连策略:
javascript
class SSEClient {
constructor(url) {
this.url = url;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.baseReconnectDelay = 1000;
this.eventSource = null;
}
connect() {
this.eventSource = new EventSource(this.url);
this.eventSource.onopen = () => {
this.reconnectAttempts = 0; // 重置重连计数
console.log('SSE连接已建立');
};
this.eventSource.onerror = () => {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
// 指数退避重连
const delay = this.baseReconnectDelay
* Math.pow(2, this.reconnectAttempts);
this.reconnectAttempts++;
console.log(`${delay}ms后尝试第${this.reconnectAttempts}次重连`);
setTimeout(() => this.connect(), delay);
} else {
console.error('达到最大重连次数,停止重连');
this.eventSource.close();
}
};
this.eventSource.addEventListener('message', (event) => {
// 处理消息
});
}
close() {
if (this.eventSource) {
this.eventSource.close();
}
}
}8.4 流式UI渲染(打字机效果)
流式UI渲染是SSE技术最直观的用户体验体现。通过将每个token实时追加到显示区域,实现"打字机"般的视觉效果。
基础实现
html
<!DOCTYPE html>
<html>
<head>
<title>AI流式聊天</title>
<style>
.chat-container {
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
.message {
padding: 12px 16px;
border-radius: 8px;
margin-bottom: 12px;
line-height: 1.6;
}
.message.user {
background-color: #007bff;
color: white;
text-align: right;
}
.message.assistant {
background-color: #f0f0f0;
color: #333;
}
.cursor {
display: inline-block;
width: 2px;
height: 1em;
background-color: #333;
animation: blink 1s infinite;
vertical-align: text-bottom;
}
@keyframes blink {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0; }
}
.input-area {
display: flex;
gap: 10px;
margin-top: 20px;
}
.input-area textarea {
flex: 1;
padding: 10px;
border: 1px solid #ccc;
border-radius: 4px;
resize: vertical;
}
.input-area button {
padding: 10px 20px;
background-color: #007bff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.input-area button:disabled {
background-color: #ccc;
cursor: not-allowed;
}
</style>
</head>
<body>
<div class="chat-container">
<div id="chatMessages"></div>
<div class="input-area">
<textarea id="userInput" placeholder="输入消息..." rows="3"></textarea>
<button id="sendBtn" onclick="sendMessage()">发送</button>
</div>
</div>
<script>
let isStreaming = false;
let currentMessageEl = null;
async function sendMessage() {
const input = document.getElementById('userInput');
const message = input.value.trim();
if (!message || isStreaming) return;
isStreaming = true;
document.getElementById('sendBtn').disabled = true;
input.value = '';
// 显示用户消息
appendMessage('user', message);
// 创建AI回复消息区域
currentMessageEl = appendMessage('assistant', '');
const cursor = document.createElement('span');
cursor.className = 'cursor';
currentMessageEl.appendChild(cursor);
try {
const response = await fetch('/ai/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ userId: 1, message: message })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const events = buffer.split('\n\n');
buffer = events.pop();
for (const event of events) {
const lines = event.split('\n');
let eventType = 'message';
let data = '';
for (const line of lines) {
if (line.startsWith('event:')) {
eventType = line.substring(6).trim();
} else if (line.startsWith('data:')) {
data = line.substring(5).trim();
}
}
if (eventType === 'message' && data) {
try {
const parsed = JSON.parse(data);
if (parsed.type === 'chunk' && parsed.content) {
// 追加文本到消息区域
currentMessageEl.insertBefore(
document.createTextNode(parsed.content),
cursor
);
}
} catch (e) {
console.error('解析失败:', e);
}
} else if (eventType === 'done') {
// 移除光标
cursor.remove();
} else if (eventType === 'error') {
cursor.remove();
const errorEl = document.createElement('span');
errorEl.style.color = 'red';
errorEl.textContent = ' [错误: ' + data + ']';
currentMessageEl.appendChild(errorEl);
}
}
}
} catch (error) {
console.error('流式请求失败:', error);
if (currentMessageEl) {
const cursor = currentMessageEl.querySelector('.cursor');
if (cursor) cursor.remove();
const errorEl = document.createElement('span');
errorEl.style.color = 'red';
errorEl.textContent = ' [请求失败: ' + error.message + ']';
currentMessageEl.appendChild(errorEl);
}
} finally {
isStreaming = false;
document.getElementById('sendBtn').disabled = false;
input.focus();
}
}
function appendMessage(role, content) {
const container = document.getElementById('chatMessages');
const div = document.createElement('div');
div.className = 'message ' + role;
div.textContent = content;
container.appendChild(div);
container.scrollTop = container.scrollHeight;
return div;
}
</script>
</body>
</html>Markdown渲染增强
在实际的AI聊天应用中,模型的回复通常包含Markdown格式(标题、列表、代码块等)。可以使用 marked.js 等库将Markdown渲染为HTML:
javascript
import { marked } from 'marked';
function appendMarkdownContent(element, content) {
// 使用marked渲染Markdown
element.innerHTML = marked.parse(accumulatedContent);
// 代码高亮
element.querySelectorAll('pre code').forEach(block => {
hljs.highlightElement(block);
});
}九、流式响应的异常处理
9.1 连接超时处理
连接超时是流式聊天中最常见的异常之一。大模型生成一个完整的回复可能需要几十秒,如果超时配置不当,会导致响应被截断。
超时异常的类型
连接超时 (ConnectTimeout)
|
v TCP连接建立失败
读取超时 (ReadTimeout)
|
v 等待数据超时
整体超时 (TimeoutException)
|
v 整个流超过最大时长
元素间隔超时 (fluxTimeout)
|
v 两个token之间的间隔超时超时配置策略
java
// 教学简化版 - 多层超时配置
webClient.post()
.uri(apiPath)
.retrieve()
.bodyToFlux(String.class)
// 元素间隔超时:30秒内没有新token则超时
.timeout(Duration.ofSeconds(30))
// 整体超时:5分钟后强制结束
.take(Duration.ofMinutes(5))
.onErrorResume(TimeoutException.class, e -> {
log.warn("流式聊天超时: {}", e.getMessage());
return Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("{\"type\":\"error\",\"error\":\"生成超时,请重试\"}")
.build()
);
});9.2 网络中断处理
网络中断可能发生在链路的任何位置:客户端到Nginx、Nginx到应用、应用到模型服务。
服务端检测网络中断
java
// 教学简化版 - 检测客户端断开
return flux
.doOnCancel(() -> {
log.info("客户端断开连接");
// 可以在这里执行清理操作,如取消模型推理
})
.doOnError(e -> {
if (e instanceof WebClientRequestException) {
log.error("网络错误: {}", e.getMessage());
}
});客户端检测网络中断
javascript
// 使用fetch的AbortController实现超时和取消
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 300000); // 5分钟超时
try {
const response = await fetch('/ai/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ userId: 1, message: '你好' }),
signal: controller.signal
});
// 处理响应...
} catch (error) {
if (error.name === 'AbortError') {
console.error('请求超时或被取消');
} else {
console.error('网络错误:', error);
}
} finally {
clearTimeout(timeoutId);
}9.3 模型服务不可用处理
当Ollama或OpenAI服务不可用时,需要优雅地处理错误并给用户友好的提示。
健康检查
在smart-scaffold项目中,提供了健康检查接口:
java
// 来自 smart-scaffold-dubbo consumer AIController.java
@GetMapping("/health")
public ApiResult<?> healthCheck() {
return ApiResult.success(Map.of(
"status", "healthy",
"service", "ai-controller",
"currentApiType", modelService.getCurrentApiType()
));
}服务不可用时的降级处理
java
// 教学简化版 - 服务降级
.onErrorResume(WebClientResponseException.class, e -> {
log.error("模型服务响应错误: status={}, body={}",
e.getStatusCode(), e.getResponseBodyAsString());
String errorMessage;
if (e.getStatusCode().value() == 401) {
errorMessage = "API Key无效,请检查配置";
} else if (e.getStatusCode().value() == 429) {
errorMessage = "请求过于频繁,请稍后再试";
} else if (e.getStatusCode().value() == 500) {
errorMessage = "模型服务内部错误,请稍后再试";
} else {
errorMessage = "模型服务不可用: " + e.getMessage();
}
return Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("{\"type\":\"error\",\"error\":\""
+ errorMessage.replace("\"", "\\\"") + "\"}")
.build()
);
});9.4 Token超限处理
当模型的回复达到 max_tokens 限制时,OpenAI会设置 finish_reason: "length",Ollama则会在达到限制时停止生成。
检测Token超限
java
// 教学简化版 - 检测Token超限
.map(content -> {
Map<String, Object> json = parseJson(content);
if (json.containsKey("choices")) {
List<Map<String, Object>> choices =
(List<Map<String, Object>>) json.get("choices");
if (!choices.isEmpty()) {
String finishReason = (String) choices.get(0).get("finish_reason");
if ("length".equals(finishReason)) {
// Token超限,追加警告
return ServerSentEvent.<String>builder()
.event("warning")
.data("{\"type\":\"warning\","
+ "\"message\":\"回复可能被截断\"}")
.build();
}
}
}
// 正常处理...
})动态调整max_tokens
一种更优雅的解决方案是根据输入长度动态调整 max_tokens:
java
// 教学简化版 - 动态max_tokens
private int calculateMaxTokens(String inputMessage, int modelMaxTokens) {
// 估算输入token数(粗略估计:1个中文字符约1.5个token)
int inputTokens = (int) (inputMessage.length() * 1.5);
// 保留20%的余量给输出
int outputTokens = modelMaxTokens - inputTokens;
return Math.max(100, Math.min(outputTokens, 4096));
}9.5 优雅降级策略
当流式响应出现异常时,优雅降级策略确保用户始终能获得有意义的反馈。
降级策略层次
第一层:流式响应正常工作
|
v 异常发生
第二层:返回已生成的部分内容 + 错误提示
|
v 流式完全失败
第三层:降级为非流式请求(重试一次)
|
v 非流式也失败
第四层:返回友好的错误提示 + 建议操作实现优雅降级
java
// 教学简化版 - 优雅降级
public Flux<ServerSentEvent<String>> chatStreamWithFallback(
Map<String, Object> requestBody) {
try {
return chatStream(requestBody)
// 降级:流式失败时尝试非流式
.onErrorResume(e -> {
log.warn("流式请求失败,尝试非流式降级: {}", e.getMessage());
try {
String response = chatClientFactory.chat(
Long.valueOf(requestBody.get("userId").toString()),
requestBody.get("message").toString(),
false
);
// 将完整响应作为单个chunk返回
return Flux.just(
ServerSentEvent.<String>builder()
.event("message")
.data("{\"type\":\"chunk\",\"content\":\""
+ escapeJson(response) + "\"}")
.build(),
ServerSentEvent.<String>builder()
.event("done")
.data("{\"type\":\"done\"}")
.build()
);
} catch (Exception fallbackError) {
log.error("非流式降级也失败: {}", fallbackError.getMessage());
return Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("{\"type\":\"error\","
+ "\"error\":\"服务暂时不可用,请稍后再试\"}")
.build()
);
}
});
} catch (Exception e) {
return Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("{\"type\":\"error\","
+ "\"error\":\"请求处理失败\"}")
.build()
);
}
}十、性能优化
10.1 连接池配置
在高并发场景下,合理的连接池配置可以显著提升性能。Reactor Netty的 ConnectionProvider 提供了灵活的连接池管理能力。
连接池配置
java
// 教学简化版 - 连接池配置
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;
ConnectionProvider provider = ConnectionProvider.builder("ai-service-pool")
// 最大连接数:根据模型服务并发能力设置
.maxConnections(100)
// 空闲连接超时:超过此时间未使用的连接将被关闭
.maxIdleTime(Duration.ofSeconds(60))
// 连接最大生命周期:防止长时间使用的连接出现内存泄漏
.maxLifeTime(Duration.ofSeconds(300))
// 获取连接超时:当连接池耗尽时,等待可用连接的最长时间
.pendingAcquireTimeout(Duration.ofSeconds(30))
// 获取连接超时时的回调
.pendingAcquireMaxCount(500) // 最大等待队列长度
// 连接驱逐检查间隔
.evictInBackground(Duration.ofSeconds(120))
.build();
HttpClient httpClient = HttpClient.create(provider)
.responseTimeout(Duration.ofSeconds(300))
.compress(true); // 启用响应压缩(仅用于非SSE请求)
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();连接池监控
java
// 教学简化版 - 连接池监控
ConnectionProvider provider = ConnectionProvider.builder("ai-service-pool")
.maxConnections(100)
.listener((pool, info) -> {
switch (info.kind()) {
case ACQUIRE:
log.debug("从连接池获取连接,当前活跃连接: {}", info.connectionCount());
break;
case RELEASE:
log.debug("释放连接到连接池");
break;
case CREATE:
log.debug("创建新连接");
break;
case EVICT:
log.debug("驱逐过期连接");
break;
}
})
.build();10.2 背压控制
在AI流式聊天场景中,背压控制确保系统在高负载下保持稳定。
请求限流
使用Reactor的 limitRate 操作符控制请求速率:
java
// 教学简化版 - 请求限流
return webClient.post()
.uri(apiPath)
.retrieve()
.bodyToFlux(String.class)
.limitRate(10) // 每次最多请求10个元素
.map(this::parseContent);信号量并发控制
使用 Semaphore 限制同时进行的流式请求数量:
java
// 教学简化版 - 并发控制
private final Semaphore streamSemaphore = new Semaphore(50);
public Flux<ServerSentEvent<String>> chatStream(
Map<String, Object> requestBody) {
if (!streamSemaphore.tryAcquire()) {
return Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("{\"type\":\"error\","
+ "\"error\":\"服务器繁忙,请稍后再试\"}")
.build()
);
}
return doChatStream(requestBody)
.doFinally(signal -> streamSemaphore.release());
}10.3 缓冲区大小优化
合理的缓冲区大小可以平衡内存使用和数据传输效率。
Netty缓冲区配置
java
// 教学简化版 - 缓冲区配置
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.SO_RCVBUF, 32 * 1024) // 接收缓冲区:32KB
.option(ChannelOption.SO_SNDBUF, 32 * 1024) // 发送缓冲区:32KB
.responseTimeout(Duration.ofSeconds(300));Spring Codec缓冲区配置
yaml
# application.yml - Codec缓冲区配置
spring:
codec:
max-in-memory-size: 16MB # 最大内存缓冲区java
// 编程方式配置
WebClient webClient = WebClient.builder()
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024))
.build();在AI流式聊天场景中,每个token通常只有几个字节到几十个字节,因此不需要很大的缓冲区。32KB的缓冲区足够处理绝大多数情况。
10.4 并发限制
全局限流器
java
// 教学简化版 - 基于Reactor的限流
@Component
public class StreamRateLimiter {
private final AtomicInteger activeStreams = new AtomicInteger(0);
private final int maxConcurrentStreams;
public StreamRateLimiter(
@Value("${ai.stream.max-concurrent:50}")
int maxConcurrentStreams) {
this.maxConcurrentStreams = maxConcurrentStreams;
}
public <T> Flux<T> limit(Flux<T> flux) {
return Flux.defer(() -> {
if (activeStreams.get() >= maxConcurrentStreams) {
return Flux.error(new RuntimeException(
"并发流数量已达上限(" + maxConcurrentStreams + ")"));
}
activeStreams.incrementAndGet();
return flux.doFinally(signal -> activeStreams.decrementAndGet());
});
}
}用户级限流
java
// 教学简化版 - 用户级限流
@Component
public class UserStreamRateLimiter {
private final ConcurrentHashMap<Long, AtomicInteger> userStreamCount
= new ConcurrentHashMap<>();
private final int maxStreamsPerUser = 3;
public boolean tryAcquire(Long userId) {
AtomicInteger count = userStreamCount.computeIfAbsent(
userId, k -> new AtomicInteger(0));
if (count.get() >= maxStreamsPerUser) {
return false;
}
count.incrementAndGet();
return true;
}
public void release(Long userId) {
AtomicInteger count = userStreamCount.get(userId);
if (count != null) {
count.decrementAndGet();
}
}
}10.5 内存管理
避免内存泄漏
在响应式编程中,最常见的内存泄漏原因是订阅了 Flux 但没有正确处理完成或错误信号。在AI流式聊天场景中,需要特别注意以下几点:
- 及时释放资源: 使用
doFinally确保在任何情况下(完成、错误、取消)都能释放资源。 - 避免在map中积累数据: 不要在
map操作符中将所有token累积到一个StringBuilder中,除非确实需要完整内容。 - 限制缓冲区大小: 使用
limitRate控制内存中的数据量。
java
// 教学简化版 - 内存安全的流式处理
return webClient.post()
.uri(apiPath)
.retrieve()
.bodyToFlux(String.class)
.limitRate(50) // 控制内存中的元素数量
.map(this::parseContent)
.doFinally(signal -> {
// 无论成功、失败还是取消,都会执行
log.info("流式请求结束: signal={}", signal);
// 释放资源、更新状态等
});内存监控
java
// 教学简化版 - 内存监控
@Component
public class MemoryMonitor {
private final AtomicLong activeStreams = new AtomicLong(0);
@Scheduled(fixedRate = 10000) // 每10秒执行一次
public void reportMemoryUsage() {
Runtime runtime = Runtime.getRuntime();
long usedMB = (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024);
long maxMB = runtime.maxMemory() / (1024 * 1024);
log.info("内存使用: {}MB/{}MB, 活跃流: {}",
usedMB, maxMB, activeStreams.get());
if (usedMB > maxMB * 0.8) {
log.warn("内存使用超过80%,建议检查流式连接数量");
}
}
}JVM参数调优
bash
# 启动参数 - 针对流式场景优化
java -Xms512m -Xmx2g \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 \
-XX:+HeapDumpOnOutOfMemoryError \
-jar smart-scaffold-web.jarG1垃圾回收器适合流式场景,因为它可以控制GC暂停时间,避免在流式推送过程中出现长时间停顿。
总结与展望
本文基于smart-scaffold-springboot和smart-scaffold-dubbo两个实际项目,完整地解析了SSE流式响应在大模型聊天场景中的全链路实现。让我们回顾一下核心技术要点:
架构设计总结
smart-scaffold项目的AI流式聊天架构采用了分层设计:
前端 (EventSource / fetch)
|
v SSE (text/event-stream)
Consumer Controller (Spring WebFlux)
|
v Flux<ServerSentEvent<String>>
WebClient (Reactor Netty)
|
v HTTP POST + SSE
Provider Controller (Spring MVC)
|
v Flux<String>
ChatClientFactory (协议适配)
|
v HTTP POST + SSE
模型服务 (Ollama / OpenAI)每一层都有明确的职责:
- Consumer Controller 负责SSE事件构建和前端对接
- WebClient 负责响应式HTTP通信
- Provider Controller 负责流式接口暴露
- ChatClientFactory 负责协议适配和请求构建
- 模型服务 负责实际的文本生成
核心技术要点
- SSE协议:基于HTTP长连接的服务器推送技术,比WebSocket更简单,比轮询更高效。
- Spring WebFlux:基于Reactor的响应式Web框架,天然支持流式响应。
- WebClient:非阻塞HTTP客户端,替代RestTemplate,支持流式数据接收。
- 双协议适配:通过策略模式屏蔽Ollama和OpenAI的格式差异。
- ServerSentEvent:Spring提供的SSE事件构建工具,支持event、data、id、retry等字段。
- 异常处理:多层级的错误处理和优雅降级策略。
- 性能优化:连接池、背压控制、缓冲区优化、并发限制和内存管理。
未来展望
随着大模型技术的快速发展,流式响应技术也在不断演进。以下是几个值得关注的方向:
- 多模态流式响应: 除了文本,未来可能需要流式传输图片、音频、视频等多模态内容。
- 流式函数调用(Function Calling): 模型在生成过程中调用外部工具,结果实时返回并影响后续生成。
- 流式RAG: 在流式生成过程中实时检索知识库,将检索结果注入生成上下文。
- Server-Sent Events over HTTP/3: HTTP/3基于QUIC协议,可以进一步提升SSE的性能和可靠性。
- AI Agent流式编排: 多个AI Agent之间的流式协作,每个Agent的输出作为下一个Agent的输入。
smart-scaffold项目将持续跟进这些技术趋势,不断完善AI聊天模块的功能和性能。欢迎关注项目更新,一起探索AI应用开发的最佳实践。
版权声明: 本文为必码(bima.cc)原创技术文章,仅供学习交流。
本文内容基于实际项目源码解析整理,代码示例均为教学简化版本,仅供学习参考。
文档内容提取自项目源码与配置文件,如需获取完整项目代码,请访问 bima.cc。