oauth_metrics.py
""" Enhanced metrics collection for OAuth multi-user load testing. Extends the base BenchmarkMetrics to track per-user statistics, workflow completion rates, and cross-user operation latencies. """ import statistics from collections import Counter, defaultdict from typing import Any from tests.load.oauth_workloads import WorkflowResult class OAuthBenchmarkMetrics: """ Enhanced metrics for OAuth multi-user load testing. Tracks: - Per-user operation counts and latencies - Workflow completion rates and timings - Cross-user operation metrics - Step-by-step workflow breakdowns """ def __init__(self): # Base metrics self.start_time: float | None = None self.end_time: float | None = None # Per-user tracking self.user_operations: dict[str, list[dict[str, Any]]] = defaultdict(list) self.user_operation_counts: dict[str, Counter] = defaultdict(Counter) self.user_errors: dict[str, Counter] = defaultdict(Counter) # Workflow tracking self.workflows: list[WorkflowResult] = [] self.workflow_counts: Counter = Counter() self.workflow_successes: Counter = Counter() self.workflow_durations: dict[str, list[float]] = defaultdict(list) # Baseline operations (non-workflow) self.baseline_operations: list[dict[str, Any]] = [] def start(self): """Mark the start of the benchmark.""" import time self.start_time = time.time() def stop(self): """Mark the end of the benchmark.""" import time self.end_time = time.time() @property def duration(self) -> float: """Total benchmark duration in seconds.""" if self.start_time is None or self.end_time is None: return 0.0 return self.end_time - self.start_time def add_workflow_result(self, result: WorkflowResult): """ Add a workflow execution result. Args: result: WorkflowResult from workflow execution """ self.workflows.append(result) self.workflow_counts[result.workflow_name] += 1 if result.success: self.workflow_successes[result.workflow_name] += 1 self.workflow_durations[result.workflow_name].append(result.total_duration) # Track per-user operations from workflow steps for step in result.steps: self.user_operation_counts[step.user][step.step_name] += 1 if not step.success: self.user_errors[step.user][step.step_name] += 1 self.user_operations[step.user].append( { "type": "workflow_step", "workflow": result.workflow_name, "step": step.step_name, "success": step.success, "duration": step.duration, "error": step.error, } ) def add_baseline_operation(self, operation: dict[str, Any]): """ Add a baseline (non-workflow) operation result. Args: operation: Dict with keys: type, operation, user, success, duration, error (optional) """ self.baseline_operations.append(operation) user = operation.get("user", "unknown") op_name = operation.get("operation", "unknown") success = operation.get("success", False) self.user_operation_counts[user][op_name] += 1 if not success: self.user_errors[user][op_name] += 1 self.user_operations[user].append(operation) def get_user_stats(self) -> dict[str, dict[str, Any]]: """ Get per-user statistics. 
Returns: Dict mapping username to their stats """ stats = {} for user, operations in self.user_operations.items(): total_ops = len(operations) successful_ops = sum(1 for op in operations if op.get("success", False)) durations = [op["duration"] for op in operations if "duration" in op] stats[user] = { "total_operations": total_ops, "successful_operations": successful_ops, "failed_operations": total_ops - successful_ops, "success_rate": (successful_ops / total_ops * 100) if total_ops > 0 else 0.0, "latency": self._calculate_latency_stats(durations), "operations_breakdown": dict(self.user_operation_counts[user]), "errors_breakdown": dict(self.user_errors[user]), } return stats def get_workflow_stats(self) -> dict[str, dict[str, Any]]: """ Get workflow execution statistics. Returns: Dict mapping workflow name to its stats """ stats = {} for workflow_name in self.workflow_counts: total = self.workflow_counts[workflow_name] successes = self.workflow_successes[workflow_name] durations = self.workflow_durations[workflow_name] # Calculate per-step latencies step_latencies = defaultdict(list) for workflow in self.workflows: if workflow.workflow_name == workflow_name: for step in workflow.steps: if step.success: step_latencies[step.step_name].append(step.duration) step_stats = {} for step_name, latencies in step_latencies.items(): if latencies: step_stats[step_name] = self._calculate_latency_stats(latencies) stats[workflow_name] = { "total_executions": total, "successful_executions": successes, "failed_executions": total - successes, "success_rate": (successes / total * 100) if total > 0 else 0.0, "latency": self._calculate_latency_stats(durations), "step_latencies": step_stats, } return stats def get_baseline_stats(self) -> dict[str, Any]: """ Get statistics for baseline operations. 
Returns: Dict with baseline operation stats """ if not self.baseline_operations: return { "total_operations": 0, "success_rate": 0.0, "latency": self._calculate_latency_stats([]), } total = len(self.baseline_operations) successes = sum( 1 for op in self.baseline_operations if op.get("success", False) ) durations = [ op["duration"] for op in self.baseline_operations if "duration" in op ] # Per-operation breakdown operation_counts = Counter() operation_errors = Counter() for op in self.baseline_operations: op_name = op.get("operation", "unknown") operation_counts[op_name] += 1 if not op.get("success", False): operation_errors[op_name] += 1 return { "total_operations": total, "successful_operations": successes, "failed_operations": total - successes, "success_rate": (successes / total * 100) if total > 0 else 0.0, "latency": self._calculate_latency_stats(durations), "operations_breakdown": dict(operation_counts), "errors_breakdown": dict(operation_errors), } def _calculate_latency_stats(self, durations: list[float]) -> dict[str, float]: """Calculate latency statistics from a list of durations.""" if not durations: return { "min": 0.0, "max": 0.0, "mean": 0.0, "median": 0.0, "p90": 0.0, "p95": 0.0, "p99": 0.0, } sorted_durations = sorted(durations) def percentile(data: list[float], p: float) -> float: k = (len(data) - 1) * p f = int(k) c = f + 1 if c >= len(data): return data[-1] return data[f] + (k - f) * (data[c] - data[f]) return { "min": min(durations), "max": max(durations), "mean": statistics.mean(durations), "median": statistics.median(durations), "p90": percentile(sorted_durations, 0.90), "p95": percentile(sorted_durations, 0.95), "p99": percentile(sorted_durations, 0.99), } def to_dict(self) -> dict[str, Any]: """Convert metrics to dictionary for JSON export.""" return { "summary": { "duration": self.duration, "total_workflows": len(self.workflows), "total_baseline_ops": len(self.baseline_operations), "total_users": len(self.user_operations), }, "workflows": self.get_workflow_stats(), "baseline": self.get_baseline_stats(), "users": self.get_user_stats(), } def print_report(self): """Print human-readable benchmark report.""" print("\n" + "=" * 80) print("OAUTH MULTI-USER BENCHMARK RESULTS") print("=" * 80) # Summary print(f"\nDuration: {self.duration:.2f}s") print(f"Total Users: {len(self.user_operations)}") print(f"Total Workflows Executed: {len(self.workflows)}") print(f"Total Baseline Operations: {len(self.baseline_operations)}") # Workflow Stats if self.workflows: print("\n" + "-" * 80) print("WORKFLOW STATISTICS") print("-" * 80) print( f"{'Workflow':<30} {'Total':>8} {'Success':>8} {'Rate':>8} {'P50':>10} {'P95':>10}" ) print("-" * 80) workflow_stats = self.get_workflow_stats() for name, stats in sorted(workflow_stats.items()): latency = stats["latency"] print( f"{name:<30} {stats['total_executions']:>8} " f"{stats['successful_executions']:>8} " f"{stats['success_rate']:>7.1f}% " f"{latency['median']:>9.4f}s {latency['p95']:>9.4f}s" ) # Per-User Stats print("\n" + "-" * 80) print("PER-USER STATISTICS") print("-" * 80) print( f"{'User':<20} {'Total Ops':>10} {'Success':>10} {'Errors':>8} {'Rate':>8} {'P50':>10}" ) print("-" * 80) user_stats = self.get_user_stats() for username, stats in sorted(user_stats.items()): latency = stats["latency"] print( f"{username:<20} {stats['total_operations']:>10} " f"{stats['successful_operations']:>10} " f"{stats['failed_operations']:>8} " f"{stats['success_rate']:>7.1f}% " f"{latency['median']:>9.4f}s" ) # Baseline Stats if 
self.baseline_operations: print("\n" + "-" * 80) print("BASELINE OPERATIONS") print("-" * 80) baseline = self.get_baseline_stats() print(f"Total Operations: {baseline['total_operations']}") print(f"Success Rate: {baseline['success_rate']:.1f}%") latency = baseline["latency"] print( f"Latency: min={latency['min']:.4f}s, p50={latency['median']:.4f}s, " f"p95={latency['p95']:.4f}s, max={latency['max']:.4f}s" ) print("=" * 80 + "\n")
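A minimal usage sketch (assuming the module is importable as tests.load.oauth_metrics, alongside oauth_workloads; the usernames, operation names, and timings below are illustrative, using the operation-dict shape documented in add_baseline_operation):

import json

from tests.load.oauth_metrics import OAuthBenchmarkMetrics

metrics = OAuthBenchmarkMetrics()
metrics.start()

# In a real run the load generator records one dict per operation;
# WorkflowResult objects from oauth_workloads are fed through
# add_workflow_result() in the same way.
metrics.add_baseline_operation(
    {
        "type": "baseline",
        "operation": "notes_list",
        "user": "alice",
        "success": True,
        "duration": 0.042,
    }
)
metrics.add_baseline_operation(
    {
        "type": "baseline",
        "operation": "notes_list",
        "user": "bob",
        "success": False,
        "duration": 0.310,
        "error": "401 Unauthorized",
    }
)

metrics.stop()
metrics.print_report()

# Export the full breakdown for offline analysis.
with open("oauth_benchmark.json", "w") as f:
    json.dump(metrics.to_dict(), f, indent=2)

As a cross-check on the inline percentile helper: for samples of two or more values it agrees with statistics.quantiles(durations, n=100, method="inclusive") at the corresponding cut points (index 94 for p95), since both linearly interpolate between the two closest ranks.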
