
BuildAutomata Memory MCP Server

by brucepro
timeline.py • 11.9 kB
""" Timeline analysis for BuildAutomata Memory System Copyright 2025 Jurden Bruce """ import asyncio import json import logging import re import difflib from typing import List, Dict, Any, Optional from datetime import datetime logger = logging.getLogger("buildautomata-memory.timeline") class TimelineAnalysis: """Handles temporal analysis and timeline generation""" def __init__(self, db_conn, db_lock): self.db_conn = db_conn self.db_lock = db_lock def compute_text_diff(self, old_text: str, new_text: str) -> Dict[str, Any]: """Compute text difference between versions""" if old_text == new_text: return {"changed": False} old_lines = old_text.splitlines(keepends=True) new_lines = new_text.splitlines(keepends=True) diff = list(difflib.unified_diff(old_lines, new_lines, lineterm='', n=0)) similarity = difflib.SequenceMatcher(None, old_text, new_text).ratio() additions = [line[1:] for line in diff if line.startswith('+') and not line.startswith('+++')] deletions = [line[1:] for line in diff if line.startswith('-') and not line.startswith('---')] return { "changed": True, "similarity": round(similarity, 3), "additions": additions[:5], "deletions": deletions[:5], "total_additions": len(additions), "total_deletions": len(deletions), "change_magnitude": round(1 - similarity, 3) } def extract_memory_references(self, content: str, all_memory_ids: set) -> List[str]: """Extract references to other memories from content""" uuid_pattern = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}' found_ids = re.findall(uuid_pattern, content, re.IGNORECASE) return list(set(found_id for found_id in found_ids if found_id in all_memory_ids)) def detect_temporal_patterns(self, events: List[Dict]) -> Dict[str, Any]: """Analyze temporal patterns in memory timeline""" if not events: return {} timestamps = [] for event in events: try: timestamps.append(datetime.fromisoformat(event['timestamp'])) except: continue if not timestamps: return {} timestamps.sort() # Burst detection bursts = [] current_burst = {"start": timestamps[0], "end": timestamps[0], "count": 1, "events": [events[0]]} for i in range(1, len(timestamps)): time_gap = (timestamps[i] - timestamps[i-1]).total_seconds() / 3600 if time_gap <= 4: current_burst["end"] = timestamps[i] current_burst["count"] += 1 current_burst["events"].append(events[i]) else: if current_burst["count"] >= 3: bursts.append({ "start": current_burst["start"].isoformat(), "end": current_burst["end"].isoformat(), "duration_hours": round((current_burst["end"] - current_burst["start"]).total_seconds() / 3600, 1), "event_count": current_burst["count"], "intensity": round(current_burst["count"] / max(1, (current_burst["end"] - current_burst["start"]).total_seconds() / 3600), 2) }) current_burst = {"start": timestamps[i], "end": timestamps[i], "count": 1, "events": [events[i]]} if current_burst["count"] >= 3: bursts.append({ "start": current_burst["start"].isoformat(), "end": current_burst["end"].isoformat(), "duration_hours": round((current_burst["end"] - current_burst["start"]).total_seconds() / 3600, 1), "event_count": current_burst["count"], "intensity": round(current_burst["count"] / max(1, (current_burst["end"] - current_burst["start"]).total_seconds() / 3600), 2) }) # Gap detection gaps = [] for i in range(1, len(timestamps)): gap_hours = (timestamps[i] - timestamps[i-1]).total_seconds() / 3600 if gap_hours > 24: gaps.append({ "start": timestamps[i-1].isoformat(), "end": timestamps[i].isoformat(), "duration_hours": round(gap_hours, 1), "duration_days": round(gap_hours / 24, 
1) }) total_duration = (timestamps[-1] - timestamps[0]).total_seconds() / 3600 return { "total_events": len(events), "first_event": timestamps[0].isoformat(), "last_event": timestamps[-1].isoformat(), "total_duration_hours": round(total_duration, 1), "total_duration_days": round(total_duration / 24, 1), "bursts": bursts, "gaps": gaps, "avg_events_per_day": round(len(events) / max(1, total_duration / 24), 2) if total_duration > 0 else 0 } def get_memory_versions_detailed( self, mem_id: str, all_memory_ids: set, include_diffs: bool ) -> List[Dict[str, Any]]: """Get detailed version history with diffs""" if not self.db_conn: return [] try: with self.db_lock: cursor = self.db_conn.execute(""" SELECT curr.version_id, curr.version_number, curr.content, curr.category, curr.importance, curr.tags, curr.metadata, curr.change_type, curr.change_description, curr.created_at, curr.content_hash, curr.prev_version_id, prev.content as prev_content, prev.category as prev_category, prev.importance as prev_importance, prev.tags as prev_tags FROM memory_versions curr LEFT JOIN memory_versions prev ON curr.prev_version_id = prev.version_id WHERE curr.memory_id = ? ORDER BY curr.version_number ASC """, (mem_id,)) versions = cursor.fetchall() if not versions: return [] events = [] prev_content_for_diff = None for row in versions: event = { "memory_id": mem_id, "version": row["version_number"], "timestamp": row["created_at"], "change_type": row["change_type"], "change_description": row["change_description"], "content": row["content"], "category": row["category"], "importance": row["importance"], "tags": json.loads(row["tags"]) if row["tags"] else [], "content_hash": row["content_hash"][:8], } if include_diffs and prev_content_for_diff: diff_info = self.compute_text_diff(prev_content_for_diff, row["content"]) if diff_info.get("changed"): event["diff"] = diff_info if row["prev_version_id"] and row["prev_category"]: field_changes = [] if row["prev_category"] != row["category"]: field_changes.append(f"category: {row['prev_category']} β†’ {row['category']}") if row["prev_importance"] != row["importance"]: field_changes.append(f"importance: {row['prev_importance']} β†’ {row['importance']}") prev_tags = set(json.loads(row["prev_tags"]) if row["prev_tags"] else []) curr_tags = set(json.loads(row["tags"]) if row["tags"] else []) if prev_tags != curr_tags: added_tags = curr_tags - prev_tags removed_tags = prev_tags - curr_tags if added_tags: field_changes.append(f"tags added: {', '.join(added_tags)}") if removed_tags: field_changes.append(f"tags removed: {', '.join(removed_tags)}") if field_changes: event["field_changes"] = field_changes references = self.extract_memory_references(row["content"], all_memory_ids) if references: event["references"] = references event["references_count"] = len(references) events.append(event) prev_content_for_diff = row["content"] return events except Exception as e: logger.error(f"Error getting detailed versions for {mem_id}: {e}") return [] def build_relationship_graph(self, events: List[Dict]) -> Dict[str, Any]: """Build graph showing how memories reference each other""" reference_map = {} referenced_by_map = {} for event in events: mem_id = event["memory_id"] refs = event.get("references", []) if mem_id not in reference_map: reference_map[mem_id] = set() for ref in refs: reference_map[mem_id].add(ref) if ref not in referenced_by_map: referenced_by_map[ref] = set() referenced_by_map[ref].add(mem_id) return { "references": {k: list(v) for k, v in reference_map.items() if v}, "referenced_by": 
{k: list(v) for k, v in referenced_by_map.items() if v}, "total_cross_references": sum(len(v) for v in reference_map.values()) } def generate_narrative_summary(self, events: List[Dict], patterns: Dict) -> str: """Generate narrative summary of timeline""" if not events: return "No events in timeline." summary_parts = [] first_event = events[0] last_event = events[-1] summary_parts.append( f"Memory journey from {first_event['timestamp']} to {last_event['timestamp']}." ) if patterns.get("total_duration_days"): summary_parts.append( f"Spanning {patterns['total_duration_days']} days with {len(events)} total memory events." ) bursts = patterns.get("bursts", []) if bursts: summary_parts.append(f"Identified {len(bursts)} burst(s) of intensive activity:") for i, burst in enumerate(bursts[:3], 1): summary_parts.append( f" - Burst {i}: {burst['event_count']} events in {burst['duration_hours']}h " f"(intensity: {burst['intensity']} events/hour) from {burst['start']}" ) gaps = patterns.get("gaps", []) if gaps: summary_parts.append(f"Detected {len(gaps)} significant gap(s) in memory activity:") for i, gap in enumerate(gaps[:3], 1): summary_parts.append( f" - Gap {i}: {gap['duration_days']} days of silence from {gap['start']} to {gap['end']}" ) categories = {} for event in events: cat = event.get("category", "unknown") categories[cat] = categories.get(cat, 0) + 1 if categories: top_cats = sorted(categories.items(), key=lambda x: x[1], reverse=True)[:3] summary_parts.append( f"Primary categories: {', '.join(f'{cat} ({count})' for cat, count in top_cats)}" ) return "\n".join(summary_parts)
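For orientation, a minimal usage sketch follows. It exercises only the pure in-memory analysis methods (detect_temporal_patterns and generate_narrative_summary), so db_conn and db_lock can be stubbed with None; the import path, file name, and synthetic events are assumptions for illustration, not part of the server.

# demo_timeline.py -- hypothetical standalone demo, not part of the server.
# Only the pure-Python analysis methods are called, so no SQLite database
# or memory_versions table is needed; db_conn/db_lock are stubbed as None.
from datetime import datetime, timedelta

from timeline import TimelineAnalysis  # assumes timeline.py is importable

base = datetime(2025, 1, 1, 9, 0)
events = [
    {
        "memory_id": f"mem-{i}",
        "timestamp": (base + timedelta(hours=h)).isoformat(),
        "category": "project" if i % 2 == 0 else "idea",
    }
    # the first three events fall within 4 hours of each other (a burst);
    # the 70-hour jump before the last two registers as a gap
    for i, h in enumerate([0, 1, 2, 72, 73])
]

analyzer = TimelineAnalysis(db_conn=None, db_lock=None)
patterns = analyzer.detect_temporal_patterns(events)
print(analyzer.generate_narrative_summary(events, patterns))

Run against these synthetic events, the summary should report one three-event burst and a gap of roughly three days, matching the thresholds hard-coded in the module (4 hours between events for a burst, more than 24 hours for a gap).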

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brucepro/buildautomata_memory_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.