Skip to main content
Glama

AnyDB MCP Server

by iamayuppie
webscrapertool.py14.6 kB
""" Web scraper tool module for AnyDB MCP Server. Contains functionality for: - Web page scraping and content extraction - URL validation and processing - Integration with vector database for storage and retrieval """ import asyncio import logging import re import urllib.parse from typing import Any, Dict, List, Optional from datetime import datetime import requests from bs4 import BeautifulSoup import html2text from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry from filetool import VectorDatabaseManager # Get logger logger = logging.getLogger('mcp_server') class WebScraperManager: """Manages web scraping operations with robust error handling.""" def __init__(self, timeout: int = 30, max_retries: int = 3): self.timeout = timeout self.max_retries = max_retries self.session = self._create_session() # Configure html2text converter self.html_converter = html2text.HTML2Text() self.html_converter.ignore_links = False self.html_converter.ignore_images = True self.html_converter.ignore_emphasis = False self.html_converter.body_width = 0 # Don't wrap lines def _create_session(self) -> requests.Session: """Create a requests session with retry strategy and proper headers.""" session = requests.Session() # Set up retry strategy retry_strategy = Retry( total=self.max_retries, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["HEAD", "GET", "OPTIONS"], backoff_factor=1 ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("http://", adapter) session.mount("https://", adapter) # Set common headers to avoid being blocked session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', }) return session def _validate_url(self, url: str) -> str: """Validate and normalize URL.""" if not url.strip(): raise ValueError("URL cannot be empty") # Add protocol if missing if not url.startswith(('http://', 'https://')): url = 'https://' + url # Validate URL format parsed = urllib.parse.urlparse(url) if not parsed.netloc: raise ValueError(f"Invalid URL format: {url}") # Re-parse to get the final URL after normalization parsed = urllib.parse.urlparse(url) # Security check - block certain schemes and local addresses blocked_schemes = ['file', 'ftp', 'javascript'] if parsed.scheme.lower() in blocked_schemes: raise ValueError(f"URL scheme '{parsed.scheme}' is not allowed") # Block local/private IP addresses for security blocked_patterns = [ r'^localhost$', r'^127\.', r'^192\.168\.', r'^10\.', r'^172\.(1[6-9]|2[0-9]|3[0-1])\.', r'^\[::1\]$', r'^\[::' ] for pattern in blocked_patterns: if re.match(pattern, parsed.netloc.lower()): raise ValueError(f"Local/private URLs are not allowed: {url}") return url def _extract_content(self, html: str, url: str) -> Dict[str, str]: """Extract and clean content from HTML.""" soup = BeautifulSoup(html, 'lxml') # Remove script, style, and other non-content elements for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'advertisement']): element.decompose() # Extract title title_element = soup.find('title') title = title_element.get_text().strip() if title_element else '' if not title: title = urllib.parse.urlparse(url).netloc # Extract meta description meta_desc = '' meta_element = soup.find('meta', attrs={'name': 'description'}) or soup.find('meta', attrs={'property': 'og:description'}) if meta_element: meta_desc = meta_element.get('content', '').strip() # Convert HTML to clean text text_content = self.html_converter.handle(str(soup)) # Clean up the text lines = text_content.split('\n') cleaned_lines = [] for line in lines: line = line.strip() if line and len(line) > 5: # Filter out very short lines cleaned_lines.append(line) clean_text = '\n'.join(cleaned_lines) # Truncate if too long (vector database limits) max_length = 50000 # Reasonable limit for vector storage if len(clean_text) > max_length: clean_text = clean_text[:max_length] + "\n\n[Content truncated due to length...]" return { 'title': title, 'description': meta_desc, 'content': clean_text, 'url': url, 'scraped_at': datetime.now().isoformat() } async def scrape_url(self, url: str) -> Dict[str, str]: """Scrape content from a URL asynchronously.""" logger.info(f"Starting to scrape URL: {url}") try: # Validate URL validated_url = self._validate_url(url) logger.debug(f"Validated URL: {validated_url}") # Perform the web request in a thread to avoid blocking loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: self.session.get(validated_url, timeout=self.timeout) ) # Check response status response.raise_for_status() # Check content type content_type = response.headers.get('content-type', '').lower() if 'text/html' not in content_type: raise ValueError(f"URL does not return HTML content. Content-Type: {content_type}") # Extract content extracted = self._extract_content(response.text, validated_url) logger.info(f"Successfully scraped {len(extracted['content'])} characters from {validated_url}") return extracted except requests.RequestException as e: error_msg = f"Failed to fetch URL {url}: {str(e)}" logger.error(error_msg) raise Exception(error_msg) except Exception as e: error_msg = f"Error scraping URL {url}: {str(e)}" logger.error(error_msg) raise Exception(error_msg) class WebScraperTools: """High-level web scraper tool operations.""" def __init__(self, vector_db_manager: VectorDatabaseManager): self.scraper = WebScraperManager() self.vector_db = vector_db_manager async def scrape_and_store(self, url: str, custom_filename: Optional[str] = None) -> Dict[str, Any]: """Scrape a URL and store the content in the vector database.""" logger.info(f"Scrape and Store - URL: {url}") try: # Scrape the URL scraped_data = await self.scraper.scrape_url(url) # Generate filename if not provided if not custom_filename: parsed_url = urllib.parse.urlparse(scraped_data['url']) domain = parsed_url.netloc.replace('www.', '') timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') custom_filename = f"webpage_{domain}_{timestamp}.txt" # Create document content with metadata document_content = f"""Title: {scraped_data['title']} URL: {scraped_data['url']} Scraped: {scraped_data['scraped_at']} Description: {scraped_data['description']} {scraped_data['content']}""" # Prepare metadata for vector storage metadata = { 'source': 'web_scraper', 'url': scraped_data['url'], 'title': scraped_data['title'], 'description': scraped_data['description'], 'scraped_at': scraped_data['scraped_at'], 'content_length': len(scraped_data['content']) } # Store in vector database chunks_added = await self.vector_db.add_file( filename=custom_filename, content=document_content, metadata=metadata ) response_data = { 'url': scraped_data['url'], 'title': scraped_data['title'], 'filename': custom_filename, 'content_length': len(scraped_data['content']), 'chunks_added': chunks_added, 'message': f"Successfully scraped and stored webpage as '{custom_filename}'" } logger.info(f"Successfully stored scraped content with {chunks_added} chunks") return response_data except Exception as e: error_msg = f"Failed to scrape and store URL {url}: {str(e)}" logger.error(error_msg) return { 'url': url, 'error': error_msg, 'message': 'Failed to scrape webpage' } async def query_scraped_content(self, query: str, max_results: int = 5) -> Dict[str, Any]: """Query the vector database for scraped web content.""" logger.info(f"Querying scraped content: {query}") try: # Search the vector database search_results = await self.vector_db.search_files(query, max_results) # Filter results to only include web scraped content web_results = [] for result in search_results: metadata = result.get('metadata', {}) if metadata.get('source') == 'web_scraper': web_results.append({ 'filename': result['filename'], 'content': result['content'], 'similarity_score': result['similarity_score'], 'url': metadata.get('url', ''), 'title': metadata.get('title', ''), 'scraped_at': metadata.get('scraped_at', '') }) response_data = { 'query': query, 'results_found': len(web_results), 'results': web_results, 'message': f"Found {len(web_results)} relevant web pages" } logger.info(f"Query returned {len(web_results)} web scraping results") return response_data except Exception as e: error_msg = f"Failed to query scraped content: {str(e)}" logger.error(error_msg) return { 'query': query, 'error': error_msg, 'message': 'Failed to search scraped content' } async def list_scraped_pages(self) -> Dict[str, Any]: """List all scraped web pages stored in the vector database.""" logger.info("Listing all scraped web pages") try: # Get all files from vector database all_files = await self.vector_db.list_files() # Filter for web scraped content web_pages = [] for file_info in all_files: metadata = file_info.get('metadata', {}) if metadata.get('source') == 'web_scraper': web_pages.append({ 'filename': file_info['filename'], 'url': metadata.get('url', ''), 'title': metadata.get('title', 'No title'), 'description': metadata.get('description', ''), 'scraped_at': metadata.get('scraped_at', ''), 'content_length': metadata.get('content_length', 0), 'chunk_count': file_info.get('chunk_count', 0) }) response_data = { 'total_pages': len(web_pages), 'pages': web_pages, 'message': f"Found {len(web_pages)} scraped web pages" } logger.info(f"Listed {len(web_pages)} scraped web pages") return response_data except Exception as e: error_msg = f"Failed to list scraped pages: {str(e)}" logger.error(error_msg) return { 'error': error_msg, 'message': 'Failed to list scraped web pages' } async def remove_scraped_page(self, filename: str) -> Dict[str, Any]: """Remove a scraped web page from the vector database.""" logger.info(f"Removing scraped page: {filename}") try: # Remove from vector database success = await self.vector_db.remove_file(filename) if success: response_data = { 'filename': filename, 'message': f"Successfully removed scraped page '{filename}'" } logger.info(f"Successfully removed scraped page: {filename}") else: response_data = { 'filename': filename, 'message': f"Scraped page '{filename}' not found or already removed" } logger.warning(f"Scraped page not found: {filename}") return response_data except Exception as e: error_msg = f"Failed to remove scraped page {filename}: {str(e)}" logger.error(error_msg) return { 'filename': filename, 'error': error_msg, 'message': 'Failed to remove scraped page' }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/iamayuppie/AnyDbApp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server