Skip to main content
Glama

BuildAutomata Memory MCP Server

by brucepro
intentions.pyβ€’13.9 kB
""" Intention management for BuildAutomata Memory System Copyright 2025 Jurden Bruce """ import json import logging import uuid from typing import List, Dict, Any, Optional from datetime import datetime logger = logging.getLogger("buildautomata-memory.intentions") try: from .models import Intention except ImportError: from models import Intention class IntentionManager: """Handles intention storage and retrieval (Agency Bridge Pattern)""" def __init__(self, db_conn, db_lock, error_log, get_working_set_func=None): self.db_conn = db_conn self.db_lock = db_lock self.error_log = error_log self.get_working_set_func = get_working_set_func def _log_error(self, operation: str, error: Exception): """Log detailed error information""" import traceback error_entry = { "timestamp": datetime.now().isoformat(), "operation": operation, "error_type": type(error).__name__, "error_msg": str(error), "traceback": traceback.format_exc(), } self.error_log.append(error_entry) if len(self.error_log) > 100: self.error_log = self.error_log[-100:] async def store_intention( self, description: str, priority: float = 0.5, deadline: Optional[datetime] = None, preconditions: Optional[List[str]] = None, actions: Optional[List[str]] = None, related_memories: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Store a new intention""" if not self.db_conn: return {"error": "Database not available"} intention = Intention( id=str(uuid.uuid4()), description=description, priority=max(0.0, min(1.0, priority)), status="pending", created_at=datetime.now(), updated_at=datetime.now(), deadline=deadline, preconditions=preconditions or [], actions=actions or [], related_memories=related_memories or [], metadata=metadata or {}, ) try: with self.db_lock: self.db_conn.execute(""" INSERT INTO intentions ( id, description, priority, status, created_at, updated_at, deadline, preconditions, actions, related_memories, metadata, last_checked, check_count ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( intention.id, intention.description, intention.priority, intention.status, intention.created_at, intention.updated_at, intention.deadline, json.dumps(intention.preconditions), json.dumps(intention.actions), json.dumps(intention.related_memories), json.dumps(intention.metadata), intention.last_checked, intention.check_count, )) self.db_conn.commit() logger.info(f"Stored intention {intention.id}: {description[:50]}...") return { "success": True, "intention_id": intention.id, "priority": intention.priority, "status": intention.status, } except Exception as e: logger.error(f"Failed to store intention: {e}") self._log_error("store_intention", e) return {"error": str(e)} async def get_active_intentions( self, limit: int = 10, include_pending: bool = True, ) -> List[Dict[str, Any]]: """Get active intentions sorted by priority""" if not self.db_conn: return [] try: with self.db_lock: statuses = ["active"] if include_pending: statuses.append("pending") placeholders = ','.join('?' * len(statuses)) cursor = self.db_conn.execute(f""" SELECT * FROM intentions WHERE status IN ({placeholders}) ORDER BY priority DESC, deadline ASC LIMIT ? """, (*statuses, limit)) intentions = [] for row in cursor.fetchall(): intention_dict = dict(row) intention_dict['preconditions'] = json.loads(row['preconditions'] or '[]') intention_dict['actions'] = json.loads(row['actions'] or '[]') intention_dict['related_memories'] = json.loads(row['related_memories'] or '[]') intention_dict['metadata'] = json.loads(row['metadata'] or '{}') intentions.append(intention_dict) return intentions except Exception as e: logger.error(f"Failed to get active intentions: {e}") self._log_error("get_active_intentions", e) return [] async def update_intention_status( self, intention_id: str, status: str, metadata_updates: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Update intention status""" if not self.db_conn: return {"error": "Database not available"} if status not in ["pending", "active", "completed", "cancelled"]: return {"error": f"Invalid status: {status}"} try: with self.db_lock: cursor = self.db_conn.execute( "SELECT * FROM intentions WHERE id = ?", (intention_id,) ) row = cursor.fetchone() if not row: return {"error": f"Intention not found: {intention_id}"} metadata = json.loads(row['metadata'] or '{}') if metadata_updates: metadata.update(metadata_updates) self.db_conn.execute(""" UPDATE intentions SET status = ?, updated_at = ?, metadata = ? WHERE id = ? """, (status, datetime.now(), json.dumps(metadata), intention_id)) self.db_conn.commit() logger.info(f"Updated intention {intention_id} to status: {status}") return {"success": True, "intention_id": intention_id, "status": status} except Exception as e: logger.error(f"Failed to update intention status: {e}") self._log_error("update_intention_status", e) return {"error": str(e)} async def check_intention(self, intention_id: str) -> Dict[str, Any]: """Mark intention as checked""" if not self.db_conn: return {"error": "Database not available"} try: with self.db_lock: self.db_conn.execute(""" UPDATE intentions SET last_checked = ?, check_count = check_count + 1 WHERE id = ? """, (datetime.now(), intention_id)) self.db_conn.commit() return {"success": True} except Exception as e: logger.error(f"Failed to check intention: {e}") return {"error": str(e)} async def proactive_initialization_scan(self) -> Dict[str, Any]: """Proactive scan on startup""" scan_start = datetime.now() scan_results = { "timestamp": scan_start.isoformat(), "continuity_check": {}, "active_intentions": [], "urgent_items": [], "context_summary": {}, } if self.db_conn is None: logger.warning("Proactive scan skipped: SQLite not initialized") scan_results["error"] = "SQLite not initialized" return scan_results try: # Continuity check - prefer command_history for accuracy, fall back to memories with self.db_lock: last_activity = None # Try command_history first (more accurate - tracks actual MCP calls) try: cursor = self.db_conn.execute( "SELECT timestamp FROM command_history ORDER BY timestamp DESC LIMIT 1" ) row = cursor.fetchone() if row and row['timestamp']: last_activity = row['timestamp'] except Exception: pass # Table might not exist yet # Fall back to memories if no command history if not last_activity: cursor = self.db_conn.execute("SELECT MAX(updated_at) as last_activity FROM memories") row = cursor.fetchone() if row and row['last_activity']: last_activity = row['last_activity'] if last_activity: if isinstance(last_activity, str): last_activity = datetime.fromisoformat(last_activity) now = datetime.now(last_activity.tzinfo) if last_activity.tzinfo else datetime.now() time_gap = now - last_activity scan_results["continuity_check"] = { "last_activity": last_activity.isoformat(), "time_gap_hours": time_gap.total_seconds() / 3600, "is_new_session": time_gap.total_seconds() > 3600, } # Active intentions active_intentions = await self.get_active_intentions(limit=5) scan_results["active_intentions"] = active_intentions # Urgent items urgent = [] for intention in active_intentions: if intention.get('deadline'): deadline = intention['deadline'] if isinstance(deadline, str): deadline = datetime.fromisoformat(deadline) now = datetime.now(deadline.tzinfo) if deadline.tzinfo else datetime.now() if deadline < now: urgent.append({ "type": "overdue_intention", "id": intention['id'], "description": intention['description'], "deadline": intention['deadline'], }) elif intention.get('priority', 0) >= 0.9: urgent.append({ "type": "high_priority_intention", "id": intention['id'], "description": intention['description'], "priority": intention['priority'], }) scan_results["urgent_items"] = urgent # Recent context with self.db_lock: cursor = self.db_conn.execute(""" SELECT id, content, category, created_at FROM memories ORDER BY created_at DESC LIMIT 5 """) recent_memories = [] for row in cursor.fetchall(): recent_memories.append({ "id": row['id'], "content": row['content'], "category": row['category'], "created_at": row['created_at'], }) scan_results["context_summary"] = { "recent_memories": recent_memories, "active_intention_count": len(active_intentions), "urgent_count": len(urgent), } # 5. Working Set - optimal context memories for confabulation prevention # Load top 40 memories by working set score (importance + failure patterns + recency) if self.get_working_set_func: working_set = await self.get_working_set_func(target_size=40) # Format working set with FULL content - these are important enough to load completely working_set_formatted = [] for mem in working_set[:40]: # Limit to 40 for token budget working_set_formatted.append({ "id": mem['id'], "category": mem['category'], "importance": mem.get('importance', 0), "content": mem['content'], # Full content, not preview "created_at": mem.get('created_at', ''), "working_set_score": mem.get('working_set_score', 0), # Include score for transparency }) scan_results["working_set"] = { "size": len(working_set), "memories": working_set_formatted, "description": "Top memories by importance, failure patterns, and recency for confabulation prevention" } else: scan_results["working_set"] = { "size": 0, "memories": [], "description": "Working set not available (get_working_set_func not provided)" } scan_duration = (datetime.now() - scan_start).total_seconds() * 1000 scan_results["scan_duration_ms"] = scan_duration logger.info(f"Proactive scan completed in {scan_duration:.1f}ms with {scan_results['working_set']['size']} working set memories") return scan_results except Exception as e: logger.error(f"Proactive scan failed: {e}") self._log_error("proactive_scan", e) return {"error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brucepro/buildautomata_memory_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server