Skip to main content
Glama

Chronos Protocol

by n0zer0d4y
server.py (41.2 kB)
import asyncio from datetime import datetime, timedelta from enum import Enum import json import os import uuid from typing import Sequence, Optional, Dict, Any, List from zoneinfo import ZoneInfo from tzlocal import get_localzone_name import shortuuid from shortuuid import ShortUUID from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource, ErrorData from mcp.shared.exceptions import McpError from pydantic import BaseModel, Field from .storage import resolve_data_dir, validate_and_create_data_dir def validate_timezone(timezone: str) -> str: """Validate and provide helpful error messages for timezone inputs.""" if not isinstance(timezone, str): raise ValueError("Timezone must be a string") # Handle special cases if timezone in ["system", "local"]: return timezone # Validate IANA timezone format try: ZoneInfo(timezone) return timezone except Exception: raise ValueError( f"Invalid timezone '{timezone}'. " f"Supported formats: 'system', 'local', or IANA names like " f"'America/New_York', 'Europe/London', 'Asia/Tokyo', 'UTC'" ) def validate_time_format(time_str: str) -> str: """Validate time format and provide helpful error messages.""" if not isinstance(time_str, str): raise ValueError("Time must be a string") # Check HH:MM format if not time_str or len(time_str) != 5 or time_str[2] != ':': raise ValueError( f"Invalid time format '{time_str}'. Expected 24-hour format HH:MM (00:00-23:59)" ) try: hours, minutes = time_str.split(':') hours = int(hours) minutes = int(minutes) if not (0 <= hours <= 23): raise ValueError(f"Hours must be between 00-23, got {hours:02d}") if not (0 <= minutes <= 59): raise ValueError(f"Minutes must be between 00-59, got {minutes:02d}") return time_str except ValueError as e: if "too many values" in str(e) or "not enough values" in str(e): raise ValueError( f"Invalid time format '{time_str}'. 
Expected HH:MM format (e.g., 14:30)" ) raise class TimeTools(str, Enum): GET_CURRENT_TIME = "get_current_time" CONVERT_TIME = "convert_time" START_ACTIVITY_LOG = "start_activity_log" END_ACTIVITY_LOG = "end_activity_log" GET_ELAPSED_TIME = "get_elapsed_time" GET_ACTIVITY_LOGS = "get_activity_logs" UPDATE_ACTIVITY_LOG = "update_activity_log" CREATE_TIME_REMINDER = "create_time_reminder" CHECK_TIME_REMINDERS = "check_time_reminders" class TaskScope(str, Enum): EPIC_PLANNING = "epic-planning" FEATURE_IMPLEMENTATION = "feature-implementation" COMPONENT_IMPLEMENTATION = "component-implementation" DEBUGGING = "debugging" INTEGRATION_TASKS = "integration-tasks" OPTIMIZATION_TASKS = "optimization-tasks" SETUP_TASKS = "setup-tasks" TESTING_TASKS = "testing-tasks" class TimeResult(BaseModel): timezone: str datetime: str formatted_timezone: str day_of_week: str is_dst: bool class TimeConversionResult(BaseModel): source: TimeResult target: TimeResult time_difference: str class ActivityLog(BaseModel): activityId: str # Changed from timeId for better naming activityType: str task_scope: TaskScope description: Optional[str] = None tags: Optional[List[str]] = None startTime: str endTime: Optional[str] = None duration: Optional[str] = None durationSeconds: Optional[int] = None result: Optional[str] = None notes: Optional[str] = None status: str # "started", "completed" class Reminder(BaseModel): reminderId: str reminderTime: str message: str relatedTaskId: Optional[str] = None status: str # "pending", "completed" createdTime: str class Database: def __init__(self, data_dir: str): self.data_dir = data_dir self.db_file = os.path.join(data_dir, "time_server_data.json") self.ensure_data_dir() self.load_data() def ensure_data_dir(self): """Ensure the data directory exists""" if not os.path.exists(self.data_dir): os.makedirs(self.data_dir) def load_data(self): """Load data from JSON file""" if os.path.exists(self.db_file): try: with open(self.db_file, 'r') as f: data = json.load(f) 
self.activity_logs = data.get('activityLogs', []) self.reminders = data.get('reminders', []) except (json.JSONDecodeError, IOError): # If file is corrupted, start with empty data self.activity_logs = [] self.reminders = [] else: self.activity_logs = [] self.reminders = [] def save_data(self): """Save data to JSON file""" # Ensure the data directory exists self.ensure_data_dir() data = { 'activityLogs': self.activity_logs, 'reminders': self.reminders } try: with open(self.db_file, 'w') as f: json.dump(data, f, indent=2) except IOError as e: raise McpError(ErrorData(code=500, message=f"Failed to save data: {str(e)}", data=None)) def add_activity_log(self, log: ActivityLog): """Add a new activity log""" self.activity_logs.append(log.model_dump()) self.save_data() def update_activity_log(self, activity_id: str, updates: Dict[str, Any]): """Update an existing activity log""" for log in self.activity_logs: if log.get('activityId') == activity_id: log.update(updates) self.save_data() return True return False def get_activity_log(self, activity_id: str) -> Optional[Dict[str, Any]]: """Get an activity log by activity ID""" for log in self.activity_logs: if log.get('activityId') == activity_id: return log return None def get_activity_logs(self, filters: Dict[str, Any] = None) -> List[Dict[str, Any]]: """Get activity logs with optional filtering""" logs = self.activity_logs.copy() if filters: if 'activityType' in filters: logs = [log for log in logs if log['activityType'] == filters['activityType']] if 'task_scope' in filters: logs = [log for log in logs if log['task_scope'] == filters['task_scope']] if 'startDate' in filters: start_date = datetime.fromisoformat(filters['startDate'].replace('Z', '+00:00')) logs = [log for log in logs if datetime.fromisoformat(log['startTime'].replace('Z', '+00:00')) >= start_date] if 'endDate' in filters: end_date = datetime.fromisoformat(filters['endDate'].replace('Z', '+00:00')) logs = [log for log in logs if 
datetime.fromisoformat(log['startTime'].replace('Z', '+00:00')) <= end_date] # Sort by start time (newest first) logs.sort(key=lambda x: x['startTime'], reverse=True) if filters and 'limit' in filters: logs = logs[:filters['limit']] return logs def add_reminder(self, reminder: Reminder): """Add a new reminder""" self.reminders.append(reminder.model_dump()) self.save_data() def update_reminder(self, reminder_id: str, updates: Dict[str, Any]): """Update an existing reminder""" for reminder in self.reminders: if reminder['reminderId'] == reminder_id: reminder.update(updates) self.save_data() return True return False def get_reminders(self, upcoming_minutes: int = 60) -> List[Dict[str, Any]]: """Get reminders due within the specified minutes""" now = datetime.now(ZoneInfo('UTC')) cutoff_time = now + timedelta(minutes=upcoming_minutes) due_reminders = [] for reminder in self.reminders: if reminder['status'] == 'pending': reminder_time = datetime.fromisoformat(reminder['reminderTime'].replace('Z', '+00:00')) # Fix: Ensure timezone-aware comparison (handles both naive and aware datetimes) if reminder_time.tzinfo is None: reminder_time = reminder_time.replace(tzinfo=ZoneInfo('UTC')) if reminder_time <= cutoff_time: due_reminders.append(reminder) return due_reminders class IDGenerator: """Generate IDs based on format preference""" def __init__(self, format_type: str = "short", custom_length: int = 12): self.format_type = format_type if format_type == "short": self.generator = shortuuid elif format_type == "custom": self.generator = ShortUUID(alphabet="23456789ABCDEFGHJKMNPQRSTUVWXYZ") self.custom_length = custom_length elif format_type == "uuid": self.generator = uuid else: raise ValueError(f"Unsupported ID format: {format_type}") def generate_id(self) -> str: if self.format_type == "short": return self.generator.uuid() elif self.format_type == "custom": return self.generator.uuid()[:self.custom_length] elif self.format_type == "uuid": return str(self.generator.uuid4()) def 
get_local_tz(local_tz_override: str | None = None) -> ZoneInfo: if local_tz_override: return ZoneInfo(local_tz_override) # Get local timezone from datetime.now() local_tzname = get_localzone_name() if local_tzname is not None: return ZoneInfo(local_tzname) raise McpError(ErrorData(code=500, message="Could not determine local timezone - tzinfo is None", data=None)) def get_zoneinfo(timezone_name: str) -> ZoneInfo: try: return ZoneInfo(timezone_name) except Exception as e: raise McpError(ErrorData(code=400, message=f"Invalid timezone: {str(e)}", data=None)) def format_duration(seconds: int) -> str: """Format duration in seconds to human-readable format""" if seconds < 60: return f"{seconds}s" elif seconds < 3600: minutes = seconds // 60 remaining_seconds = seconds % 60 return f"{minutes}m {remaining_seconds}s" else: hours = seconds // 3600 remaining_minutes = (seconds % 3600) // 60 remaining_seconds = seconds % 60 return f"{hours}h {remaining_minutes}m {remaining_seconds}s" class TimeServer: def __init__(self, database: Database, id_format: str = "short"): self.db = database self.id_generator = IDGenerator(id_format) def get_current_time(self, timezone_name: str) -> TimeResult: """Get current time in specified timezone (defaults to system time)""" # Get local system time first (this is the primary time) local_time = datetime.now() local_tz_name = str(get_local_tz()) # If no specific timezone requested or if requesting system timezone, use local time if timezone_name.lower() in ["system", "local", local_tz_name.lower()]: current_time = local_time display_timezone = f"System ({local_tz_name})" actual_timezone = local_tz_name else: # Use requested timezone timezone = get_zoneinfo(timezone_name) current_time = datetime.now(timezone) display_timezone = timezone_name actual_timezone = timezone_name # Create formatted timezone display formatted_timezone = current_time.strftime("%B %d, %Y at %I:%M:%S %p") if timezone_name.lower() == "utc": formatted_timezone += " UTC" elif 
timezone_name.lower() in ["system", "local"]: formatted_timezone += f" (System Time - {local_tz_name})" else: formatted_timezone += f" ({timezone_name})" result = TimeResult( timezone=actual_timezone, datetime=current_time.isoformat(timespec="seconds"), formatted_timezone=formatted_timezone, day_of_week=current_time.strftime("%A"), is_dst=bool(current_time.dst()), ) return result def convert_time( self, source_tz: str, time_str: str, target_tz: str ) -> TimeConversionResult: """Convert time between timezones (defaults to system time)""" local_tz_name = str(get_local_tz()) # Handle system/local timezone references actual_source_tz = source_tz if source_tz.lower() in ["system", "local"]: actual_source_tz = local_tz_name actual_target_tz = target_tz if target_tz.lower() in ["system", "local"]: actual_target_tz = local_tz_name source_timezone = get_zoneinfo(actual_source_tz) target_timezone = get_zoneinfo(actual_target_tz) try: parsed_time = datetime.strptime(time_str, "%H:%M").time() except ValueError: raise ValueError("Invalid time format. 
Expected HH:MM [24-hour format]") now = datetime.now(source_timezone) source_time = datetime( now.year, now.month, now.day, parsed_time.hour, parsed_time.minute, tzinfo=source_timezone, ) target_time = source_time.astimezone(target_timezone) source_offset = source_time.utcoffset() or timedelta() target_offset = target_time.utcoffset() or timedelta() hours_difference = (target_offset - source_offset).total_seconds() / 3600 if hours_difference.is_integer(): time_diff_str = f"{hours_difference:+.1f}h" else: # For fractional hours like Nepal's UTC+5:45 time_diff_str = f"{hours_difference:+.2f}".rstrip("0").rstrip(".") + "h" # Create formatted timezone displays source_formatted = source_time.strftime("%B %d, %Y at %I:%M:%S %p") target_formatted = target_time.strftime("%B %d, %Y at %I:%M:%S %p") if source_tz.lower() in ["system", "local"]: source_formatted += f" (System Time - {local_tz_name})" display_source_tz = f"System ({local_tz_name})" else: source_formatted += f" ({source_tz})" display_source_tz = source_tz if target_tz.lower() in ["system", "local"]: target_formatted += f" (System Time - {local_tz_name})" display_target_tz = f"System ({local_tz_name})" else: target_formatted += f" ({target_tz})" display_target_tz = target_tz return TimeConversionResult( source=TimeResult( timezone=display_source_tz, datetime=source_time.isoformat(timespec="seconds"), formatted_timezone=source_formatted, day_of_week=source_time.strftime("%A"), is_dst=bool(source_time.dst()), ), target=TimeResult( timezone=display_target_tz, datetime=target_time.isoformat(timespec="seconds"), formatted_timezone=target_formatted, day_of_week=target_time.strftime("%A"), is_dst=bool(target_time.dst()), ), time_difference=time_diff_str, ) def start_activity_log( self, activity_type: str, task_scope: TaskScope, description: Optional[str] = None, tags: Optional[List[str]] = None ) -> ActivityLog: """Start a new activity log""" time_id = self.id_generator.generate_id() start_time = 
datetime.now(ZoneInfo('UTC')).isoformat(timespec="seconds") log = ActivityLog( activityId=time_id, # Changed from timeId to activityId activityType=activity_type, task_scope=task_scope, description=description, tags=tags, startTime=start_time, status="started" ) self.db.add_activity_log(log) return log def end_activity_log( self, activity_id: str, result: Optional[str] = None, notes: Optional[str] = None ) -> ActivityLog: """End an activity log and calculate duration""" log_data = self.db.get_activity_log(activity_id) if not log_data: raise ValueError(f"Activity log with ID {activity_id} not found") if log_data['status'] == 'completed': raise ValueError(f"Activity log with ID {activity_id} is already completed") end_time = datetime.now(ZoneInfo('UTC')).isoformat(timespec="seconds") start_time = datetime.fromisoformat(log_data['startTime'].replace('Z', '+00:00')) end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00')) duration_seconds = int((end_dt - start_time).total_seconds()) duration_str = format_duration(duration_seconds) updates = { 'endTime': end_time, 'duration': duration_str, 'durationSeconds': duration_seconds, 'result': result, 'notes': notes, 'status': 'completed' } self.db.update_activity_log(activity_id, updates) # Return updated log updated_log_data = self.db.get_activity_log(activity_id) return ActivityLog(**updated_log_data) def get_elapsed_time(self, activity_id: str) -> Dict[str, Any]: """Get elapsed time for an activity""" log_data = self.db.get_activity_log(activity_id) if not log_data: raise ValueError(f"Activity log with ID {activity_id} not found") start_time = datetime.fromisoformat(log_data['startTime'].replace('Z', '+00:00')) if log_data['status'] == 'completed': end_time = datetime.fromisoformat(log_data['endTime'].replace('Z', '+00:00')) elapsed_seconds = int((end_time - start_time).total_seconds()) return { 'activityId': activity_id, 'startTime': log_data['startTime'], 'endTime': log_data['endTime'], 'elapsedTime': 
format_duration(elapsed_seconds), 'elapsedSeconds': elapsed_seconds, 'status': 'completed' } else: current_time = datetime.now(ZoneInfo('UTC')) elapsed_seconds = int((current_time - start_time).total_seconds()) return { 'activityId': activity_id, 'startTime': log_data['startTime'], 'currentTime': current_time.isoformat(timespec="seconds"), 'elapsedTime': format_duration(elapsed_seconds), 'elapsedSeconds': elapsed_seconds, 'status': 'ongoing' } def update_activity_log(self, activity_id: str, updates: Dict[str, Any]) -> ActivityLog: """Update an existing activity log""" log_data = self.db.get_activity_log(activity_id) if not log_data: raise ValueError(f"Activity log with ID {activity_id} not found") # Filter out None values and convert enum values to strings filtered_updates = {} for key, value in updates.items(): if value is not None: if isinstance(value, TaskScope): filtered_updates[key] = value.value else: filtered_updates[key] = value self.db.update_activity_log(activity_id, filtered_updates) # Return updated log updated_log_data = self.db.get_activity_log(activity_id) return ActivityLog(**updated_log_data) def create_time_reminder( self, reminder_time: str, message: str, related_task_id: Optional[str] = None ) -> Reminder: """Create a time-based reminder""" reminder_id = self.id_generator.generate_id() created_time = datetime.now(ZoneInfo('UTC')).isoformat(timespec="seconds") # Validate reminder time format try: # Handle both 'Z' suffix and existing timezone info if reminder_time.endswith('Z'): parsed_time = datetime.fromisoformat(reminder_time.replace('Z', '+00:00')) else: parsed_time = datetime.fromisoformat(reminder_time) except ValueError: raise ValueError("Invalid reminder time format. 
Expected ISO 8601 format") reminder = Reminder( reminderId=reminder_id, reminderTime=reminder_time, message=message, relatedTaskId=related_task_id, status="pending", createdTime=created_time ) self.db.add_reminder(reminder) return reminder def check_time_reminders(self, upcoming_minutes: int = 60) -> List[Reminder]: """Check for due or upcoming time reminders""" reminders_data = self.db.get_reminders(upcoming_minutes) return [Reminder(**reminder) for reminder in reminders_data] async def serve(local_timezone: str | None = None, data_dir: str = "./chronos-data", storage_mode: str = "centralized", project_root: str = None, id_format: str = "short", timeout: int = 60) -> None: # Startup validation and logging import sys import logging # Set up basic logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("chronos-protocol") # Log startup information logger.info("🚀 Starting Chronos Protocol MCP Server") # Data directory will be logged after resolution logger.info(f"🐍 Python version: {sys.version}") logger.info(f"🌍 Local timezone override: {local_timezone or 'None (auto-detect)'}") # Resolve actual data directory based on storage configuration try: resolved_data_dir = resolve_data_dir(storage_mode, project_root, data_dir) validate_and_create_data_dir(resolved_data_dir) # Log storage configuration for diagnostics logger.info(f"🗂️ Storage mode: {storage_mode}") logger.info(f"📁 Data directory: {resolved_data_dir}") if project_root: logger.info(f"🎯 Explicit project root: {project_root}") except Exception as e: logger.error(f"❌ Storage configuration failed: {str(e)}") raise McpError(ErrorData(code=500, message=f"Storage setup failed: {str(e)}", data=None)) # Initialize server components try: server = Server("chronos-protocol") database = Database(str(resolved_data_dir)) time_server = TimeServer(database, id_format) local_tz = str(get_local_tz(local_timezone)) logger.info(f"🌍 Using timezone: 
{local_tz}") logger.info("✅ Server components initialized successfully") except Exception as e: logger.error(f"❌ Server initialization failed: {str(e)}") raise # Store timeout for use in tool calls operation_timeout = timeout @server.list_tools() async def list_tools() -> list[Tool]: """List available time tools.""" logger.info("📋 Listing available tools for MCP client") tools = [ Tool( name=TimeTools.GET_CURRENT_TIME.value, description="Get current time (defaults to system time, supports any timezone)", inputSchema={ "type": "object", "properties": { "timezone": { "type": "string", "description": f"Timezone to display. Use 'system' or 'local' for user's local time ({local_tz}). Use IANA names like 'America/New_York', 'Europe/London', or 'UTC' for other timezones. System time is the default and most practical choice.", } }, "required": ["timezone"], }, ), Tool( name=TimeTools.CONVERT_TIME.value, description="Convert time between timezones (defaults to system time for source/target)", inputSchema={ "type": "object", "properties": { "source_timezone": { "type": "string", "description": f"Source timezone. Use 'system' or 'local' for user's local time ({local_tz}), or IANA names like 'America/New_York', 'UTC'. System time is the most practical default.", }, "time": { "type": "string", "description": "Time to convert in 24-hour format (HH:MM)", }, "target_timezone": { "type": "string", "description": f"Target timezone. Use 'system' or 'local' for user's local time ({local_tz}), or IANA names like 'Asia/Tokyo', 'UTC'. 
System time is the most practical default.", }, }, "required": ["source_timezone", "time", "target_timezone"], }, ), Tool( name=TimeTools.START_ACTIVITY_LOG.value, description="Start a new activity log with system timestamp and unique Time ID", inputSchema={ "type": "object", "properties": { "activityType": { "type": "string", "description": "Type of activity being performed (e.g., 'code_review', 'debugging', 'planning')", }, "task_scope": { "type": "string", "enum": [scope.value for scope in TaskScope], "description": "Scope of the task", }, "description": { "type": "string", "description": "Detailed description of the activity", }, "tags": { "type": "array", "items": {"type": "string"}, "description": "Tags for categorizing the activity", }, }, "required": ["activityType", "task_scope"], }, ), Tool( name=TimeTools.END_ACTIVITY_LOG.value, description="End an activity log with system timestamp and calculate duration", inputSchema={ "type": "object", "properties": { "activityId": { "type": "string", "description": "Unique identifier of the activity log to end", }, "result": { "type": "string", "description": "Result or outcome of the activity", }, "notes": { "type": "string", "description": "Detailed notes for traceability and session continuity. Include: what was accomplished, key decisions made, challenges encountered, solutions implemented, and any critical context for future reference. This enables other AI agents to understand your work, backtrack steps if issues arise, and continue development effectively. 
Be specific about code changes, architectural decisions, and debugging insights.", }, }, "required": ["activityId"], }, ), Tool( name=TimeTools.GET_ELAPSED_TIME.value, description="Get the elapsed time for an ongoing or completed activity", inputSchema={ "type": "object", "properties": { "activityId": { "type": "string", "description": "Unique identifier of the activity log", }, }, "required": ["activityId"], }, ), Tool( name=TimeTools.GET_ACTIVITY_LOGS.value, description="Retrieve activity logs with optional filtering", inputSchema={ "type": "object", "properties": { "activityType": { "type": "string", "description": "Filter by activity type", }, "task_scope": { "type": "string", "enum": [scope.value for scope in TaskScope], "description": "Filter by task scope", }, "startDate": { "type": "string", "description": "Filter by start date (ISO 8601 format)", }, "endDate": { "type": "string", "description": "Filter by end date (ISO 8601 format)", }, "limit": { "type": "integer", "description": "Maximum number of logs to return", }, }, }, ), Tool( name=TimeTools.UPDATE_ACTIVITY_LOG.value, description="Update an existing activity log", inputSchema={ "type": "object", "properties": { "activityId": { "type": "string", "description": "Unique identifier of the activity log to update", }, "activityType": { "type": "string", "description": "Updated activity type", }, "task_scope": { "type": "string", "enum": [scope.value for scope in TaskScope], "description": "Updated task scope", }, "description": { "type": "string", "description": "Updated description", }, "tags": { "type": "array", "items": {"type": "string"}, "description": "Updated tags", }, "result": { "type": "string", "description": "Updated result", }, "notes": { "type": "string", "description": "Updated traceability notes for session continuity and auditability. Document progress, changes in approach, new findings, or corrections made. 
Include specific details about what was modified, why changes were needed, and any insights gained. This ensures other AI agents can follow your thought process, understand context, and continue work seamlessly without losing critical information.", }, }, "required": ["activityId"], }, ), Tool( name=TimeTools.CREATE_TIME_REMINDER.value, description="Create a time-based reminder using system time for scheduling", inputSchema={ "type": "object", "properties": { "reminderTime": { "type": "string", "description": "Time for the reminder (ISO 8601 format with explicit timezone offset, e.g., '2025-09-11T14:00:00+08:00' for local time or '2025-09-11T14:00:00+00:00' for UTC)", }, "message": { "type": "string", "description": "Reminder message", }, "relatedTaskId": { "type": "string", "description": "ID of related task or activity", }, }, "required": ["reminderTime", "message"], }, ), Tool( name=TimeTools.CHECK_TIME_REMINDERS.value, description="Check for due or upcoming time reminders", inputSchema={ "type": "object", "properties": { "upcomingMinutes": { "type": "integer", "description": "Check for reminders due within this many minutes (default: 60)", }, }, }, ), ] logger.info(f"✅ Providing {len(tools)} tools to MCP client") return tools async def _execute_tool(name: str, arguments: dict) -> Any: """Execute tool logic.""" try: match name: case TimeTools.GET_CURRENT_TIME.value: timezone = arguments.get("timezone") if not timezone: raise ValueError("Missing required argument: timezone") # Validate timezone with helpful error messages validated_timezone = validate_timezone(timezone) result = time_server.get_current_time(validated_timezone) case TimeTools.CONVERT_TIME.value: if not all( k in arguments for k in ["source_timezone", "time", "target_timezone"] ): raise ValueError("Missing required arguments") # Validate inputs with helpful error messages source_tz = validate_timezone(arguments["source_timezone"]) target_tz = validate_timezone(arguments["target_timezone"]) 
validated_time = validate_time_format(arguments["time"]) result = time_server.convert_time(source_tz, validated_time, target_tz) case TimeTools.START_ACTIVITY_LOG.value: if not all(k in arguments for k in ["activityType", "task_scope"]): raise ValueError("Missing required arguments: activityType and task_scope") result = time_server.start_activity_log( arguments["activityType"], TaskScope(arguments["task_scope"]), arguments.get("description"), arguments.get("tags"), ) case TimeTools.END_ACTIVITY_LOG.value: activity_id = arguments.get("activityId") if not activity_id: raise ValueError("Missing required argument: activityId") result = time_server.end_activity_log( activity_id, arguments.get("result"), arguments.get("notes"), ) case TimeTools.GET_ELAPSED_TIME.value: activity_id = arguments.get("activityId") if not activity_id: raise ValueError("Missing required argument: activityId") result = time_server.get_elapsed_time(activity_id) case TimeTools.GET_ACTIVITY_LOGS.value: filters = {} for key in ["activityType", "task_scope", "startDate", "endDate", "limit"]: if key in arguments: filters[key] = arguments[key] result = time_server.db.get_activity_logs(filters) case TimeTools.UPDATE_ACTIVITY_LOG.value: activity_id = arguments.get("activityId") if not activity_id: raise ValueError("Missing required argument: activityId") updates = {} for key in ["activityType", "task_scope", "description", "tags", "result", "notes"]: if key in arguments: updates[key] = arguments[key] result = time_server.update_activity_log(activity_id, updates) case TimeTools.CREATE_TIME_REMINDER.value: if not all(k in arguments for k in ["reminderTime", "message"]): raise ValueError("Missing required arguments: reminderTime and message") result = time_server.create_time_reminder( arguments["reminderTime"], arguments["message"], arguments.get("relatedTaskId"), ) case TimeTools.CHECK_TIME_REMINDERS.value: upcoming_minutes = arguments.get("upcomingMinutes", 60) result = 
time_server.check_time_reminders(upcoming_minutes) case _: raise ValueError(f"Unknown tool: {name}") logger.info(f"✅ Tool {name} executed successfully") # Handle different result types for JSON serialization if isinstance(result, list): if result and hasattr(result[0], 'model_dump'): # Handle non-empty list of Pydantic models serialized_result = [item.model_dump() for item in result] else: # Handle empty list or list of non-Pydantic objects serialized_result = result elif hasattr(result, 'model_dump'): # Handle single Pydantic model serialized_result = result.model_dump() else: # Handle other types (dict, str, int, etc.) serialized_result = result return [ TextContent(type="text", text=json.dumps(serialized_result, indent=2)) ] except Exception as e: logger.error(f"❌ Tool {name} failed: {str(e)}") raise McpError(ErrorData(code=500, message=f"Error processing chronos-protocol query: {str(e)}", data=None)) @server.call_tool() async def call_tool( name: str, arguments: dict ) -> Sequence[TextContent | ImageContent | EmbeddedResource]: """Handle tool calls for time queries and activity logging.""" logger.info(f"🔧 Tool called: {name} (timeout: {operation_timeout}s)") try: # Execute tool with timeout result = await asyncio.wait_for( _execute_tool(name, arguments), timeout=operation_timeout ) return result except asyncio.TimeoutError: logger.error(f"❌ Tool {name} timed out after {operation_timeout} seconds") raise McpError(ErrorData(code=408, message=f"Tool {name} timed out after {operation_timeout} seconds", data=None)) except Exception as e: logger.error(f"❌ Tool {name} failed: {str(e)}") raise McpError(ErrorData(code=500, message=f"Error processing chronos-protocol query: {str(e)}", data=None)) options = server.create_initialization_options() logger.info("🔌 Starting MCP server with stdio transport") logger.info("📡 Ready to receive MCP client connections") try: async with stdio_server() as (read_stream, write_stream): await server.run(read_stream, write_stream, options) 
except Exception as e: logger.error(f"❌ Server error: {str(e)}") raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/n0zer0d4y/chronos-protocol'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.