在 Java 中使用 WebClient + SSE(Server-Sent Events) 来消费服务端流式响应是 Spring WebFlux 中推荐的方式。下面是完整的示例结构,展示如何使用 WebClient 接收 SSE 数据流,适用于对接 OpenAI、LangChain、Spring SSE 服务等流式响应。
✅ 一、服务端返回 SSE(text/event-stream)
// 示例 Controller - 模拟返回 SSE 流数据
@RestController
@RequestMapping("/sse")
public class SseServerController {
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamEvents() {
return Flux.interval(Duration.ofMillis(500))
.map(i -> "服务器消息: " + i)
.take(10);
}
}
✅ 二、客户端 WebClient 接收 SSE 流(推荐方式)
你可以使用 WebClient
来发起 SSE 请求,并以 Flux 方式处理:
@Component
public class SseClient {
private final WebClient webClient = WebClient.create("http://localhost:8080");
public void consumeSseStream() {
webClient.get()
.uri("/sse/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.doOnNext(msg -> System.out.println("收到消息: " + msg))
.blockLast(); // 阻塞直到流完成(可改为 subscribe 异步)
}
}
✅ 或者异步监听(非阻塞):
public void asyncSseListen() {
webClient.get()
.uri("/sse/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.subscribe(msg -> {
System.out.println("接收到: " + msg);
}, error -> {
System.err.println("错误: " + error.getMessage());
}, () -> {
System.out.println("接收结束");
});
}
✅ 三、使用 WebClient 对接 OpenAI 流式接口(SSE)
如果你使用的是 OpenAI Chat Completion 接口并启用 stream=true
,格式类似 SSE:
public void callOpenAIStream(String prompt) {
WebClient client = WebClient.builder()
.baseUrl("https://api.openai.com/v1")
.defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer YOUR_API_KEY")
.build();
Map<String, Object> requestBody = Map.of(
"model", "gpt-3.5-turbo",
"stream", true,
"messages", List.of(Map.of("role", "user", "content", prompt))
);
client.post()
.uri("/chat/completions")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.TEXT_EVENT_STREAM)
.bodyValue(requestBody)
.retrieve()
.bodyToFlux(String.class) // 返回每一行 SSE 数据
.takeUntil(msg -> msg.contains("[DONE]")) // OpenAI 特有结束标志
.map(msg -> parseOpenAIChunk(msg)) // 你可在此解析 JSON 结构
.doOnNext(System.out::println)
.blockLast();
}
✅ 四、注意事项
bodyToFlux(String.class)
获取每个 SSE 数据块;- OpenAI 返回的是 伪 SSE(非标准 JSON,每行开头为
data:
),需解析; - 如果你使用 Spring Boot 3.x + WebFlux,无需任何特殊依赖;
- 对接 GPT 时需设置 header 和解析
data: {json...}
行。
✅ 五、前端配合(原生 JS)
<script>
const source = new EventSource("/sse/stream");
source.onmessage = function(e) {
console.log("前端收到数据:", e.data);
};
</script>
发表回复