Skip to main content
Glama

Chromium ARM64 Browser

by nfodor
Code-Examples.md29.2 kB
# 💻 Code Examples: Ready-to-Use Agent Farm Implementations #code #examples #templates #ready-to-use ## 🚀 Complete Agent Farm Setup ### 1. Farm Manager Class ```python # farm_manager.py import asyncio import aiohttp import json from typing import List, Dict, Any import logging class AgentFarm: """Complete Pi Agent Farm Management""" def __init__(self, agents: List[str]): self.agents = agents self.current_agent = 0 self.session = None self.logger = logging.getLogger(__name__) async def __aenter__(self): self.session = aiohttp.ClientSession() return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.close() def get_next_agent(self) -> str: """Round-robin agent selection""" agent = self.agents[self.current_agent] self.current_agent = (self.current_agent + 1) % len(self.agents) return agent async def execute_task(self, task: Dict[str, Any], retries: int = 3) -> Dict[str, Any]: """Execute task with automatic failover""" last_error = None for attempt in range(retries): agent = self.get_next_agent() try: async with self.session.post( f"http://{agent}:3000/browse", json=task, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status == 200: result = await response.json() self.logger.info(f"Task completed on {agent}") return result else: self.logger.warning(f"Agent {agent} returned {response.status}") except Exception as e: last_error = e self.logger.error(f"Agent {agent} failed: {e}") raise Exception(f"All agents failed. Last error: {last_error}") async def parallel_execute(self, tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Execute multiple tasks in parallel""" semaphore = asyncio.Semaphore(len(self.agents)) async def bounded_task(task): async with semaphore: return await self.execute_task(task) results = await asyncio.gather( *[bounded_task(task) for task in tasks], return_exceptions=True ) return results async def health_check(self) -> Dict[str, bool]: """Check health of all agents""" health_status = {} async def check_agent(agent): try: async with self.session.get( f"http://{agent}:3000/health", timeout=aiohttp.ClientTimeout(total=5) ) as response: return agent, response.status == 200 except: return agent, False results = await asyncio.gather( *[check_agent(agent) for agent in self.agents], return_exceptions=True ) for result in results: if isinstance(result, tuple): agent, status = result health_status[agent] = status else: # Exception occurred health_status['unknown'] = False return health_status # Usage example async def main(): agents = [ "pi-agent-01.local", "pi-agent-02.local", "pi-agent-03.local" ] async with AgentFarm(agents) as farm: # Health check health = await farm.health_check() print(f"Agent health: {health}") # Single task result = await farm.execute_task({ "action": "navigate", "url": "https://example.com" }) print(f"Navigation result: {result}") # Parallel tasks tasks = [ {"action": "navigate", "url": f"https://example.com/page{i}"} for i in range(1, 11) ] results = await farm.parallel_execute(tasks) print(f"Processed {len(results)} tasks") if __name__ == "__main__": asyncio.run(main()) ``` ### 2. Express.js Agent Server ```javascript // agent-server.js const express = require('express'); const { exec } = require('child_process'); const path = require('path'); const fs = require('fs').promises; const crypto = require('crypto'); class AgentServer { constructor() { this.app = express(); this.setupMiddleware(); this.setupRoutes(); this.browserPath = path.join(__dirname, 'arm64_browser.py'); this.activeTasks = new Map(); } setupMiddleware() { this.app.use(express.json({ limit: '10mb' })); this.app.use((req, res, next) => { req.taskId = crypto.randomUUID(); console.log(`[${req.taskId}] ${req.method} ${req.path}`); next(); }); } setupRoutes() { // Health check this.app.get('/health', (req, res) => { res.json({ status: 'healthy', node: process.env.HOSTNAME || 'pi-agent', activeTasks: this.activeTasks.size, uptime: process.uptime(), memory: process.memoryUsage() }); }); // Main browsing endpoint this.app.post('/browse', async (req, res) => { try { const result = await this.executeBrowserTask(req.taskId, req.body); res.json({ success: true, result, taskId: req.taskId }); } catch (error) { console.error(`[${req.taskId}] Error:`, error.message); res.status(500).json({ success: false, error: error.message, taskId: req.taskId }); } }); // Batch processing this.app.post('/batch', async (req, res) => { const { tasks } = req.body; const results = []; for (const task of tasks) { try { const result = await this.executeBrowserTask(req.taskId, task); results.push({ success: true, result }); } catch (error) { results.push({ success: false, error: error.message }); } } res.json({ results, taskId: req.taskId }); }); // Task status this.app.get('/task/:taskId', (req, res) => { const task = this.activeTasks.get(req.params.taskId); if (!task) { res.status(404).json({ error: 'Task not found' }); } else { res.json(task); } }); } async executeBrowserTask(taskId, taskData) { const { action, url, selector, value, script } = taskData; // Track active task this.activeTasks.set(taskId, { action, url, startTime: Date.now(), status: 'running' }); try { let command = `python3 -c "import sys; sys.path.append('${__dirname}'); import arm64_browser; `; switch (action) { case 'navigate': command += `print(arm64_browser.navigate('${url}'))"`; break; case 'screenshot': const filename = value || `screenshot_${taskId}.png`; command += `print(arm64_browser.screenshot('${filename}'))"`; break; case 'click': command += `print(arm64_browser.click('${selector}'))"`; break; case 'fill': command += `print(arm64_browser.fill('${selector}', '${value}'))"`; break; case 'evaluate': command += `print(arm64_browser.evaluate('${script}'))"`; break; case 'get_content': const type = value || 'text'; command += `print(arm64_browser.get_content('${type}'))"`; break; default: throw new Error(`Unknown action: ${action}`); } const result = await this.execCommand(command); // Update task status this.activeTasks.set(taskId, { ...this.activeTasks.get(taskId), status: 'completed', endTime: Date.now(), result }); // Clean up completed tasks after 5 minutes setTimeout(() => { this.activeTasks.delete(taskId); }, 5 * 60 * 1000); return result; } catch (error) { // Update task status this.activeTasks.set(taskId, { ...this.activeTasks.get(taskId), status: 'failed', endTime: Date.now(), error: error.message }); throw error; } } execCommand(command) { return new Promise((resolve, reject) => { exec(command, { timeout: 30000 }, (error, stdout, stderr) => { if (error) { reject(new Error(stderr || error.message)); } else { resolve(stdout.trim()); } }); }); } start(port = 3000) { this.app.listen(port, () => { console.log(`Agent server running on port ${port}`); console.log(`Node: ${process.env.HOSTNAME || 'pi-agent'}`); }); } } // Start server const server = new AgentServer(); server.start(process.env.PORT || 3000); module.exports = AgentServer; ``` ### 3. Web Scraping Pipeline ```python # scraping_pipeline.py import asyncio import json from dataclasses import dataclass from typing import List, Dict, Any, Optional import pandas as pd @dataclass class ScrapingTask: url: str selectors: Dict[str, str] name: str timeout: int = 30 class ScrapingPipeline: """High-performance web scraping using Pi agent farm""" def __init__(self, farm: AgentFarm): self.farm = farm self.results = [] async def scrape_sites(self, tasks: List[ScrapingTask]) -> pd.DataFrame: """Scrape multiple sites and return DataFrame""" # Convert to agent tasks agent_tasks = [] for task in tasks: agent_tasks.append({ "action": "navigate", "url": task.url }) # Navigate to all sites navigation_results = await self.farm.parallel_execute(agent_tasks) # Extract data from each site extraction_tasks = [] for i, task in enumerate(tasks): if isinstance(navigation_results[i], dict) and navigation_results[i].get('success'): for field, selector in task.selectors.items(): extraction_tasks.append({ "action": "evaluate", "script": f"document.querySelector('{selector}')?.textContent || ''", "_metadata": { "site": task.name, "field": field, "url": task.url } }) # Execute extractions extraction_results = await self.farm.parallel_execute(extraction_tasks) # Process results into DataFrame return self._process_results(extraction_results) def _process_results(self, results: List[Dict[str, Any]]) -> pd.DataFrame: """Convert raw results to structured DataFrame""" processed = [] for i, result in enumerate(results): if isinstance(result, dict) and 'result' in result: metadata = result.get('_metadata', {}) processed.append({ 'site': metadata.get('site', 'unknown'), 'field': metadata.get('field', 'unknown'), 'value': result['result'], 'url': metadata.get('url', '') }) df = pd.DataFrame(processed) # Pivot to have one row per site if not df.empty: df_pivot = df.pivot_table( index=['site', 'url'], columns='field', values='value', aggfunc='first' ).reset_index() return df_pivot return pd.DataFrame() # Usage example async def scrape_ecommerce_sites(): agents = ["pi-agent-01.local", "pi-agent-02.local"] tasks = [ ScrapingTask( name="Amazon Product", url="https://amazon.com/dp/B08N5WRWNW", selectors={ "title": "#productTitle", "price": ".a-price-whole", "rating": "[data-asin] .a-icon-alt" } ), ScrapingTask( name="eBay Product", url="https://ebay.com/itm/123456789", selectors={ "title": "h1#x-title-label-lbl", "price": ".prc .num", "condition": ".u-flL .condText" } ) ] async with AgentFarm(agents) as farm: pipeline = ScrapingPipeline(farm) results_df = await pipeline.scrape_sites(tasks) print(results_df) results_df.to_csv('scraping_results.csv', index=False) if __name__ == "__main__": asyncio.run(scrape_ecommerce_sites()) ``` ### 4. SEO Monitoring System ```python # seo_monitor.py import asyncio import sqlite3 from datetime import datetime, timedelta from typing import List, Dict, Any import schedule import time class SEOMonitor: """Automated SEO ranking monitor using Pi farm""" def __init__(self, farm: AgentFarm, db_path: str = "seo_data.db"): self.farm = farm self.db_path = db_path self.init_database() def init_database(self): """Initialize SQLite database""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS rankings ( id INTEGER PRIMARY KEY, keyword TEXT, search_engine TEXT, position INTEGER, url TEXT, timestamp DATETIME, domain TEXT ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS keywords ( id INTEGER PRIMARY KEY, keyword TEXT UNIQUE, target_domain TEXT, active BOOLEAN DEFAULT 1 ) """) async def check_keyword_rankings(self, keywords: List[str], search_engine: str = "google"): """Check rankings for multiple keywords""" tasks = [] for keyword in keywords: search_url = self._build_search_url(keyword, search_engine) tasks.append({ "action": "navigate", "url": search_url, "_keyword": keyword }) # Execute searches search_results = await self.farm.parallel_execute(tasks) # Extract rankings ranking_tasks = [] for i, result in enumerate(search_results): if isinstance(result, dict) and result.get('success'): keyword = tasks[i]["_keyword"] ranking_tasks.append({ "action": "evaluate", "script": """ Array.from(document.querySelectorAll('.g h3')).map((el, index) => ({ position: index + 1, title: el.textContent, url: el.closest('.g').querySelector('a')?.href || '' })) """, "_keyword": keyword }) ranking_results = await self.farm.parallel_execute(ranking_tasks) # Store results self._store_rankings(ranking_results, search_engine) return ranking_results def _build_search_url(self, keyword: str, search_engine: str) -> str: """Build search URL for different engines""" engines = { "google": f"https://google.com/search?q={keyword.replace(' ', '+')}&num=20", "bing": f"https://bing.com/search?q={keyword.replace(' ', '+')}&count=20", "duckduckgo": f"https://duckduckgo.com/?q={keyword.replace(' ', '+')}" } return engines.get(search_engine, engines["google"]) def _store_rankings(self, results: List[Dict], search_engine: str): """Store ranking results in database""" timestamp = datetime.now() with sqlite3.connect(self.db_path) as conn: for result in results: if isinstance(result, dict) and 'result' in result: keyword = result.get('_keyword') rankings = json.loads(result['result']) for rank_data in rankings: conn.execute(""" INSERT INTO rankings (keyword, search_engine, position, url, timestamp, domain) VALUES (?, ?, ?, ?, ?, ?) """, ( keyword, search_engine, rank_data['position'], rank_data['url'], timestamp, self._extract_domain(rank_data['url']) )) def _extract_domain(self, url: str) -> str: """Extract domain from URL""" try: from urllib.parse import urlparse return urlparse(url).netloc except: return "" def get_ranking_report(self, days: int = 7) -> pd.DataFrame: """Generate ranking report for last N days""" with sqlite3.connect(self.db_path) as conn: query = """ SELECT keyword, search_engine, position, domain, timestamp FROM rankings WHERE timestamp >= datetime('now', '-{} days') ORDER BY keyword, timestamp """.format(days) return pd.read_sql_query(query, conn) def setup_scheduler(self, keywords: List[str], interval_hours: int = 6): """Setup automated monitoring schedule""" async def run_check(): async with self.farm: await self.check_keyword_rankings(keywords) def job(): asyncio.run(run_check()) schedule.every(interval_hours).hours.do(job) print(f"SEO monitoring scheduled every {interval_hours} hours") print(f"Monitoring {len(keywords)} keywords") while True: schedule.run_pending() time.sleep(60) # Usage example async def main(): agents = ["pi-agent-01.local", "pi-agent-02.local", "pi-agent-03.local"] keywords = [ "raspberry pi automation", "browser automation arm64", "cheap web scraping", "claude code tutorial" ] async with AgentFarm(agents) as farm: monitor = SEOMonitor(farm) # One-time check results = await monitor.check_keyword_rankings(keywords) print(f"Checked {len(keywords)} keywords") # Generate report report = monitor.get_ranking_report(days=30) print(report.head()) if __name__ == "__main__": asyncio.run(main()) ``` ### 5. Load Testing Framework ```python # load_tester.py import asyncio import time from dataclasses import dataclass from typing import List, Dict, Any import statistics @dataclass class LoadTestConfig: target_url: str concurrent_users: int duration_seconds: int ramp_up_seconds: int = 0 class LoadTester: """Load testing using Pi agent farm""" def __init__(self, farm: AgentFarm): self.farm = farm self.results = [] async def run_load_test(self, config: LoadTestConfig) -> Dict[str, Any]: """Execute load test""" print(f"Starting load test: {config.concurrent_users} users for {config.duration_seconds}s") start_time = time.time() self.results = [] # Create user tasks user_tasks = [] for user_id in range(config.concurrent_users): task = self._create_user_session( user_id, config.target_url, config.duration_seconds, config.ramp_up_seconds ) user_tasks.append(task) # Run all user sessions await asyncio.gather(*user_tasks, return_exceptions=True) end_time = time.time() # Analyze results return self._analyze_results(start_time, end_time, config) async def _create_user_session(self, user_id: int, url: str, duration: int, ramp_up: int): """Simulate single user session""" # Ramp up delay if ramp_up > 0: delay = (user_id / len(self.farm.agents)) * ramp_up await asyncio.sleep(delay) session_start = time.time() requests_made = 0 while time.time() - session_start < duration: request_start = time.time() try: result = await self.farm.execute_task({ "action": "navigate", "url": url }) request_end = time.time() response_time = request_end - request_start self.results.append({ "user_id": user_id, "timestamp": request_start, "response_time": response_time, "success": True, "status_code": 200 # Assuming success }) requests_made += 1 except Exception as e: request_end = time.time() response_time = request_end - request_start self.results.append({ "user_id": user_id, "timestamp": request_start, "response_time": response_time, "success": False, "error": str(e) }) # Brief pause between requests await asyncio.sleep(0.1) def _analyze_results(self, start_time: float, end_time: float, config: LoadTestConfig) -> Dict[str, Any]: """Analyze load test results""" total_duration = end_time - start_time total_requests = len(self.results) successful_requests = sum(1 for r in self.results if r['success']) failed_requests = total_requests - successful_requests response_times = [r['response_time'] for r in self.results if r['success']] if response_times: avg_response_time = statistics.mean(response_times) median_response_time = statistics.median(response_times) p95_response_time = sorted(response_times)[int(len(response_times) * 0.95)] min_response_time = min(response_times) max_response_time = max(response_times) else: avg_response_time = median_response_time = p95_response_time = 0 min_response_time = max_response_time = 0 throughput = successful_requests / total_duration if total_duration > 0 else 0 error_rate = failed_requests / total_requests if total_requests > 0 else 0 return { "config": config, "summary": { "total_duration": total_duration, "total_requests": total_requests, "successful_requests": successful_requests, "failed_requests": failed_requests, "throughput_rps": throughput, "error_rate": error_rate }, "response_times": { "avg": avg_response_time, "median": median_response_time, "p95": p95_response_time, "min": min_response_time, "max": max_response_time }, "raw_results": self.results } # Usage example async def main(): agents = ["pi-agent-01.local", "pi-agent-02.local", "pi-agent-03.local"] config = LoadTestConfig( target_url="https://example.com", concurrent_users=10, duration_seconds=60, ramp_up_seconds=10 ) async with AgentFarm(agents) as farm: tester = LoadTester(farm) results = await tester.run_load_test(config) print(f"Load Test Results:") print(f"Total Requests: {results['summary']['total_requests']}") print(f"Success Rate: {(1 - results['summary']['error_rate']) * 100:.1f}%") print(f"Throughput: {results['summary']['throughput_rps']:.2f} RPS") print(f"Avg Response Time: {results['response_times']['avg']:.3f}s") print(f"95th Percentile: {results['response_times']['p95']:.3f}s") if __name__ == "__main__": asyncio.run(main()) ``` ### 6. Docker Deployment ```dockerfile # Dockerfile FROM balenalib/raspberry-pi-node:18-bullseye # Install system dependencies RUN apt-get update && apt-get install -y \ chromium-browser \ python3 \ python3-pip \ && rm -rf /var/lib/apt/lists/* # Create app directory WORKDIR /app # Copy package files COPY package*.json ./ RUN npm install # Copy application code COPY . . # Install Python dependencies RUN pip3 install aiohttp pandas # Create non-root user RUN useradd -m -u 1001 agent RUN chown -R agent:agent /app USER agent # Expose port EXPOSE 3000 # Health check HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD curl -f http://localhost:3000/health || exit 1 # Start command CMD ["node", "agent-server.js"] ``` ```yaml # docker-compose.yml version: '3.8' services: pi-agent-01: build: . container_name: pi-agent-01 hostname: pi-agent-01 ports: - "3001:3000" environment: - NODE_ENV=production - HOSTNAME=pi-agent-01 restart: unless-stopped pi-agent-02: build: . container_name: pi-agent-02 hostname: pi-agent-02 ports: - "3002:3000" environment: - NODE_ENV=production - HOSTNAME=pi-agent-02 restart: unless-stopped pi-agent-03: build: . container_name: pi-agent-03 hostname: pi-agent-03 ports: - "3003:3000" environment: - NODE_ENV=production - HOSTNAME=pi-agent-03 restart: unless-stopped load-balancer: image: nginx:alpine ports: - "80:80" volumes: - ./nginx.conf:/etc/nginx/nginx.conf depends_on: - pi-agent-01 - pi-agent-02 - pi-agent-03 restart: unless-stopped ``` ## 🚀 Quick Start Templates ### Minimal Farm (3 nodes) ```bash #!/bin/bash # setup-minimal-farm.sh # Clone and setup on each Pi for pi in pi-01 pi-02 pi-03; do ssh pi@$pi " git clone https://github.com/nfodor/claude-arm64-browser cd claude-arm64-browser npm install python3 -c 'import sys; sys.path.append(\".\"); import arm64_browser; print(\"✅ Ready!\")' " done echo "Minimal farm setup complete!" ``` ### Production Farm (10+ nodes) ```bash #!/bin/bash # setup-production-farm.sh # Use Ansible for production deployment ansible-playbook -i production-inventory.yml deploy-farm.yml # Setup monitoring docker-compose -f monitoring/docker-compose.yml up -d # Setup load balancer ansible-playbook -i production-inventory.yml setup-lb.yml echo "Production farm deployed!" ``` --- **Previous**: [[Architecture]] - Technical design **Related**: [[Implementation-Guide]] | [[Use-Cases]]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nfodor/mcp-chromium-arm64'

If you have feedback or need assistance with the MCP directory API, please join our Discord server