一、背景说明

  • SSE(Server-Sent Events) 是基于HTTP的单向服务器推送技术,客户端通过EventSource监听服务器推送的消息流。
  • MCP在此示例中定义为多通道消息传输协议,可通过不同事件名区分消息类型。
  • Python Server端用 FastAPI + uvicorn 实现 SSE 长连接推送。
  • Python Client端用 httpx + 异步监听处理服务器发送事件。

二、环境准备

pip install fastapi uvicorn httpx

三、服务器端代码(FastAPI)

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
import time

app = FastAPI()

# MCP事件格式封装
def format_sse(data: dict, event: str = None) -> str:
    msg = ""
    if event:
        msg += f"event: {event}\n"
    msg += f"data: {json.dumps(data)}\n\n"
    return msg

# SSE消息生成器
async def event_generator():
    counter = 0
    while True:
        # 模拟多通道消息:channel1 和 channel2
        if counter % 2 == 0:
            data = {"channel": "channel1", "msg": f"Message {counter} from channel1"}
            yield format_sse(data, event="channel1")
        else:
            data = {"channel": "channel2", "msg": f"Message {counter} from channel2"}
            yield format_sse(data, event="channel2")

        counter += 1
        await asyncio.sleep(2)  # 每2秒推送一次

@app.get('/mcp/sse')
async def sse_endpoint(request: Request):
    async def event_stream():
        generator = event_generator()
        async for event in generator:
            # 如果客户端断开,退出循环
            if await request.is_disconnected():
                print("Client disconnected")
                break
            yield event

    return StreamingResponse(event_stream(), media_type="text/event-stream")

四、客户端代码(异步监听 SSE)

import httpx
import asyncio

async def listen_sse(url):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                if line.startswith("event:"):
                    event_name = line.replace("event: ", "")
                elif line.startswith("data:"):
                    data_json = line.replace("data: ", "")
                    print(f"Received event: {event_name}, data: {data_json}")

if __name__ == "__main__":
    url = "http://127.0.0.1:8000/mcp/sse"
    asyncio.run(listen_sse(url))

五、运行说明

  1. 启动服务器:
uvicorn server:app --reload
  1. 运行客户端监听脚本,即可看到服务器每2秒推送的两路MCP消息。

六、总结

  • 服务器端通过FastAPI的StreamingResponse支持SSE长连接推送,实现多通道消息按事件区分。
  • 客户端使用httpx异步流式读取事件,实现对不同通道消息的实时监听与处理。
  • 该示范是实现MCP协议SSE通信的基础框架,可根据需求拓展消息格式、鉴权、断线重连等功能。