在 Java 中使用 WebClient + SSE(Server-Sent Events) 来消费服务端流式响应是 Spring WebFlux 中推荐的方式。下面是完整的示例结构,展示如何使用 WebClient 接收 SSE 数据流,适用于对接 OpenAI、LangChain、Spring SSE 服务等流式响应。


✅ 一、服务端返回 SSE(text/event-stream)

// 示例 Controller - 模拟返回 SSE 流数据
@RestController
@RequestMapping("/sse")
public class SseServerController {

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamEvents() {
        return Flux.interval(Duration.ofMillis(500))
                   .map(i -> "服务器消息: " + i)
                   .take(10);
    }
}

✅ 二、客户端 WebClient 接收 SSE 流(推荐方式)

你可以使用 WebClient 来发起 SSE 请求,并以 Flux 方式处理:

@Component
public class SseClient {

    private final WebClient webClient = WebClient.create("http://localhost:8080");

    public void consumeSseStream() {
        webClient.get()
                .uri("/sse/stream")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class)
                .doOnNext(msg -> System.out.println("收到消息: " + msg))
                .blockLast(); // 阻塞直到流完成(可改为 subscribe 异步)
    }
}

✅ 或者异步监听(非阻塞):

public void asyncSseListen() {
    webClient.get()
             .uri("/sse/stream")
             .accept(MediaType.TEXT_EVENT_STREAM)
             .retrieve()
             .bodyToFlux(String.class)
             .subscribe(msg -> {
                 System.out.println("接收到: " + msg);
             }, error -> {
                 System.err.println("错误: " + error.getMessage());
             }, () -> {
                 System.out.println("接收结束");
             });
}

✅ 三、使用 WebClient 对接 OpenAI 流式接口(SSE)

如果你使用的是 OpenAI Chat Completion 接口并启用 stream=true,格式类似 SSE:

public void callOpenAIStream(String prompt) {
    WebClient client = WebClient.builder()
        .baseUrl("https://api.openai.com/v1")
        .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer YOUR_API_KEY")
        .build();

    Map<String, Object> requestBody = Map.of(
        "model", "gpt-3.5-turbo",
        "stream", true,
        "messages", List.of(Map.of("role", "user", "content", prompt))
    );

    client.post()
        .uri("/chat/completions")
        .contentType(MediaType.APPLICATION_JSON)
        .accept(MediaType.TEXT_EVENT_STREAM)
        .bodyValue(requestBody)
        .retrieve()
        .bodyToFlux(String.class) // 返回每一行 SSE 数据
        .takeUntil(msg -> msg.contains("[DONE]")) // OpenAI 特有结束标志
        .map(msg -> parseOpenAIChunk(msg)) // 你可在此解析 JSON 结构
        .doOnNext(System.out::println)
        .blockLast();
}

✅ 四、注意事项

  • bodyToFlux(String.class) 获取每个 SSE 数据块;
  • OpenAI 返回的是 伪 SSE(非标准 JSON,每行开头为 data:,需解析;
  • 如果你使用 Spring Boot 3.x + WebFlux,无需任何特殊依赖;
  • 对接 GPT 时需设置 header 和解析 data: {json...} 行。

✅ 五、前端配合(原生 JS)

<script>
  const source = new EventSource("/sse/stream");
  source.onmessage = function(e) {
    console.log("前端收到数据:", e.data);
  };
</script>