Skip to main content
Glama

Rosbridge MCP Server

list_topics.py1.82 kB
""" List Topics Tool Lists all available ROS topics. """ import asyncio from typing import Any import roslibpy from mcp.types import Tool LIST_TOPICS_TOOL = Tool( name="list_topics", description="List all available ROS topics", inputSchema={ "type": "object", "properties": {}, "required": [], }, ) async def list_topics(ros: roslibpy.Ros) -> dict[str, Any]: """ List all available ROS topics. Args: ros: The ROS connection instance Returns: A dictionary containing the list of topics or error information """ try: # Create a promise to handle the async rosbridge call future = asyncio.Future() def handle_topics(msg): future.set_result(msg) # Request topics list from rosbridge ros.get_topics(handle_topics) # Wait for the response with timeout result = await asyncio.wait_for(future, timeout=5.0) # Format the response topics_list = [] if 'topics' in result: for i, topic in enumerate(result['topics']): topic_info = { 'name': topic, 'type': result.get('types', [])[i] if i < len(result.get('types', [])) else 'unknown' } topics_list.append(topic_info) return { 'success': True, 'topics': topics_list, 'count': len(topics_list) } except asyncio.TimeoutError: return { 'success': False, 'error': 'Timeout waiting for topics list from rosbridge' } except Exception as e: return { 'success': False, 'error': f'Failed to list topics: {str(e)}' }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TakanariShimbo/rosbridge-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server