一、背景说明
- SSE(Server-Sent Events) 是基于HTTP的单向服务器推送技术,客户端通过EventSource监听服务器推送的消息流。
- MCP在此示例中定义为多通道消息传输协议,可通过不同事件名区分消息类型。
- Python Server端用
FastAPI
+ uvicorn
实现 SSE 长连接推送。
- Python Client端用
httpx
+ 异步监听处理服务器发送事件。
二、环境准备
pip install fastapi uvicorn httpx
三、服务器端代码(FastAPI)
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
import time
app = FastAPI()
# MCP事件格式封装
def format_sse(data: dict, event: str = None) -> str:
msg = ""
if event:
msg += f"event: {event}\n"
msg += f"data: {json.dumps(data)}\n\n"
return msg
# SSE消息生成器
async def event_generator():
counter = 0
while True:
# 模拟多通道消息:channel1 和 channel2
if counter % 2 == 0:
data = {"channel": "channel1", "msg": f"Message {counter} from channel1"}
yield format_sse(data, event="channel1")
else:
data = {"channel": "channel2", "msg": f"Message {counter} from channel2"}
yield format_sse(data, event="channel2")
counter += 1
await asyncio.sleep(2) # 每2秒推送一次
@app.get('/mcp/sse')
async def sse_endpoint(request: Request):
async def event_stream():
generator = event_generator()
async for event in generator:
# 如果客户端断开,退出循环
if await request.is_disconnected():
print("Client disconnected")
break
yield event
return StreamingResponse(event_stream(), media_type="text/event-stream")
四、客户端代码(异步监听 SSE)
import httpx
import asyncio
async def listen_sse(url):
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream("GET", url) as response:
async for line in response.aiter_lines():
if line.startswith("event:"):
event_name = line.replace("event: ", "")
elif line.startswith("data:"):
data_json = line.replace("data: ", "")
print(f"Received event: {event_name}, data: {data_json}")
if __name__ == "__main__":
url = "http://127.0.0.1:8000/mcp/sse"
asyncio.run(listen_sse(url))
五、运行说明
- 启动服务器:
uvicorn server:app --reload
- 运行客户端监听脚本,即可看到服务器每2秒推送的两路MCP消息。
六、总结
- 服务器端通过FastAPI的StreamingResponse支持SSE长连接推送,实现多通道消息按事件区分。
- 客户端使用httpx异步流式读取事件,实现对不同通道消息的实时监听与处理。
- 该示范是实现MCP协议SSE通信的基础框架,可根据需求拓展消息格式、鉴权、断线重连等功能。
发表回复