Skip to main content
Glama
ansible_config_manager.py (19.9 kB)
#!/usr/bin/env python3
"""
Ansible-based configuration manager for MCP servers
Properly integrates with Ansible's inventory and variable resolution system

This module eliminates manual YAML parsing and hostname duplication
by using the official Ansible Python library.
"""

import logging
import os
from pathlib import Path
from typing import Dict, List, Optional

# Import Ansible inventory management
try:
    from ansible.inventory.manager import InventoryManager
    from ansible.parsing.dataloader import DataLoader
    from ansible.vars.manager import VariableManager

    ANSIBLE_AVAILABLE = True
except ImportError:
    ANSIBLE_AVAILABLE = False

logger = logging.getLogger(__name__)


def _stringify_or_default(value, default: str) -> str:
    """Return ``str(value)``, or ``default`` when the value counts as unset.

    ``None``, empty strings and empty containers are treated as unset.
    Numbers and booleans are always stringified, so legitimate values such
    as ``0`` or ``False`` are not silently replaced by ``default``.
    """
    if value is None:
        return default
    if isinstance(value, (bool, int, float)):
        return str(value)
    return str(value) if value else default


class AnsibleConfigManager:
    """Manages configuration using Ansible's official inventory system."""

    def __init__(
        self,
        inventory_path: Optional[str] = None,
        logger_obj: Optional[logging.Logger] = None,
    ):
        """Initialize the Ansible config manager.

        Args:
            inventory_path: Path to the Ansible inventory. When omitted or
                empty, the ANSIBLE_INVENTORY_PATH environment variable is used.
            logger_obj: Optional logger to use instead of the module logger.
        """
        # NOTE: passing logger_obj rebinds the *module-level* logger, so it
        # affects every instance and helper in this module, not just this one.
        global logger
        if logger_obj:
            logger = logger_obj

        self.inventory_path = inventory_path or os.getenv("ANSIBLE_INVENTORY_PATH", "")
        self.loader = None
        self.inventory = None
        self.variable_manager = None
        self._group_cache = {}

        if not ANSIBLE_AVAILABLE:
            logger.warning("Ansible not available. Install with: pip install ansible")
            return

        if not self.inventory_path:
            logger.warning("No inventory path provided")
            return

        try:
            self._initialize_ansible()
        except Exception as e:
            logger.error("Failed to initialize Ansible: %s", e)

    def _initialize_ansible(self):
        """Initialize Ansible loader, inventory, and variable manager.

        On any failure the error is logged and ``inventory`` and
        ``variable_manager`` are reset to None, which makes
        ``is_available()`` return False.
        """
        if not ANSIBLE_AVAILABLE or not self.inventory_path:
            return

        try:
            self.loader = DataLoader()
            self.loader.set_basedir(str(Path(self.inventory_path).parent))

            self.inventory = InventoryManager(
                loader=self.loader, sources=self.inventory_path
            )
            self.variable_manager = VariableManager(
                loader=self.loader, inventory=self.inventory
            )

            logger.info("Loaded Ansible inventory from: %s", self.inventory_path)
            logger.info(
                "Hosts: %d, Groups: %d",
                len(self.inventory.get_hosts()),
                len(self.inventory.groups),
            )
        except Exception as e:
            logger.error("Error initializing Ansible: %s", e)
            self.inventory = None
            self.variable_manager = None

    def is_available(self) -> bool:
        """Check if Ansible inventory is properly initialized."""
        return ANSIBLE_AVAILABLE and self.inventory is not None

    def get_group_hosts(
        self,
        group_name: str,
        include_children: bool = True,
        resolve_ips: bool = True,
    ) -> Dict[str, str]:
        """
        Get hosts from a group with automatic IP resolution.

        Args:
            group_name: Name of the Ansible group
            include_children: Include hosts from child groups. When False,
                only hosts that are direct members of the group are returned.
            resolve_ips: Resolve hostnames to IP addresses via the
                ``ansible_host`` variable (falls back to the inventory name)

        Returns:
            Dict of {display_name: ip_address}. A fresh dict is returned on
            every call, so callers may mutate it without affecting the cache.
            Empty on any error or when the group does not exist.
        """
        if not self.is_available():
            logger.error("Ansible inventory not available")
            return {}

        cache_key = f"{group_name}:{include_children}:{resolve_ips}"
        if cache_key in self._group_cache:
            # Hand out a copy so callers cannot corrupt the cached mapping.
            return dict(self._group_cache[cache_key])

        try:
            if group_name not in self.inventory.groups:
                logger.warning("Group not found: %s", group_name)
                return {}

            group = self.inventory.groups[group_name]

            if include_children:
                hosts_to_process = set(group.get_hosts())
                # Group.get_hosts() normally covers descendants already; the
                # explicit union is kept as a harmless safety net.
                for child_group in group.get_descendants():
                    hosts_to_process.update(child_group.get_hosts())
            else:
                # Group.get_hosts() aggregates descendant hosts, so direct
                # membership has to come from the group's own host list.
                hosts_to_process = set(group.hosts)

            logger.info(
                "Processing %d hosts from %s", len(hosts_to_process), group_name
            )

            result = {}
            # Sort by name so the output (and the winner of any display-name
            # collision) does not depend on set iteration order.
            for host in sorted(hosts_to_process, key=lambda h: h.name):
                display_name = self._normalize_hostname(host.name)

                if resolve_ips:
                    host_vars = self.variable_manager.get_vars(host=host)
                    ip = host_vars.get("ansible_host", host.name)
                else:
                    ip = host.name

                result[display_name] = str(ip)
                logger.debug("Added %s -> %s", display_name, ip)

            self._group_cache[cache_key] = result
            logger.info("Loaded %d hosts from group %s", len(result), group_name)
            return dict(result)

        except Exception as e:
            logger.error("Error getting hosts from group %s: %s", group_name, e)
            return {}

    def get_host_variable(
        self, hostname: str, variable_name: str, default: str = ""
    ) -> str:
        """Get a host variable, respecting Ansible variable precedence.

        Args:
            hostname: Exact inventory name, or the short name that precedes
                the first dot of a fully qualified inventory name.
            variable_name: Name of the variable to look up.
            default: Value returned when the host or variable is missing,
                the variable is empty, or an error occurs.

        Returns:
            The variable value as a string. Numeric zero and ``False`` are
            returned as ``"0"`` / ``"False"`` rather than ``default``.
        """
        if not self.is_available():
            return default

        try:
            fqdn_prefix = hostname + "."
            matching_host = next(
                (
                    host
                    for host in self.inventory.get_hosts()
                    if host.name == hostname or host.name.startswith(fqdn_prefix)
                ),
                None,
            )

            if matching_host is None:
                logger.warning("Host not found: %s", hostname)
                return default

            host_vars = self.variable_manager.get_vars(host=matching_host)
            value = host_vars.get(variable_name, default)

            logger.debug("Got %s=%s for host %s", variable_name, value, hostname)
            return _stringify_or_default(value, default)

        except Exception as e:
            logger.error(
                "Error getting variable %s for %s: %s", variable_name, hostname, e
            )
            return default

    def get_group_variable(
        self, group_name: str, variable_name: str, default: str = ""
    ) -> str:
        """Get a group-level variable.

        Only variables set directly on the group (``group.get_vars()``) are
        consulted. Returns ``default`` when the group or variable is missing,
        the variable is empty, or an error occurs. Numeric zero and ``False``
        are returned as strings rather than replaced by ``default``.
        """
        if not self.is_available():
            return default

        try:
            if group_name not in self.inventory.groups:
                logger.warning("Group not found: %s", group_name)
                return default

            group_vars = self.inventory.groups[group_name].get_vars()
            value = group_vars.get(variable_name, default)

            logger.debug("Got %s=%s for group %s", variable_name, value, group_name)
            return _stringify_or_default(value, default)

        except Exception as e:
            logger.error(
                "Error getting variable %s for group %s: %s",
                variable_name,
                group_name,
                e,
            )
            return default

    def get_inventory_summary(self) -> Dict:
        """Get summary of inventory contents.

        Returns:
            Dict with keys "hosts" (count), "groups" (count) and
            "group_names" (sorted list); empty dict when unavailable or on error.
        """
        if not self.is_available():
            return {}

        try:
            return {
                "hosts": len(self.inventory.get_hosts()),
                "groups": len(self.inventory.groups),
                "group_names": sorted(self.inventory.groups.keys()),
            }
        except Exception as e:
            logger.error("Error getting inventory summary: %s", e)
            return {}

    def get_all_hosts_with_inheritance(self) -> Dict:
        """
        Get all hosts and groups with proper variable inheritance.

        Implements two-pass algorithm:
        1. First pass: Collect group variables
        2. Second pass: Process groups with inherited variables from parent groups

        Returns:
            Dict with structure: {"hosts": {...}, "groups": {...}}
            Each host maps to {"groups": [...], "vars": {...}} where vars are
            the merged group variables overridden by the host's resolved
            variables. Each group maps to a sorted list of member host names.
        """
        if not self.is_available():
            return {"hosts": {}, "groups": {}}

        try:
            hosts = {}
            groups = {}
            group_vars = {}
            resolved_host_vars = {}

            def host_vars_for(host):
                """Resolve a host's variables once and reuse the result."""
                if host.name not in resolved_host_vars:
                    resolved_host_vars[host.name] = self.variable_manager.get_vars(
                        host=host
                    )
                return resolved_host_vars[host.name]

            def process_group(group_obj, inherited_vars):
                """Second pass: process groups with inherited vars"""
                group_name = group_obj.name

                # Merge inherited vars with this group's vars
                merged_vars = {**inherited_vars, **group_vars.get(group_name, {})}
                members = groups.setdefault(group_name, set())

                for host in group_obj.get_hosts():
                    entry = hosts.setdefault(host.name, {"groups": [], "vars": {}})

                    if group_name not in entry["groups"]:
                        entry["groups"].append(group_name)

                    # Merge vars: group vars first, then host vars override
                    entry["vars"].update(merged_vars)
                    entry["vars"].update(host_vars_for(host))

                    members.add(host.name)

                # Process child groups with accumulated vars
                for child_group in group_obj.child_groups:
                    process_group(child_group, merged_vars)

                    # Also add child group's hosts to parent group
                    if child_group.name in groups:
                        members.update(groups[child_group.name])

            # Start with 'all' group
            all_group = self.inventory.groups.get("all")
            if all_group is not None:
                # First pass: collect vars for 'all' and every descendant group
                group_vars[all_group.name] = all_group.get_vars().copy()
                for descendant in all_group.get_descendants():
                    if descendant.name not in group_vars:
                        group_vars[descendant.name] = descendant.get_vars().copy()

                # Second pass: process groups
                process_group(all_group, {})

            # Convert group sets to lists for JSON serialization
            groups_list = {name: sorted(members) for name, members in groups.items()}

            logger.info(
                "Loaded %d hosts and %d groups with inheritance",
                len(hosts),
                len(groups_list),
            )

            return {"hosts": hosts, "groups": groups_list}

        except Exception as e:
            logger.error(
                "Error getting all hosts with inheritance: %s", e, exc_info=True
            )
            return {"hosts": {}, "groups": {}}

    @staticmethod
    def _normalize_hostname(hostname: str) -> str:
        """
        Normalize hostname for consistent display in enums.

        This normalization ensures hostnames match what tools expect:
        - Takes short hostname (before first dot)
        - Converts to lowercase
        - Replaces underscores with hyphens

        Args:
            hostname: Full hostname (e.g., 'server_1.example.com')

        Returns:
            Normalized hostname (e.g., 'server-1')

        Examples:
            >>> AnsibleConfigManager._normalize_hostname('Server_1.example.com')
            'server-1'
            >>> AnsibleConfigManager._normalize_hostname('web-01')
            'web-01'
        """
        return hostname.split(".")[0].lower().replace("_", "-")

    def _get_hosts_from_group(self, group_name: str) -> List[str]:
        """
        Get normalized hostnames from a specific Ansible group.

        Args:
            group_name: Name of the Ansible group

        Returns:
            Sorted list of normalized hostnames (empty when the group is
            missing, the inventory is unavailable, or an error occurs)
        """
        if not self.is_available():
            return []

        try:
            if group_name not in self.inventory.groups:
                logger.debug("Group '%s' not found in inventory", group_name)
                return []

            group = self.inventory.groups[group_name]
            return sorted(
                {self._normalize_hostname(host.name) for host in group.get_hosts()}
            )

        except Exception as e:
            logger.error("Error getting hosts from group '%s': %s", group_name, e)
            return []

    def get_docker_hosts(self) -> List[str]:
        """
        Get hostnames with Docker or Podman API configured.

        Returns:
            Sorted list of normalized hostnames that have docker_api_port or
            podman_api_port defined

        Note:
            This method delegates to get_hosts_by_capability() and does no
            caching of its own.
        """
        # Check for either docker_api_port or podman_api_port capability
        docker_hosts = self.get_hosts_by_capability("docker_api_port")
        podman_hosts = self.get_hosts_by_capability("podman_api_port")

        # Combine and deduplicate
        return sorted(set(docker_hosts) | set(podman_hosts))

    def get_ollama_hosts(self) -> List[str]:
        """
        Get hostnames in the ollama_servers group.

        Returns:
            Sorted list of normalized hostnames from ollama_servers group
        """
        return self._get_hosts_from_group("ollama_servers")

    def get_pihole_hosts(self) -> List[str]:
        """
        Get hostnames with Pi-hole configured.

        Returns:
            Sorted list of normalized hostnames that have pihole_url defined
        """
        return self.get_hosts_by_capability("pihole_url")

    def get_ups_hosts(self) -> List[str]:
        """
        Get hostnames in the nut_servers group (Network UPS Tools).

        Returns:
            Sorted list of normalized hostnames from nut_servers group
        """
        return self._get_hosts_from_group("nut_servers")

    def get_all_groups(self) -> List[str]:
        """
        Get all Ansible group names.

        Returns:
            Sorted list of all group names in the inventory
        """
        if not self.is_available():
            return []

        try:
            group_names = sorted(self.inventory.groups.keys())
            logger.debug("Found %d groups in inventory", len(group_names))
            return group_names

        except Exception as e:
            logger.error("Error getting all groups: %s", e)
            return []

    def get_hosts_by_capability(self, capability: str) -> List[str]:
        """
        Generic method to get hosts with a specific variable defined.

        Args:
            capability: Variable name to check for (e.g., 'docker_api_port', 'pihole_url')

        Returns:
            Sorted list of normalized hostnames whose resolved variables
            contain a truthy value for the specified variable
        """
        if not self.is_available():
            return []

        try:
            return sorted(
                {
                    self._normalize_hostname(host.name)
                    for host in self.inventory.get_hosts()
                    if self.variable_manager.get_vars(host=host).get(capability)
                }
            )

        except Exception as e:
            logger.error("Error getting hosts by capability '%s': %s", capability, e)
            return []

    def clear_cache(self):
        """Clear internal cache."""
        self._group_cache.clear()
        logger.info("Cleared inventory cache")


def get_group_hosts_fallback(
    inventory_path: str, group_name: str, logger_obj: Optional[logging.Logger] = None
) -> Dict[str, str]:
    """Fallback method using manual YAML parsing.

    Args:
        inventory_path: Path to a YAML inventory file.
        group_name: Group to look up anywhere in the inventory tree.
        logger_obj: Optional logger; when omitted, messages go to ``print``.

    Returns:
        Dict of {display_name: address}, where address is the host's
        ``ansible_host`` (group vars are inherited, host vars win) or the
        short hostname. Empty when PyYAML is missing, the file cannot be
        read or parsed, or the group is not found.
    """
    if logger_obj:
        log = logger_obj.info
        log_warn = logger_obj.warning
    else:
        log = print
        log_warn = print

    try:
        import yaml
    except ImportError:
        # Best-effort fallback: report and give up instead of crashing the caller.
        log_warn("PyYAML not available. Install with: pip install pyyaml")
        return {}

    if not Path(inventory_path).exists():
        log_warn(f"Inventory file not found: {inventory_path}")
        return {}

    try:
        with open(inventory_path, "r", encoding="utf-8") as f:
            inventory = yaml.safe_load(f)
    except Exception as e:
        log_warn(f"Error loading inventory: {e}")
        return {}

    def find_group(data, target_name):
        """Depth-first search for the first non-empty mapping entry named target_name."""
        if isinstance(data, dict):
            if target_name in data:
                return data[target_name]
            for value in data.values():
                if isinstance(value, dict):
                    found = find_group(value, target_name)
                    if found:
                        return found
        return None

    def get_hosts_from_group(group_data, inherited_vars=None):
        """Collect (hostname, merged_vars) pairs for a group and its children."""
        if not isinstance(group_data, dict):
            return []

        # YAML keys with no value load as None, so normalise them to {}.
        current_vars = {**(inherited_vars or {}), **(group_data.get("vars") or {})}
        hosts_found = []

        for hostname, host_vars in (group_data.get("hosts") or {}).items():
            hosts_found.append((hostname, {**current_vars, **(host_vars or {})}))

        for child_name, child_data in (group_data.get("children") or {}).items():
            # A child without hosts/children is a reference to a group that
            # is defined elsewhere in the inventory.
            is_reference = not isinstance(child_data, dict) or not (
                child_data.get("hosts") or child_data.get("children")
            )
            if is_reference:
                actual_child_group = find_group(inventory, child_name)
                if actual_child_group:
                    hosts_found.extend(
                        get_hosts_from_group(actual_child_group, current_vars)
                    )
            else:
                hosts_found.extend(get_hosts_from_group(child_data, current_vars))

        return hosts_found

    group = find_group(inventory, group_name)
    if not group:
        log_warn(f"Group not found: {group_name}")
        return {}

    result = {}
    for hostname, host_vars in get_hosts_from_group(group):
        hostname = str(hostname)
        display_name = AnsibleConfigManager._normalize_hostname(hostname)
        # Stringify so the result matches the Dict[str, str] contract of the
        # Ansible-backed path.
        ip = str(host_vars.get("ansible_host", hostname.split(".")[0]))
        result[display_name] = ip
        log(f"Added {display_name} -> {ip}")

    return result


def load_group_hosts(
    group_name: str,
    inventory_path: Optional[str] = None,
    logger_obj: Optional[logging.Logger] = None,
) -> Dict[str, str]:
    """High-level function to load hosts from a group.

    Uses the Ansible library when it is importable and the inventory loads;
    otherwise falls back to manual YAML parsing.

    Args:
        group_name: Name of the group to load.
        inventory_path: Inventory path; when None, the ANSIBLE_INVENTORY_PATH
            environment variable is used.
        logger_obj: Optional logger for diagnostics.

    Returns:
        Dict of {display_name: address}; empty when no inventory path is known.
    """
    if inventory_path is None:
        inventory_path = os.getenv("ANSIBLE_INVENTORY_PATH", "")

    if not inventory_path:
        if logger_obj:
            logger_obj.error("No inventory path provided")
        return {}

    if ANSIBLE_AVAILABLE:
        manager = AnsibleConfigManager(inventory_path, logger_obj)
        if manager.is_available():
            return manager.get_group_hosts(group_name)

    if logger_obj:
        logger_obj.info("Falling back to manual YAML parsing")

    return get_group_hosts_fallback(inventory_path, group_name, logger_obj)


__all__ = [
    "AnsibleConfigManager",
    "load_group_hosts",
    "get_group_hosts_fallback",
    "ANSIBLE_AVAILABLE",
]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bjeans/homelab-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.