Skip to main content
Glama
BloodHound-MCP.py25.2 kB
from mcp.server.fastmcp import FastMCP from dotenv import load_dotenv from neo4j import GraphDatabase import os import logging # Configure logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() # BloodHound & Neo4j connection details BLOODHOUND_URI = os.getenv("BLOODHOUND_URI", "bolt://localhost:7687") BLOODHOUND_USERNAME = os.getenv("BLOODHOUND_USERNAME", "neo4j") BLOODHOUND_PASSWORD = os.getenv("BLOODHOUND_PASSWORD", "bloodhound") logger.debug(f"Using Neo4j connection details:") logger.debug(f"URI: {BLOODHOUND_URI}") logger.debug(f"User: {BLOODHOUND_USERNAME}") # Create Neo4j driver with BloodHound CE specific settings driver = GraphDatabase.driver( BLOODHOUND_URI, auth=(BLOODHOUND_USERNAME, BLOODHOUND_PASSWORD), encrypted=False ) # Verify connection def verify_connectivity(): try: # Try both default and bloodhound databases databases = ["neo4j", "bloodhound"] for db in databases: try: with driver.session(database=db) as session: logger.debug(f"Attempting to verify connection to database '{db}'...") result = session.run("MATCH (n:User) RETURN count(n) as count") count = result.single()["count"] logger.info(f"Successfully connected to database '{db}'. Found {count} users.") return True except Exception as e: logger.debug(f"Failed to connect to database '{db}': {str(e)}") continue raise Exception("Could not connect to any database") except Exception as e: logger.error(f"Failed to connect to Neo4j: {str(e)}") return False # Create FastMCP server for BloodHound mcp = FastMCP("BH-Examples") @mcp.tool() async def query_bloodhound(query: str): databases = ["neo4j", "bloodhound"] last_error = None for db in databases: try: with driver.session(database=db) as session: result = session.run(query) data = [record.data() for record in result] logger.info(f"Query successful on database '{db}'") return {"success": True, "data": data} except Exception as e: last_error = e logger.debug(f"Query failed on database '{db}': {str(e)}") continue logger.error(f"Query failed on all databases. Last error: {str(last_error)}") return {"success": False, "error": str(last_error)} # Domain Information @mcp.tool() async def find_all_domain_admins(): query = """ MATCH p = (t:Group)<-[:MemberOf*1..]-(a) WHERE (a:User or a:Computer) and t.objectid ENDS WITH '-512' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def map_domain_trusts(): query = """ MATCH p = (:Domain)-[:TrustedBy]->(:Domain) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_tier_zero_locations(): query = """ MATCH p = (t:Base)<-[:Contains*1..]-(:Domain) WHERE t.highvalue = true RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def map_ou_structure(): query = """ MATCH p = (:Domain)-[:Contains*1..]->(:OU) RETURN p LIMIT 1000 """ return await query_bloodhound(query) # Dangerous Privileges @mcp.tool() async def find_dcsync_privileges(): query = """ MATCH p=(:Base)-[:DCSync|AllExtendedRights|GenericAll]->(:Domain) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_foreign_group_memberships(): query = """ MATCH p=(s:Base)-[:MemberOf]->(t:Group) WHERE s.domainsid<>t.domainsid RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_domain_users_local_admins(): query = """ MATCH p=(s:Group)-[:AdminTo]->(:Computer) WHERE s.objectid ENDS WITH '-513' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_domain_users_laps_readers(): query = """ MATCH p=(s:Group)-[:AllExtendedRights|ReadLAPSPassword]->(:Computer) WHERE s.objectid ENDS WITH '-513' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_domain_users_high_value_paths(): query = """ MATCH p=shortestPath((s:Group)-[r*1..]->(t)) WHERE t.highvalue = true AND s.objectid ENDS WITH '-513' AND s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_domain_users_workstation_rdp(): query = """ MATCH p=(s:Group)-[:CanRDP]->(t:Computer) WHERE s.objectid ENDS WITH '-513' AND NOT toUpper(t.operatingsystem) CONTAINS 'SERVER' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_domain_users_server_rdp(): query = """ MATCH p=(s:Group)-[:CanRDP]->(t:Computer) WHERE s.objectid ENDS WITH '-513' AND toUpper(t.operatingsystem) CONTAINS 'SERVER' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_domain_users_privileges(): query = """ MATCH p=(s:Group)-[r]->(:Base) WHERE s.objectid ENDS WITH '-513' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_domain_admin_non_dc_logons(): query = """ MATCH (s)-[:MemberOf*0..]->(g:Group) WHERE g.objectid ENDS WITH '-516' WITH COLLECT(s) AS exclude MATCH p = (c:Computer)-[:HasSession]->(:User)-[:MemberOf*1..]->(g:Group) WHERE g.objectid ENDS WITH '-512' AND NOT c IN exclude RETURN p LIMIT 1000 """ return await query_bloodhound(query) # Kerberos Interaction @mcp.tool() async def find_kerberoastable_tier_zero(): query = """ MATCH (u:User) WHERE u.hasspn=true AND u.enabled = true AND NOT u.objectid ENDS WITH '-502' AND NOT u.gmsa = true AND NOT u.msa = true AND u.highvalue = true RETURN u LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_all_kerberoastable_users(): query = """ MATCH (u:User) WHERE u.hasspn=true AND u.enabled = true AND NOT u.objectid ENDS WITH '-502' AND NOT u.gmsa = true AND NOT u.msa = true RETURN u LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_kerberoastable_most_admin(): query = """ MATCH (u:User) WHERE u.hasspn = true AND u.enabled = true AND NOT u.objectid ENDS WITH '-502' AND NOT u.gmsa = true AND NOT u.msa = true MATCH (u)-[:MemberOf|AdminTo*1..]->(c:Computer) WITH DISTINCT u, COUNT(c) AS adminCount RETURN u ORDER BY adminCount DESC LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_asreproast_users(): query = """ MATCH (u:User) WHERE u.dontreqpreauth = true AND u.enabled = true RETURN u LIMIT 100 """ return await query_bloodhound(query) # Shortest Paths @mcp.tool() async def find_shortest_paths_unconstrained_delegation(): query = """ MATCH p=shortestPath((s)-[r*1..]->(t:Computer)) WHERE t.unconstraineddelegation = true AND s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_paths_from_kerberoastable_to_da(): query = """ MATCH p=shortestPath((s:User)-[r*1..]->(t:Group)) WHERE s.hasspn=true AND s.enabled = true AND NOT s.objectid ENDS WITH '-502' AND NOT s.gmsa = true AND NOT s.msa = true AND t.objectid ENDS WITH '-512' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_shortest_paths_to_tier_zero(): query = """ MATCH p=shortestPath((s)-[r*1..]->(t)) WHERE t.highvalue = true AND s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_paths_from_domain_users_to_tier_zero(): query = """ MATCH p=shortestPath((s:Group)-[r*1..]->(t)) WHERE t.highvalue = true AND s.objectid ENDS WITH '-513' AND s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_shortest_paths_to_domain_admins(): query = """ MATCH p=shortestPath((t:Group)<-[r*1..]-(s:Base)) WHERE t.objectid ENDS WITH '-512' AND s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_paths_from_owned_objects(): query = """ MATCH p=shortestPath((s:Base)-[r*1..]->(t:Base)) WHERE s.owned = true AND s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) # Active Directory Certificate Services @mcp.tool() async def find_pki_hierarchy(): query = """ MATCH p=()-[:HostsCAService|IssuedSignedBy|EnterpriseCAFor|RootCAFor|TrustedForNTAuth|NTAuthStoreFor*..]->(:Domain) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_public_key_services(): query = """ MATCH p = (c:Container)-[:Contains*..]->(:Base) WHERE c.distinguishedname starts with 'CN=PUBLIC KEY SERVICES,CN=SERVICES,CN=CONFIGURATION,DC=' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_certificate_enrollment_rights(): query = """ MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(:CertTemplate)-[:PublishedTo]->(:EnterpriseCA) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_esc1_vulnerable_templates(): query = """ MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA) WHERE ct.enrolleesuppliessubject = True AND ct.authenticationenabled = True AND ct.requiresmanagerapproval = False AND (ct.authorizedsignatures = 0 OR ct.schemaversion = 1) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_esc2_vulnerable_templates(): query = """ MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(c:CertTemplate)-[:PublishedTo]->(:EnterpriseCA) WHERE c.requiresmanagerapproval = false AND (c.effectiveekus = [''] OR '2.5.29.37.0' IN c.effectiveekus) AND (c.authorizedsignatures = 0 OR c.schemaversion = 1) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_enrollment_agent_templates(): query = """ MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA) WHERE '1.3.6.1.4.1.311.20.2.1' IN ct.effectiveekus OR '2.5.29.37.0' IN ct.effectiveekus OR SIZE(ct.effectiveekus) = 0 RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_dcs_weak_certificate_binding(): query = """ MATCH p = (s:Computer)-[:DCFor]->(:Domain) WHERE s.strongcertificatebindingenforcementraw = 0 OR s.strongcertificatebindingenforcementraw = 1 RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_inactive_tier_zero_principals(): query = """ WITH 60 as inactive_days MATCH (n:Base) WHERE n.highvalue = true AND n.enabled = true AND n.lastlogontimestamp < (datetime().epochseconds - (inactive_days * 86400)) AND n.lastlogon < (datetime().epochseconds - (inactive_days * 86400)) AND n.whencreated < (datetime().epochseconds - (inactive_days * 86400)) AND NOT n.name STARTS WITH 'AZUREADKERBEROS.' AND NOT n.objectid ENDS WITH '-500' AND NOT n.name STARTS WITH 'AZUREADSSOACC.' RETURN n """ return await query_bloodhound(query) @mcp.tool() async def find_tier_zero_without_smartcard(): query = """ MATCH (u:User) WHERE u.highvalue = true AND u.enabled = true AND u.smartcardrequired = false AND NOT u.name STARTS WITH 'MSOL_' AND NOT u.name STARTS WITH 'PROVAGENTGMSA' AND NOT u.name STARTS WITH 'ADSYNCMSA_' RETURN u """ return await query_bloodhound(query) @mcp.tool() async def find_domains_with_machine_quota(): query = """ MATCH (d:Domain) WHERE d.machineaccountquota > 0 RETURN d """ return await query_bloodhound(query) @mcp.tool() async def find_smartcard_dont_expire_domains(): query = """ MATCH (s:Domain)-[:Contains*1..]->(t:Base) WHERE s.expirepasswordsonsmartcardonlyaccounts = false AND t.enabled = true AND t.smartcardrequired = true RETURN s """ return await query_bloodhound(query) @mcp.tool() async def find_two_way_forest_trust_delegation(): query = """ MATCH p=(n:Domain)-[r:TrustedBy]->(m:Domain) WHERE (m)-[:TrustedBy]->(n) AND r.trusttype = 'Forest' AND r.tgtdelegationenabled = true RETURN p """ return await query_bloodhound(query) @mcp.tool() async def find_unsupported_operating_systems(): query = """ MATCH (c:Computer) WHERE c.operatingsystem =~ '(?i).*Windows.* (2000|2003|2008|2012|xp|vista|7|8|me|nt).*' RETURN c LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_users_with_no_password_required(): query = """ MATCH (u:User) WHERE u.passwordnotreqd = true RETURN u LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_users_password_not_rotated(): query = """ WITH 365 as days_since_change MATCH (u:User) WHERE u.pwdlastset < (datetime().epochseconds - (days_since_change * 86400)) AND NOT u.pwdlastset IN [-1.0, 0.0] RETURN u LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_nested_tier_zero_groups(): query = """ MATCH p=(t:Group)<-[:MemberOf*..]-(s:Group) WHERE t.highvalue = true AND NOT s.objectid ENDS WITH '-512' AND NOT s.objectid ENDS WITH '-519' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_disabled_tier_zero_principals(): query = """ MATCH (n:Base) WHERE n.highvalue = true AND n.enabled = false AND NOT n.objectid ENDS WITH '-502' AND NOT n.objectid ENDS WITH '-500' RETURN n LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_principals_reversible_encryption(): query = """ MATCH (n:Base) WHERE n.encryptedtextpwdallowed = true RETURN n """ return await query_bloodhound(query) @mcp.tool() async def find_principals_des_only_kerberos(): query = """ MATCH (n:Base) WHERE n.enabled = true AND n.usedeskeyonly = true RETURN n """ return await query_bloodhound(query) @mcp.tool() async def find_principals_weak_kerberos_encryption(): query = """ MATCH (u:Base) WHERE 'DES-CBC-CRC' IN u.supportedencryptiontypes OR 'DES-CBC-MD5' IN u.supportedencryptiontypes OR 'RC4-HMAC-MD5' IN u.supportedencryptiontypes RETURN u """ return await query_bloodhound(query) @mcp.tool() async def find_tier_zero_non_expiring_passwords(): query = """ MATCH (u:User) WHERE u.enabled = true AND u.pwdneverexpires = true AND u.highvalue = true RETURN u LIMIT 100 """ return await query_bloodhound(query) # NTLM Relay Attacks @mcp.tool() async def find_ntlm_relay_edges(): query = """ MATCH p = (n:Base)-[:CoerceAndRelayNTLMToLDAP|CoerceAndRelayNTLMToLDAPS|CoerceAndRelayNTLMToADCS|CoerceAndRelayNTLMToSMB]->(:Base) RETURN p LIMIT 500 """ return await query_bloodhound(query) @mcp.tool() async def find_esc8_vulnerable_cas(): query = """ MATCH (n:EnterpriseCA) WHERE n.hasvulnerableendpoint=true RETURN n """ return await query_bloodhound(query) @mcp.tool() async def find_computers_outbound_ntlm_deny(): query = """ MATCH (c:Computer) WHERE c.restrictoutboundntlm = True RETURN c LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_computers_in_protected_users(): query = """ MATCH p = (:Base)-[:MemberOf*1..]->(g:Group) WHERE g.objectid ENDS WITH "-525" RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_dcs_vulnerable_ntlm_relay(): query = """ MATCH p = (dc:Computer)-[:DCFor]->(:Domain) WHERE (dc.ldapavailable = True AND dc.ldapsigning = False) OR (dc.ldapsavailable = True AND dc.ldapsepa = False) OR (dc.ldapavailable = True AND dc.ldapsavailable = True AND dc.ldapsigning = False and dc.ldapsepa = True) RETURN p """ return await query_bloodhound(query) @mcp.tool() async def find_computers_webclient_running(): query = """ MATCH (c:Computer) WHERE c.webclientrunning = True RETURN c LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_computers_no_smb_signing(): query = """ MATCH (n:Computer) WHERE n.smbsigning = False RETURN n """ return await query_bloodhound(query) # Azure - General @mcp.tool() async def find_global_administrators(): query = """ MATCH p = (:AZBase)-[:AZGlobalAdmin*1..]->(:AZTenant) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_high_privileged_role_members(): query = """ MATCH p=(t:AZRole)<-[:AZHasRole|AZMemberOf*1..2]-(:AZBase) WHERE t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)' RETURN p LIMIT 1000 """ return await query_bloodhound(query) # Azure - Shortest Paths @mcp.tool() async def find_paths_from_entra_to_tier_zero(): query = """ MATCH p=shortestPath((s:AZUser)-[r*1..]->(t:AZBase)) WHERE t.highvalue = true AND t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)' AND s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_paths_to_privileged_roles(): query = """ MATCH p=shortestPath((s:AZBase)-[r*1..]->(t:AZRole)) WHERE t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)' AND s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_paths_from_azure_apps_to_tier_zero(): query = """ MATCH p=shortestPath((s:AZApp)-[r*1..]->(t:AZBase)) WHERE t.highvalue = true AND s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_paths_to_azure_subscriptions(): query = """ MATCH p=shortestPath((s:AZBase)-[r*1..]->(t:AZSubscription)) WHERE s<>t RETURN p LIMIT 1000 """ return await query_bloodhound(query) # Azure - Microsoft Graph @mcp.tool("sp_app_role_grant") async def find_service_principals_with_app_role_grant(): query = """ MATCH p=(:AZServicePrincipal)-[:AZMGGrantAppRoles]->(:AZTenant) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool("find_sp_graph_assignments") async def find_service_principals_with_graph_assignments(): query = """ MATCH p=(:AZServicePrincipal)-[:AZMGAppRoleAssignment_ReadWrite_All|AZMGApplication_ReadWrite_All|AZMGDirectory_ReadWrite_All|AZMGGroupMember_ReadWrite_All|AZMGGroup_ReadWrite_All|AZMGRoleManagement_ReadWrite_Directory|AZMGServicePrincipalEndpoint_ReadWrite_All]->(:AZServicePrincipal) RETURN p LIMIT 1000 """ return await query_bloodhound(query) # Azure - Hygiene @mcp.tool() async def find_foreign_tier_zero_principals(): query = """ MATCH (n:AZServicePrincipal) WHERE n.highvalue = true AND NOT toUpper(n.appownerorganizationid) = toUpper(n.tenantid) AND n.appownerorganizationid CONTAINS '-' RETURN n LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_synced_tier_zero_principals(): query = """ MATCH (ENTRA:AZBase) MATCH (AD:Base) WHERE ENTRA.onpremsyncenabled = true AND ENTRA.onpremid = AD.objectid AND AD.highvalue = true RETURN ENTRA LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_external_tier_zero_users(): query = """ MATCH (n:AZUser) WHERE n.highvalue = true AND n.name CONTAINS '#EXT#@' RETURN n LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_disabled_azure_tier_zero_principals(): query = """ MATCH (n:AZBase) WHERE n.highvalue = true AND n.enabled = false RETURN n LIMIT 100 """ return await query_bloodhound(query) @mcp.tool() async def find_devices_unsupported_os(): query = """ MATCH (n:AZDevice) WHERE n.operatingsystem CONTAINS 'WINDOWS' AND n.operatingsystemversion =~ '(10.0.19044|10.0.22000|10.0.19043|10.0.19042|10.0.19041|10.0.18363|10.0.18362|10.0.17763|10.0.17134|10.0.16299|10.0.15063|10.0.14393|10.0.10586|10.0.10240|6.3.9600|6.2.9200|6.1.7601|6.0.6200|5.1.2600|6.0.6003|5.2.3790|5.0.2195).?.*' RETURN n LIMIT 100 """ return await query_bloodhound(query) # Azure - Cross Platform Attack Paths @mcp.tool() async def find_entra_users_in_domain_admins(): query = """ MATCH p = (:AZUser)-[:SyncedToADUser]->(:User)-[:MemberOf]->(t:Group) WHERE t.objectid ENDS WITH '-512' RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_onprem_users_owning_entra_objects(): query = """ MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZOwns]->(:AZBase) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_onprem_users_in_entra_groups(): query = """ MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool("templates_no_security_ext") async def find_templates_no_security_extension(): query = """ MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA) WHERE ct.nosecurityextension = true RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool("templates_with_user_san") async def find_templates_with_user_specified_san(): query = """ MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(eca:EnterpriseCA) WHERE eca.isuserspecifiessanenabled = True RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool() async def find_ca_administrators(): query = """ MATCH p = (:Base)-[:ManageCertificates|ManageCA]->(:EnterpriseCA) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool("onprem_users_direct_entra_roles") async def find_onprem_users_with_direct_entra_roles(): query = """ MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZHasRole]->(:AZRole) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool("onprem_users_group_entra_roles") async def find_onprem_users_with_group_entra_roles(): query = """ MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)-[:AZHasRole]->(:AZRole) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool("onprem_users_direct_azure_roles") async def find_onprem_users_with_direct_azure_roles(): query = """ MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZOwner|AZUserAccessAdministrator|AZGetCertificates|AZGetKeys|AZGetSecrets|AZAvereContributor|AZKeyVaultContributor|AZContributor|AZVMAdminLogin|AZVMContributor|AZAKSContributor|AZAutomationContributor|AZLogicAppContributor|AZWebsiteContributor]->(:AZBase) RETURN p LIMIT 1000 """ return await query_bloodhound(query) @mcp.tool("onprem_users_group_azure_roles") async def find_onprem_users_with_group_azure_roles(): query = """ MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)-[:AZOwner|AZUserAccessAdministrator|AZGetCertificates|AZGetKeys|AZGetSecrets|AZAvereContributor|AZKeyVaultContributor|AZContributor|AZVMAdminLogin|AZVMContributor|AZAKSContributor|AZAutomationContributor|AZLogicAppContributor|AZWebsiteContributor]->(:AZBase) RETURN p LIMIT 1000 """ return await query_bloodhound(query) if __name__ == "__main__": if verify_connectivity(): try: logger.info("Starting MCP server...") mcp.run(transport="stdio") finally: driver.close() else: logger.error("Failed to establish Neo4j connection. Please check your credentials and connection settings.")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MorDavid/BloodHound-MCP-AI'

If you have feedback or need assistance with the MCP directory API, please join our Discord server