Skip to main content
Glama
qdrant-remove-paths-from-context-headers.py10.7 kB
#!/usr/bin/env -S uv run python
"""
Remove legacy file path header lines from pre-v11.0.0 chunks.

- Iterates all points in the Qdrant collection.
- Selects points with payload.version < v11.0.0 (robust semantic compare).
- If the first line of the chunk starts with '# file://', remove that line.
- If not present, warn and skip that point.
- Shows a preview and asks for confirmation before updating.
"""

import sys
import logging
from typing import List, Tuple, Optional, Dict, Any
import asyncio
import re

# Set up minimal logging to avoid spam
# (done before the project imports below so their loggers are quiet from the start)
logging.basicConfig(level=logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("qdrant_client").setLevel(logging.WARNING)

from archive_agent.core.ContextManager import ContextManager
from archive_agent.db.QdrantSchema import parse_payload
from qdrant_client.models import Filter
from qdrant_client.http.exceptions import UnexpectedResponse

# Number of points requested per scroll page while reading the collection.
SCROLL_PAGE_SIZE = 1_000


class Colors:
    """ANSI color codes for terminal output."""

    GREEN = '\033[92m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    YELLOW = '\033[93m'
    BOLD = '\033[1m'
    END = '\033[0m'


def colored_text(text: str, color: str) -> str:
    """Return `text` wrapped in the given ANSI color code and a reset code."""
    return f"{color}{text}{Colors.END}"


def get_user_input(prompt: str, valid_responses: Optional[List[str]] = None) -> str:
    """
    Prompt until the user enters an acceptable answer.

    The answer is stripped and lower-cased before it is checked. When
    `valid_responses` is None, any answer is accepted.
    """
    while True:
        response = input(prompt).strip().lower()
        if valid_responses is None or response in valid_responses:
            return response
        print(f"Invalid response. Please enter one of: {', '.join(valid_responses)}")


def semver_tuple(v: str) -> Tuple[int, int, int]:
    """
    Convert 'vMAJOR.MINOR.PATCH' or 'MAJOR.MINOR.PATCH' to a tuple of ints.

    A leading 'v'/'V' is dropped, the string is split on '.', '-' and '+',
    and the first three pieces are used. Pieces that are missing or are not
    integers count as 0. Non-string or empty input yields (0, 0, 0).
    """
    if not isinstance(v, str) or not v:
        return 0, 0, 0
    v = v.strip()
    if v.startswith(("v", "V")):
        v = v[1:]
    nums: List[int] = []
    for part in re.split(r"[.\-+]", v)[:3]:
        try:
            nums.append(int(part))
        except ValueError:
            # Non-numeric piece (e.g. 'rc1' or an empty string) counts as 0.
            nums.append(0)
    # Pad short versions such as '11' or '11.2' to three components.
    nums.extend([0] * (3 - len(nums)))
    return nums[0], nums[1], nums[2]


def is_version_lt(version: Optional[str], ref: str = "v11.0.0") -> bool:
    """Return True if `version` < `ref`; a missing version is treated as '0.0.0'."""
    return semver_tuple(version or "0.0.0") < semver_tuple(ref)


def resolve_text_field(payload_dict: Dict[str, Any]) -> Optional[str]:
    """
    Find the text-bearing field in the payload dict.

    Known field names are tried in order first; otherwise the first string
    value that contains a newline or starts with '# ' is chosen. Returns the
    key, or None if nothing matches.
    """
    candidates = [
        "cpntext",     # legacy?  NOTE(review): possibly a typo - confirm against the payload schema
        "chunk_text",  # common
        "text",        # fallback
        "content",     # fallback
        "body",        # fallback
    ]
    for k in candidates:
        if k in payload_dict and isinstance(payload_dict[k], str):
            return k
    # Try to guess: first str field with multiple lines
    for k, v in payload_dict.items():
        if isinstance(v, str) and ("\n" in v or v.startswith("# ")):
            return k
    return None


def has_file_header(first_line: str) -> bool:
    """Check whether the first line is a 'file://' header comment (case-insensitive)."""
    return first_line.strip().lower().startswith("# file://")


def split_first_line(text: str) -> Tuple[str, str]:
    """
    Split `text` into its first line and everything after it.

    Returns (first_line, remainder). `first_line` has its line terminator
    removed; `remainder` is the rest of `text` byte-for-byte, so removing the
    header does not normalize line endings or drop a trailing newline in the
    chunk body. Empty input yields ('', '').
    """
    pieces = text.splitlines(keepends=True)
    if not pieces:
        return "", ""
    head = pieces[0]
    # `head` is non-empty here, so splitlines() returns exactly one element.
    return head.splitlines()[0], text[len(head):]


async def _scroll_all_points(qdrant, page_size: int = SCROLL_PAGE_SIZE) -> List[Any]:
    """Read every point (with payload) from the collection, one scroll page at a time."""
    points: List[Any] = []
    offset = None
    while True:
        batch, offset = await qdrant.qdrant.scroll(
            collection_name=qdrant.collection,
            scroll_filter=Filter(must=[]),  # Get all points
            limit=page_size,
            offset=offset,
            with_payload=True,
        )
        points.extend(batch)
        # Qdrant reports the end of the collection with a next-page offset of None.
        if offset is None:
            return points


def step1_collect_candidates(qdrant) -> Tuple[List, List[str], List[str]]:
    """
    Step 1: Scan all points and collect candidates to modify and to skip.

    Returns (candidates, skip_reasons, errors) where each candidate is a
    (point, text_field, text_value) tuple whose first text line is a
    '# file://' header. Exits the process with status 1 if the collection
    cannot be read.
    """
    print(colored_text("Step 1: Scan All Points", Colors.BOLD))
    print("Reading entire collection (this may take a moment)...")

    try:
        all_points = asyncio.run(_scroll_all_points(qdrant))
    except UnexpectedResponse as e:
        print(f"Error querying Qdrant: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Error during point retrieval: {e}")
        sys.exit(1)

    print(f"Total points in collection: {colored_text(str(len(all_points)), Colors.YELLOW)}")

    candidates = []
    skip_reasons: List[str] = []
    errors: List[str] = []

    for point in all_points:
        # Best effort per point: a payload that fails to parse is recorded, not fatal.
        try:
            payload_model = parse_payload(point.payload)
            payload = payload_model.model_dump()
            version = payload.get("version") or payload.get("schema_version") or payload.get("meta_version")

            # Only pre-v11.0.0 are candidates
            if not is_version_lt(version, "v11.0.0"):
                continue

            text_field = resolve_text_field(payload)
            if not text_field:
                skip_reasons.append(f"point {point.id}: no recognizable text field")
                continue

            text_value = payload[text_field]
            lines = text_value.splitlines()
            if not lines:
                skip_reasons.append(f"point {point.id}: empty text")
                continue

            if has_file_header(lines[0]):
                candidates.append((point, text_field, text_value))
            else:
                skip_reasons.append(f"point {point.id}: first line is not a file header; will skip")
        except Exception as e:
            errors.append(f"point {getattr(point, 'id', '?')}: {e}")
            continue

    print(f"\nPre-v11.0.0 points with removable header: {colored_text(str(len(candidates)), Colors.GREEN)}")
    print(f"Pre-v11.0.0 points without header (skipped): {colored_text(str(len(skip_reasons)), Colors.YELLOW)}")
    if errors:
        print(f"Points with parse errors: {colored_text(str(len(errors)), Colors.RED)}")

    return candidates, skip_reasons, errors


def step2_preview(candidates: List[Tuple[Any, str, str]], skip_reasons: List[str], errors: List[str]) -> str:
    """
    Step 2: Show a preview of changes and gather confirmation.

    Prints up to ten before/after first lines plus a summary, then returns the
    user's (lower-cased) decision: 'continue'/'c', 'retry'/'r' or 'abort'/'a'.
    With no candidates, only 'abort'/'a' is offered.
    """
    print(f"\n{colored_text('Step 2: Preview Changes', Colors.BOLD)}")

    show_n = min(10, len(candidates))
    if show_n == 0:
        print("No eligible points found. Nothing to change.")
        return get_user_input("\nAbort? (abort): ", ["abort", "a"])

    print(f"Showing first {show_n} diffs:")
    for i, (point, text_field, text_value) in enumerate(candidates[:show_n], 1):
        old_first, new_text = split_first_line(text_value)
        remaining_lines = new_text.splitlines()
        print(f"\n {i:2d}. point {point.id} field '{text_field}'")
        print(f" - old first line: {colored_text(old_first, Colors.RED)}")
        preview_new_first = remaining_lines[0] if remaining_lines else "<empty after removal>"
        print(f" + new first line: {colored_text(preview_new_first, Colors.GREEN)}")

    print("\nSummary:")
    print(f" Will modify: {colored_text(str(len(candidates)), Colors.GREEN)} points")
    print(f" Will skip : {colored_text(str(len(skip_reasons)), Colors.YELLOW)} points (no header on first line)")
    if errors:
        print(f" Errors : {colored_text(str(len(errors)), Colors.RED)} points (parse failures)")

    if skip_reasons[:5]:
        print("\nExamples of skipped reasons (up to 5):")
        for s in skip_reasons[:5]:
            print(f" - {s}")

    if errors[:3]:
        print("\nExamples of errors (up to 3):")
        for e in errors[:3]:
            print(f" - {e}")

    print(f"\n{colored_text('Step 3: Final Confirmation', Colors.BOLD)}")
    return get_user_input(
        "Proceed with removing header lines? (continue/retry/abort): ",
        ["continue", "c", "retry", "r", "abort", "a"],
    )


async def _apply_updates(qdrant, candidates: List[Tuple[Any, str, str]]) -> int:
    """Write the header-less text of every candidate back to Qdrant; return the number updated."""
    updated = 0
    total = len(candidates)
    for idx, (point, text_field, text_value) in enumerate(candidates, 1):
        first_line, new_text = split_first_line(text_value)

        # Safety: re-check header presence right before write
        # (this checks the text captured during the scan; the point is not re-fetched)
        if not has_file_header(first_line):
            print(f" Skipping point {point.id}: header no longer present.")
            continue

        # Prepare updated payload
        # NOTE(review): this writes the whole parsed payload back, not just the text
        # field - confirm that is intended before narrowing it.
        payload_model = parse_payload(point.payload)
        updated_payload = payload_model.model_dump()
        updated_payload[text_field] = new_text

        await qdrant.qdrant.set_payload(
            collection_name=qdrant.collection,
            payload=updated_payload,
            points=[point.id],
        )
        updated += 1

        if idx % 100 == 0:
            print(f" Updated {updated}/{total} points...")
    return updated


def step3_apply(qdrant, candidates: List[Tuple[Any, str, str]]):
    """
    Step 3: Apply the changes to Qdrant.

    All updates run inside a single event loop. Any exception while updating
    is reported and the process exits with status 1.
    """
    print(f"\n{colored_text('Step 4: Applying Changes', Colors.BOLD)}")

    try:
        updated = asyncio.run(_apply_updates(qdrant, candidates))
    except Exception as e:
        print(f"\n{colored_text('❌ Error while updating:', Colors.RED)} {e}")
        sys.exit(1)
    else:
        print(f"\n{colored_text('✅ Success!', Colors.GREEN)}")
        print(f"Updated {updated} points (removed leading '# file://…' line).")


def main():
    """Main entry point for the header removal tool: connect, scan, preview/confirm, apply."""
    print(colored_text("🔧 Qdrant Pre-v11 Header Cleanup", Colors.BOLD))
    print("This tool removes leading '# file://…' lines from pre-v11.0.0 chunks.\n")

    # Initialize context manager (uses current profile)
    try:
        print("Connecting to Archive Agent...")
        context = ContextManager(verbose=False)
        qdrant = context.qdrant
        print(f"Connected to profile: {colored_text(context.profile_manager.get_profile_name(), Colors.BLUE)}")
        print(f"Collection: {colored_text(qdrant.collection, Colors.BLUE)}\n")
    except Exception as e:
        print(f"Error connecting to Archive Agent: {e}")
        print("Make sure Qdrant is running and your profile is configured.")
        sys.exit(1)

    # Step 1: Collect candidates
    candidates, skip_reasons, errors = step1_collect_candidates(qdrant)

    # Step 2: Preview & confirm (with retry option, which simply rescans)
    while True:
        decision = step2_preview(candidates, skip_reasons, errors)
        if decision in ["abort", "a"]:
            print("Aborting without making changes.")
            sys.exit(0)
        if decision in ["continue", "c"]:
            break
        # retry path: rescan
        candidates, skip_reasons, errors = step1_collect_candidates(qdrant)

    # Step 3: Apply
    step3_apply(qdrant, candidates)


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shredEngineer/Archive-Agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.