DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

文章标题:

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

文章内容:

DeepSeek和高德地图之间会碰撞出怎样的创新火花呢,DeepSeek又该如何借助高德地图的MCP来达成路径规划、行程安排等功能,本文将会给出解答。

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

高德地图官方提供了在Cursor中使用高德MCP服务的示例:

向大模型提出需求“明天去北京国贸出差,帮忙预定1km以内的3星级酒店”。

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

可以看到,Cursor在接入高德MCP之后,LLM能够利用高德地图提供的工具来查找北京国贸附近的酒店。

高德地图官方给出了在Cursor中配置MCP服务的教程:快速接入-MCP Server | 高德地图API虽然Cursor能方便使用MCP服务,但需要Cursor Pro版本,而Pro版本价格较高,这无疑是一笔不小的开销。

对于开发者来说,我们希望开发一个Python客户端,连接高德MCP服务并通过DeepSeek调用高德MCP服务,效果如下:

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

当用户提出问题“我要去济南奥体中心看球,请查询附近3km的经济又舒适的酒店,并规划酒店到奥体中心的路线”时,可以看到DeepSeek调用了高德MCP提供的三个函数工具,最终给出推荐的酒店。

接着继续与DeepSeek对话,要求DeepSeek“提供更详细的路线”,可以看到DeepSeek在执行函数工具后,给出了两个酒店到奥体中心的详细路线及具体路径。

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

我们还可以尝试询问DeepSeek“推荐路线上的特色餐馆”,DeepSeek再次调用高德MCP提供的函数工具并给出推荐结果,针对两条不同路线的酒店,给出餐馆建议。

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

MCP服务分为两类:

  • 标准输入输出 (stdio)

stdio通过标准输入和输出流进行通信,这对本地集成和命令行工具很有用。

  • 服务器发送事件 (SSE)

SSE通过HTTP POST请求实现服务器到客户端的流式通信,可实现远程的MCP服务。

高德MCP服务借助SSE协议实现

Server-Sent Events(SSE,服务器发送事件)是基于HTTP协议的技术,允许服务器向客户端单向、实时推送数据。在SSE模式下,开发者可在客户端通过创建EventSource对象与服务器建立持久连接,服务器通过该连接持续发送数据流,无需客户端反复发送请求。

接下来将介绍该案例的实现方式。

1.注册成为高德开发者

首先注册成为高德开发者,可直接通过淘宝或支付宝账号进行创建

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

接下来,点击“应用管理”->“我的应用”,点击“创建新应用”,输入应用名称和应用类型后点击创建。

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

点击添加Key,Key名称可填写为mcp,服务平台选择Web服务,然后点击提交创建Key。

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

Key创建好后,后续可通过该Key调用MCP服务。

2.Python客户端

首先需配置MCP环境,可查看上一篇文章:MCP详解:10分钟快速入门MCP开发-
CSDN博客

Python客户端中,最重要的功能是通过SSE连接到高德MCP服务端,代码如下:

async def connect_server(self, server_config):
    async with self._lock:  # 防止并发调用 connect
        url = server_config["mcpServers"]["amap-amap-sse"]["url"]
        print(f"尝试连接到: {url}")
        self._exit_stack = AsyncExitStack()
        # 1. 进入 SSE 上下文,但不退出
        sse_cm = sse_client(url)
        # 手动调用 __aenter__ 获取流,并存储上下文管理器以便后续退出
        streams = await self._exit_stack.enter_async_context(sse_cm)
        print("SSE 流已获取。")
        # 2. 进入 Session 上下文,但不退出
        session_cm = ClientSession(streams[0], streams[1])
        # 手动调用 __aenter__ 获取 session
        self.session = await self._exit_stack.enter_async_context(session_cm)
        print("ClientSession 已创建。")
        # 3. 初始化 Session
        await self.session.initialize()
        print("Session 已初始化。")
        # 4. 获取并存储工具列表
        response = await self.session.list_tools()
        self.tools = {tool.name: tool for tool in response.tools}
        print(f"成功获取 {len(self.tools)} 个工具:")
        for name, tool in self.tools.items():
            print(f"  - {name}: {tool.description[:50]}...")  # 打印部分描述
        print("连接成功并准备就绪。")

运行代码时,可看到高德MCP提供的工具如下:

SSE 流已获取。
ClientSession 已创建。
Session 已初始化。
成功获取 12 个工具:
  - maps_direction_bicycling: 骑行路径规划用于规划骑行通勤方案,规划时会考虑天桥、单行线、封路等情况。最大支持 500km 的骑行...
  - maps_direction_driving: 驾车路径规划 API 可以根据用户起终点经纬度坐标规划以小客车、轿车通勤出行的方案,并且返回通勤方案...
  - maps_direction_transit_integrated: 根据用户起终点经纬度坐标规划综合各类公共(火车、公交、地铁)交通方式的通勤方案,并且返回通勤方案的数据...
  - maps_direction_walking: 根据输入起点终点经纬度坐标规划100km 以内的步行通勤方案,并且返回通勤方案的数据...
  - maps_distance: 测量两个经纬度坐标之间的距离,支持驾车、步行以及球面距离测量...
  - maps_geo: 将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、建筑物名称解析为经纬度坐标...
  - maps_regeocode: 将一个高德经纬度坐标转换为行政区划地址信息...
  - maps_ip_location: IP 定位根据用户输入的 IP 地址,定位 IP 的所在位置...
  - maps_around_search: 周边搜,根据用户传入关键词以及坐标location,搜索出radius半径范围的POI...
  - maps_search_detail: 查询关键词搜或者周边搜获取到的POI ID的详细信息...
  - maps_text_search: 关键字搜索 API 根据用户输入的关键字进行 POI 搜索,并返回相关的信息...
  - maps_weather: 根据城市名称或者标准adcode查询指定城市的天气...

DeepSeek会根据问题分析应调用哪些工具。参数解析和工具执行的代码如下:

async def execute_tool(self, llm_response: str):
    """Process the LLM response and execute tools if needed.

    Args:
        llm_response: The response from the LLM.

    Returns:
        The result of tool execution or the original response.
    """
    import json
    try:
        pattern = r"```json\n(.*?)\n?```"
        match = re.search(pattern, llm_response, re.DOTALL)
        if match:
            llm_response = match.group(1)
        tool_call = json.loads(llm_response)
        if "tool" in tool_call and "arguments" in tool_call:
            # result = await self.session.call_tool(tool_name, tool_args)
            response = await self.session.list_tools()
            tools = response.tools
            if any(tool.name == tool_call["tool"] for tool in tools):
                try:
                    print(f"[提示]:正在调用工具 {tool_call['tool']}")
                    result = await self.session.call_tool(
                        tool_call["tool"], tool_call["arguments"]
                    )
                    if isinstance(result, dict) and "progress" in result:
                        progress = result["progress"]
                        total = result["total"]
                        percentage = (progress / total) * 100
                        print(f"Progress: {progress}/{total} ({percentage:.1f}%)")
                    # print(f"[执行结果]: {result}")
                    return f"Tool execution result: {result}"
                except Exception as e:
                    error_msg = f"Error executing tool: {str(e)}"
                    print(error_msg)
                    return error_msg
            return f"No server found with tool: {tool_call['tool']}"
        return llm_response
    except json.JSONDecodeError:
        return llm_response

接下来是聊天的循环,可观察会话处理过程:

async def chat_loop(self):
    """运行交互式聊天循环"""
    print("MCP 客户端启动")
    print("输入 /bye 退出")
    while True:
        prompt = input(">>> ").strip()
        if "/bye" in prompt.lower():
            break
        response = await self.chat(prompt)
        self.messages.append({"role": "assistant", "content": response})
        result = await self.execute_tool(response)
        while result != response:
            response = await self.chat(result, "system")
            self.messages.append(
                {"role": "assistant", "content": response}
            )
            result = await self.execute_tool(response)
        print(response)

直接运行代码即可实现DeepSeek接入高德MCP。

开发者可自行探索通过高德MCP实现更多功能。

DeepSeek对接高德地图:Python实现MCP客户端SSE连接高德服务(含完整代码)

完整代码如下:

client.py

```python
import json
import asyncio
import re
import sys
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client

from dotenv import load_dotenv
from openai import AsyncOpenAI, OpenAI

def format_tools_for_llm(tool) -> str:
"""对tool进行格式化

Returns:
    格式化之后的tool描述
"""
args_desc = []
if "properties" in tool.inputSchema:
    for param_name, param_info in tool.inputSchema["properties"].items():
        arg_desc = (
            f"- {param_name}: {param_info.get('description', 'No description')}"
        )
        if param_name in tool.inputSchema.get("required", []):
            arg_desc += " (required)"
        args_desc.append(arg_desc)

return f"Tool: {tool.name}\nDescription: {tool.description}\nArguments:\n{chr(10).join(args_desc)}"

class Client:
def init(self):
self._exit_stack: Optional[AsyncExitStack] = None
self.session: Optional[ClientSession] = None
self._lock = asyncio.Lock() # 防止并发连接/断开问题
self.is_connected = False
self.client = AsyncOpenAI(
base_url="https://api.deepseek.com",
api_key="<你的API key>",
)
self.model = "deepseek-chat"
self.messages = []

async def connect_server(self, server_config):
    async with self._lock:  # 防止并发调用 connect
        url = server_config["mcpServers"]["amap-amap-sse"]["url"]
        print(f"尝试连接到: {url}")
        self._exit_stack = AsyncExitStack()
        # 1. 进入 SSE 上下文,但不退出
        sse_cm = sse_client(url)
        # 手动调用 __aenter__ 获取流,并存储上下文管理器以便后续退出
        streams = await self._exit_stack.enter_async_context(sse_cm)
        print("SSE 流已获取。")

        # 2. 进入 Session 上下文,但不退出
        session_cm = ClientSession(streams[0], streams[1])
        # 手动调用 __aenter__ 获取 session
        self.session = await self._exit_stack.enter_async_context(session_cm)
        print("ClientSession 已创建。")

        # 3. 初始化 Session
        await self.session.initialize()
        print("Session 已初始化。")

        # 4. 获取并存储工具列表
        response = await self.session.list_tools()
        self.tools = {tool.name: tool for tool in response.tools}
        print(f"成功获取 {len(self.tools)} 个工具:")
        for name, tool in self.tools.items():
            print(f"  - {name}: {tool.description[:50]}...")  # 打印部分描述

        print("连接成功并准备就绪。")

    # 列出可用工具
    response = await self.session.list_tools()
    tools = response.tools

    tools_description = "\n".join([format_tools_for_llm(tool) for tool in tools])
    # 修改系统提示
    system_prompt = (
        "You are a helpful assistant with access to these tools:\n\n"
        f"{tools_description}\n"
        "Choose the appropriate tool based on the user's question. "
        "If no tool is needed, reply directly.\n\n"
        "IMPORTANT: When you need to use a tool, you must ONLY respond with "
        "the exact JSON object format below, nothing else:\n"
        "{\n"
        '    "tool": "tool-name",\n'
        '    "arguments": {\n'
        '        "argument-name": "value"\n'
        "    }\n"
        "}\n\n"
        '"```json" is not allowed'
        "After receiving a tool's response:\n"
        "1. Transform the raw data into a natural, conversational response\n"
        "2. Keep responses concise but informative\n"
        "3. Focus on the most relevant information\n"
        "4. Use appropriate context from the user's question\n"
        "5. Avoid simply repeating the raw data\n\n"
        "Please use only the tools that are explicitly defined above."
    )
    self.messages.append({"role": "system", "content": system_prompt})

async def disconnect(self):
    """关闭 Session 和连接。"""
    async with self._lock:
        await self._exit_stack.aclose()

async def chat(self, prompt, role="user"):
    """与LLM进行交互"""
    self.messages.append({"role": role, "content": prompt})

    # 初始化 LLM API 调用
    response = await self.client.chat.completions.create(
        model=self.model,
        messages=self.messages,
    )
    llm_response = response.choices[0].message.content
    return llm_response

async def execute_tool(self, llm_response: str):
    """Process the LLM response and execute tools if needed.

    Args:
        llm_response: The response from the LLM.

    Returns:
        The result of tool execution or the original response.
    """
    import json

    try:
        pattern = r"```json\n(.*?)\n?```"
        match = re.search(pattern, llm_response, re.DOTALL)
        if match:
            llm_response = match.group(1)
        tool_call = json.loads(llm_response)
        if "tool" in tool_call and "arguments" in tool_call:
            # result = await self.session.call_tool(tool_name, tool_args)
            response = await self.session.list_tools()
            tools = response.tools

            if any(tool.name == tool_call["tool"] for tool in tools):
                try:
                    print(f"[提示]:正在调用工具 {tool_call['tool']}")
                    result = await self.session.call_tool(
                        tool_call["tool"], tool_call["arguments"]
                    )

                    if isinstance(result, dict) and "progress" in result:
                        progress = result["progress"]
                        total = result["total"]
                        percentage = (progress / total) * 100
                        print(f"Progress: {progress}/{

相关文章

暂无评论

暂无评论...