main.py•4.52 kB
import asyncio
from typing import Optional
from mcp import ClientSession, StdioServerParameters
from dotenv import load_dotenv
from contextlib import AsyncExitStack
from openai import OpenAI
import os
from mcp.client.stdio import stdio_client
import sys
import json
load_dotenv()
class MCPClient:
def __init__(self):
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.openai = OpenAI(
api_key=os.getenv("API-KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
async def connect_to_server(self, server_script_path: str):
is_python = server_script_path.endswith(".py")
is_js = server_script_path.endswith(".js")
if not (is_python or is_js):
raise ValueError("Server script must be a .py or .js file")
command = "python" if is_python else "node"
server_params = StdioServerParameters(
command=command, args=[server_script_path], env=None
)
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
async def process_query(self, query: str) -> str:
messages = [
{"role": "user", "content": query},
]
response = await self.session.list_tools()
available_tools = [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
}
for tool in response.tools
]
response = self.openai.chat.completions.create(
model="qwen-turbo", messages=messages, tools=available_tools
)
final_text = []
assistant_message_content = []
msg = response.choices[0].message
while True:
if len(msg.content) != 0:
assistant_message_content.append(msg.content)
final_text.append(msg.content)
if msg.tool_calls is None:
break
if msg.tool_calls is not None:
for tc in msg.tool_calls:
tool_name = tc.function.name
tool_args = json.loads(tc.function.arguments)
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(
f"[Calling tool {tool_name} with args {tool_args}]"
)
messages.append(msg)
messages.append(
{
"role": "tool",
"content": result.content[0].text,
"tool_call_id": tc.id,
}
)
response = self.openai.chat.completions.create(
model="qwen-turbo", messages=messages, tools=available_tools
)
msg = response.choices[0].message
return "\n".join(final_text)
async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == "quit":
break
response = await self.process_query(query)
print("\n" + response)
except Exception as e:
print(f"\nError: {str(e)}")
async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()
async def main():
if len(sys.argv) < 2:
print("Usage: python client.py <path_to_server_script>")
sys.exit(1)
client = MCPClient()
try:
await client.connect_to_server(sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())