阿里妹導讀
本文透過100行程式碼看到MCP的核心原理並不複雜,但它的設計巧妙深入理解使我們能夠超越簡單的SDK使用,建立更強大、更靈活的AI應用整合方案。
當我開始研究 Model Context Protocol (MCP)接入的時候,發現一個問題,絕大多數的文件都是以
@mcp.tool
這樣註解的方式注入。但如果當前有很多非同步的業務流程,接入會非常麻煩,它並沒有一個程式碼實體的存在可以加註解。難道需要為一個個流程編寫同步函式嗎?好奇心驅使我進一步分析MCP的通訊原理,看看是不是有什麼辦法能更方便地接入MCP,理解MCP的原理。
MCP的通訊方式
MCP提供了STDIO和SSE兩種傳輸協議,當前很多實驗性的工具都是使用STDIO傳輸。不過如果提供服務的話,基本就是SSE(Server-Sent Events)。所以本文重點分析討論SSE的MCP接入模式。

在搜尋SSE的時候,看到了阮一峰老師在2017年對於SSE的特點歸納:
SSE 與 WebSocket 作用相似,都是建立瀏覽器與伺服器之間的通訊渠道,然後伺服器向瀏覽器推送資訊。
總體來說,WebSocket 更強大和靈活。因為它是全雙工通道,可以雙向通訊;SSE 是單向通道,只能伺服器向瀏覽器傳送,因為流資訊本質上就是下載。如果瀏覽器向伺服器傳送資訊,就變成了另一次 HTTP 請求。
這個特點讓我更加好奇了,stdio中可以使用stdin來進行輸入,使用stdout來進行輸出。但是SSE是單向通道,MCP要如何實現雙向通訊呢?是建立兩根SSE通道嗎?帶著這個疑問,我開始了動手實踐。
MCP的SSE通訊流程
利用MCP官方提供的工具
npx @modelcontextprotocol/inspector
可以比較方便地拉起一個驗證MCP的管理頁。針對這個管理頁抓包就能發現一些SSE的通訊端倪。
1./sse這個URL只負責推送資訊,並不能傳送資訊,傳送資訊需要另外的URL。
2.Client連線上
/sse
這個地址的第一個Event就是告訴Client傳送資訊需要去哪個URL發,這個URL通常會帶上唯一的會話ID。觀察這個抓包情況,我們前面的雙向通訊疑問基本可以有答案了:
1.只有一根SSE長連線,用來Server向Client推送資料,另外一個Client向Server傳送請求的通道是使用普通的HTTP POST請求。
2.Client向Server傳送的HTTP POST請求中只使用2xx反饋是否收到指令,所有的資料返回是透過一開始的SSE長連線來推送。
為了驗證這個猜想,我還特地做一個實驗,使用curl模擬了POST
/messsage?sessionId=***
傳送一個請求包,果不其然在SSE的事件流中多了一條事件。MCP的SSE通訊實現
透過上一個章節的抓包,我們基本摸清了MCP的SSE通訊流程:
1./sse URL建立SSE長鏈之後先返回一個
endpoint
(常見為/message
),資料格式為純文字的同域名URL字串。2.client使用POST向
endpoint(/message)
傳送呼叫請求,POST中的body滿足JSON-RPC規範,包含欄位jsonrpc
、method
、params
、id
。3.在
/sse
長連線中返回的event滿足JSON-RPC規範,包含欄位jsonrpc
、result
、id
、error(執行錯誤時)
。看起好像並不複雜,我們嘗試用Python來實現一下(不使用MCP Python SDK)。
from fastapi import FastAPI, Request
import uuid
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import json
app = FastAPI()
mcpHub = {}
classMcpRequest(BaseModel):
id: Optional[int] = None
jsonrpc: str
method: str
params: Optional[dict] = None
classMCPServer:
def__init__(self):
self.queue = asyncio.Queue()
asyncdefreader(self):
whileTrue:
event = await self.queue.get()
yield event
asyncdefrequest(self, payload: McpRequest):
if payload.method == "initialize":
await self.queue.put({"event": "message", "data": ..})
elif payload.method == "tools/list":
...
asyncdefsse():
client_id = str(uuid.uuid4())
mcp = MCPServer()
mcpHub[client_id] = mcp
await mcp.queue.put({"event": "endpoint", "data": f"/message?client_id={client_id}"})
return EventSourceResponse(mcp.reader())
asyncdefmessage(request: Request, payload: McpRequest):
client_id = request.query_params.get("client_id")
if client_id notin mcpHub:
return"no client"
await mcpHub[client_id].request(payload)
return"ok"
在這段程式碼中,我們引入了這樣幾個設計:
1.我們使用了
asyncio.Queue()
來解耦業務流和MCP服務流。這個訊息佇列聯動EventSourceResponse
的資料流。每往這個訊息佇列打一個訊息,就會自動透過EventSourceResponse
的資料流向Client推送一條訊息。這樣Client在Server側看起來就是一個標準的訂閱MQ的消費者。2.在記憶體中維護一個
client_id
對映訊息佇列的字典,這樣一旦有訊息進入就可以知曉使用的是哪個MQ,然後往對應的MQ裡面投遞訊息。在分散式系統中,這個client_id
可以是訊息佇列的全域性唯一標識,這樣無論打到哪臺機器上,都能夠找到正確的佇列。3.服務側在處理之後,將訊息投遞迴訊息佇列之後,Client就能感知。MCPServer和MCPClient保持長鏈之後,後方的業務系統側理論上可以進行無限時長執行(如果Client側不主動超時退出),一切均以訊息投遞回來為準。

我們可以參考文件來看看有哪些
method
需要被支援:
MCP的訂閱模式擴充套件思考
在MCP的resource的method中,有個不起眼的
resources/subcribe
引起了我的注意。首先。我們來看看什麼是resource
,官方給出的定義是:Resources represent any kind of data that an MCP server wants to make available to clients. This can include:File contents、Database records、API responses、Live system data、Screenshots and images、Log files、And more
所以,如果我們使用
resources/subcribe
訂閱一個Database
,那麼這個資料庫的所有變動就會源源不斷地推送過來,這就非常近似流計算的常見使用形態了。因為SSE已經讓Server建立向Client的單向資料流,所以如果Client發起一個訂閱,我們就建立一個Flink流計算任務向MQ打訊息,就非常原生地實現了資源的訂閱。我們可以擴充套件一下上面的這個拓撲結構。

1.從大模型視角看流計算:基於MCP協議,大模型實際上能夠非常優雅地接入流計算的能力,來完成複雜業務邏輯構建。
2.從流計算視角看大模型:使用MCP協議之後,大模型似乎就變成了一個標準流計算處理節點,能夠接收流式訊息,也能給向另外的MQ投遞訊息。
不得不說,這個確實就是MCP設計上的一個優勢。感覺MCP有點像RPC,又有點像MQ,那麼這到底是什麼呢?我們不妨從程式設計模型的角度來思考一下。
MCP的程式設計模型思考
MCP從程式設計模型的角度來看,本質上是一種有狀態的雙向RPC(遠端過程呼叫)模型,結合了事件驅動和請求-響應的特性。這種混合模式使其在AI應用與外部系統整合方面具有獨特優勢。
MCP的核心特徵包括:
1.有狀態會話:與傳統無狀態REST API不同,MCP維護會話狀態,客戶端和伺服器之間建立長期連線,會話具有明確的生命週期。
2.雙向通訊:不僅客戶端可以呼叫伺服器(傳統RPC模式),伺服器也可以呼叫客戶端(反向RPC)。例如,伺服器可以請求客戶端執行AI取樣。
3.基於能力的協商:初始化階段進行能力協商,動態發現可用功能,適應不同實現和版本。
4.事件通知機制:支援單向通知,資源變更訂閱模式,非同步事件處理。
5.標準化介面:定義了一組標準操作,使用JSON Schema定義引數和返回值,促進互操作性。
為了更好地理解MCP的定位,我們可以將其與其他常見的程式設計模型進行比較:
MCP vs REST API

MCP vs 訊息佇列(MQ)

MCP vs WebSocket

MCP在各種程式設計模型中佔據了一個獨特的位置:
-
比REST API更有狀態和雙向,但比訊息佇列更直接和輕量。
-
比WebSocket更結構化和標準化,但比gRPC更靈活和易於理解。
-
比GraphQL更專注於工具呼叫,但比RPC更關注資源和上下文。
同時,正因為MCP這樣的一個獨特的功能位,不要因為當前的一些能力侷限性,就放棄了MCP的原生化的適配。非同步任務、事件驅動等架構本身就應該能夠原生對接MCP。
MCP服務的簡單實現
既然MCP的整個執行原理並不複雜,我們就嘗試自己實現一次,致敬一下這個優秀的設計。
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
import uuid
from pydantic import BaseModel
from typing importOptional
import uvicorn
import inspect
app = FastAPI()
mcpHub = {}
classMcpRequest(BaseModel):
id: Optional[int] = None
jsonrpc: str
method: str
params: Optional[dict] = None
classMCPServer:
def__init__(self, name, message_path, tools):
self.queue = asyncio.Queue()
self.client_id = str(uuid.uuid4())
self.message_path = message_path
self.info = {
"protocolVersion": "2024-11-05",
"capabilities": {
"experimental": {},
"tools": {
"listChanged": False
}
},
"serverInfo": {
"name": name,
"version": "1.6.0"
}
}
self.tools = tools
deflist_tool(self):
result = []
for tool in self.tools:
toolInfo = {
"name": tool.__name__,
"description": tool.__doc__,
"inputSchema": {"type": "object","properties":{}},
}
for name, param in inspect.signature(tool).parameters.items():
toolInfo["inputSchema"]["properties"][name] = {
"title": name,
"type": "string",
}
result.append(toolInfo)
return result
asyncdefreader(self):
whileTrue:
event = await self.queue.get()
yield event
@staticmethod
defresponse(result, id):
message = {
"jsonrpc": "2.0",
"result": result,
}
ifidisnotNone:
message["id"] = id
return json.dumps(message)
asyncdefrequest(self, req: McpRequest):
if req.method == "initialize":
await self.queue.put({"event": "message", "data": self.response(self.info, req.id)})
elif req.method == "tools/list":
await self.queue.put({"event": "message", "data": self.response({"tools": self.list_tool()}, req.id)})
elif req.method == "tools/call":
for tool in self.tools:
if tool.__name__ == req.params.get("name"):
result = await tool(**req.params["arguments"])
await self.queue.put({"event": "message", "data": self.response({"content": result, "isError": False}, req.id)})
break
asyncdeftest(state=None):
"""
description
"""
result = f"hi {state}"
await asyncio.sleep(1)
result += "!"
return result
asyncdefreceive_test():
mcp = MCPServer(name="mcp-test",message_path="/send_test", tools=[test])
mcpHub[mcp.client_id] = mcp
await mcp.queue.put({"event": "endpoint", "data": f"{mcp.message_path}?client_id={mcp.client_id}"})
return EventSourceResponse(mcp.reader())
asyncdefsend_test(request: Request, payload: McpRequest):
client_id = request.query_params.get("client_id")
if client_id notin mcpHub:
return"no client"
await mcpHub[client_id].request(payload)
return"ok"
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8001)
如上大概100行左右的程式碼,我們實現了一個簡易版本的MCP服務,較官方的MCP Python SDK,我們獲得了幾個重要的特性最佳化:
1.tool註冊不再依賴
@mcp.tool
這樣的註解,完全可以動態傳入,針對不同的場景,提供不同MCP URL,上面提供不同的Tool。2.程式設計模型為MQ驅動的服務,對接非同步系統、事件驅動的系統或平臺較為友好。參考該Python實現,轉化成其他語言的版本也較為方便。
3.不依賴 /sse /message 這些預設路由地址,也能正常執行,證明MCP的URL可以完全自定義。
總結:理解MCP的本質
本文深入探討MCP的原理、通訊機制和程式設計模型本質之後,我們看到MCP不僅僅是一個簡單的API或SDK,而是一個精心設計的協議,它:
1.採用client-host-server架構,支援多種伺服器連線;
2.實現了有狀態的雙向RPC模型,結合了事件驅動特性;
3.提供了標準化的工具呼叫和資源訪問機制;
4.支援動態能力協商和功能發現;
5.相比較MQ、API、WS,佔據了獨特的功能位置,專為AI應用與外部系統整合而最佳化;
正如我們透過100行程式碼看到的,MCP的核心原理並不複雜,但它的設計巧妙,這種深入理解將使我們能夠超越簡單的SDK使用,建立更強大、更靈活的AI應用整合方案。
參考材料
-
Model Context Protocol(MCP)詳解和開發教程 https://blog.csdn.net/ZYC88888/article/details/146414158
-
Server-Sent Events 教程 https://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
-
https://github.com/modelcontextprotocol/inspector
-
(譯) JSON-RPC 2.0 規範(中文版) https://wiki.geekdream.com/Specification/json-rpc_2.0.html
低成本、高效能的湖倉一體化架構
湖倉一體架構融合了資料湖的低成本、高擴充套件性,以及資料倉庫的高效能、強資料治理能力,高效應對大資料時代的挑戰。SelectDB 透過高效能資料分析處理引擎和豐富的湖倉資料對接能力,助力企業加速從 0 到 1 構建湖倉體系,降低轉型過程中的風險和成本。
點選閱讀原文檢視詳情。