Skip to main content
Glama

MCP-Expokossodo 2025

by gfxjef
auth.py7.58 kB
""" Authentication and RBAC system """ from datetime import datetime, timedelta from typing import Optional, List from enum import Enum import jwt from fastapi import HTTPException, status, Depends, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import structlog from app.config import settings logger = structlog.get_logger() class UserRole(str, Enum): """User roles for RBAC""" LECTOR = "LECTOR" STAFF_PUERTA = "STAFF_PUERTA" COORDINADOR = "COORDINADOR" class MCPUser: """MCP User with role information""" def __init__(self, user_id: str, username: str, role: UserRole, permissions: List[str] = None): self.user_id = user_id self.username = username self.role = role self.permissions = permissions or [] # Role permissions mapping ROLE_PERMISSIONS = { UserRole.LECTOR: [ "getEventos", "getInscritos", "getAforo", "getEstadisticas", "buscarRegistro", "mapaSalaEvento" ], UserRole.STAFF_PUERTA: [ "getEventos", "getInscritos", "getAforo", "getEstadisticas", "buscarRegistro", "mapaSalaEvento", "confirmarAsistencia" ], UserRole.COORDINADOR: [ "getEventos", "getInscritos", "getAforo", "getEstadisticas", "buscarRegistro", "mapaSalaEvento", "confirmarAsistencia", "estadisticasDetalladas" ] } def create_access_token(user_id: str, username: str, role: UserRole) -> str: """Create JWT access token""" expire = datetime.utcnow() + timedelta(minutes=settings.jwt_access_token_expire_minutes) payload = { "sub": user_id, "username": username, "role": role.value, "exp": expire, "iat": datetime.utcnow() } token = jwt.encode(payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm) return token def verify_token(token: str) -> Optional[MCPUser]: """Verify JWT token and return user""" try: payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]) user_id: str = payload.get("sub") username: str = payload.get("username") role_str: str = payload.get("role") if user_id is None or username is None or role_str is None: return None role = UserRole(role_str) permissions = ROLE_PERMISSIONS.get(role, []) return MCPUser( user_id=user_id, username=username, role=role, permissions=permissions ) except jwt.ExpiredSignatureError: logger.warning("token_expired", token_prefix=token[:20] if token else None) return None except jwt.JWTError as e: logger.warning("jwt_error", error=str(e), token_prefix=token[:20] if token else None) return None # Security scheme security = HTTPBearer() async def get_current_user( request: Request, credentials: HTTPAuthorizationCredentials = Depends(security) ) -> MCPUser: """Get current authenticated user""" trace_id = getattr(request.state, "trace_id", "unknown") credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer", "X-Trace-ID": trace_id}, ) user = verify_token(credentials.credentials) if user is None: logger.warning( "authentication_failed", trace_id=trace_id, token_prefix=credentials.credentials[:20] if credentials.credentials else None ) raise credentials_exception logger.info( "user_authenticated", user_id=user.user_id, username=user.username, role=user.role.value, trace_id=trace_id ) return user def require_permission(required_permission: str): """Decorator to require specific permission""" def permission_checker(user: MCPUser = Depends(get_current_user)): if required_permission not in user.permissions: logger.warning( "permission_denied", user_id=user.user_id, required_permission=required_permission, user_permissions=user.permissions, user_role=user.role.value ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Permission '{required_permission}' required" ) return user return permission_checker def require_role(required_role: UserRole): """Decorator to require specific role""" def role_checker(user: MCPUser = Depends(get_current_user)): if user.role.value != required_role.value: logger.warning( "role_access_denied", user_id=user.user_id, required_role=required_role.value, user_role=user.role.value ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Role '{required_role.value}' required" ) return user return role_checker # Rate limiting middleware (simple in-memory implementation) from collections import defaultdict from time import time class RateLimiter: """Simple in-memory rate limiter""" def __init__(self): self.requests = defaultdict(list) def is_allowed(self, key: str, limit: int, window: int = 60) -> bool: """Check if request is allowed based on rate limit""" now = time() # Clean old requests self.requests[key] = [req_time for req_time in self.requests[key] if now - req_time < window] # Check current count if len(self.requests[key]) >= limit: return False # Add current request self.requests[key].append(now) return True # Global rate limiter instance rate_limiter = RateLimiter() def check_rate_limit(request: Request, user: MCPUser, tool_name: str): """Check rate limits based on user role and tool type""" trace_id = getattr(request.state, "trace_id", "unknown") # Define limits based on tool type write_tools = ["confirmarAsistencia"] is_write_tool = tool_name in write_tools # Set limits based on tool type if is_write_tool: limit = settings.rate_limit_writes_per_second else: limit = settings.rate_limit_reads_per_second # Create rate limit key rate_key = f"{user.user_id}:{tool_name}" # Check rate limit if not rate_limiter.is_allowed(rate_key, limit, 60): logger.warning( "rate_limit_exceeded", user_id=user.user_id, tool_name=tool_name, limit=limit, trace_id=trace_id ) raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Rate limit exceeded", headers={"X-Trace-ID": trace_id} ) logger.debug( "rate_limit_ok", user_id=user.user_id, tool_name=tool_name, limit=limit, trace_id=trace_id ) # Middleware function for authentication (not used in this implementation) async def auth_middleware(request: Request, call_next): """Optional auth middleware - currently not used""" response = await call_next(request) return response

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gfxjef/mcp_expokossodo2025'

If you have feedback or need assistance with the MCP directory API, please join our Discord server