extractor.py (7.37 kB)
"""Structured data extraction from web pages.""" from __future__ import annotations import json import re from dataclasses import dataclass from typing import Any from bs4 import BeautifulSoup def _sanitize_text(text: str) -> str: """Remove control characters and clean up text for JSON serialization. Args: text: Raw text that may contain control characters Returns: Cleaned text safe for JSON encoding """ if not text: return text # Remove control characters (except newline, carriage return, tab) text = re.sub(r"[\x00-\x08\x0b-\x0c\x0e-\x1f\x7f]", "", text) # Normalize whitespace text = " ".join(text.split()) return text @dataclass class TableData: """Extracted table data.""" caption: str | None headers: list[str] rows: list[dict[str, str]] @dataclass class ListData: """Extracted list data.""" title: str | None items: list[str] nested: bool class DataExtractor: """Extract structured data from HTML.""" def extract_tables(self, html: str, max_tables: int = 5) -> list[TableData]: """Extract HTML tables. Args: html: Raw HTML content max_tables: Maximum number of tables to extract Returns: List of TableData objects with caption, headers, and rows """ soup = BeautifulSoup(html, "html.parser") tables = [] for table in soup.find_all("table")[:max_tables]: # Extract caption caption_elem = table.find("caption") caption = _sanitize_text(caption_elem.get_text(strip=True)) if caption_elem else None # Extract headers headers = [] header_row = table.find("thead") if header_row: headers = [ _sanitize_text(th.get_text(strip=True)) for th in header_row.find_all("th") ] else: # Try first row first_row = table.find("tr") if first_row: headers = [ _sanitize_text(th.get_text(strip=True)) for th in first_row.find_all("th") ] # If no headers found, use generic column names if not headers: first_row = table.find("tr") if first_row: num_cols = len(first_row.find_all(["td", "th"])) headers = [f"Column {i + 1}" for i in range(num_cols)] if not headers: continue # Extract rows rows = [] tbody = table.find("tbody") row_elements = ( tbody.find_all("tr") if tbody else table.find_all("tr")[1:] ) # Skip header row if no tbody for tr in row_elements: cells = tr.find_all(["td", "th"]) if cells and len(cells) == len(headers): row_dict = {} for i, cell in enumerate(cells): row_dict[headers[i]] = _sanitize_text(cell.get_text(strip=True)) rows.append(row_dict) if rows: tables.append(TableData(caption=caption, headers=headers, rows=rows)) return tables def extract_lists(self, html: str, max_lists: int = 5) -> list[ListData]: """Extract HTML lists (ul, ol, dl). 
Args: html: Raw HTML content max_lists: Maximum number of lists to extract Returns: List of ListData objects with title and items """ soup = BeautifulSoup(html, "html.parser") lists = [] for list_elem in soup.find_all(["ul", "ol", "dl"])[:max_lists]: # Try to find a title (preceding heading) title = None prev = list_elem.find_previous(["h1", "h2", "h3", "h4", "h5", "h6"]) if prev: title = _sanitize_text(prev.get_text(strip=True)) # Extract items items = [] if list_elem.name in ["ul", "ol"]: for li in list_elem.find_all("li", recursive=False): items.append(_sanitize_text(li.get_text(strip=True))) else: # dl for dt in list_elem.find_all("dt"): dd = dt.find_next_sibling("dd") if dd: items.append( f"{_sanitize_text(dt.get_text(strip=True))}: {_sanitize_text(dd.get_text(strip=True))}" ) if items: lists.append( ListData( title=title, items=items, nested=False, # TODO: detect nested lists ) ) return lists def extract_fields(self, html: str, selectors: dict[str, str]) -> dict[str, str | list[str]]: """Extract specific fields using CSS selectors. Args: html: Raw HTML content selectors: Dict mapping field names to CSS selectors Returns: Dict with extracted field values (single string or list of strings) """ soup = BeautifulSoup(html, "html.parser") data: dict[str, str | list[str]] = {} for field_name, selector in selectors.items(): elements = soup.select(selector) if elements: if len(elements) == 1: data[field_name] = _sanitize_text(elements[0].get_text(strip=True)) else: data[field_name] = [_sanitize_text(el.get_text(strip=True)) for el in elements] return data def extract_json_ld(self, html: str) -> list[dict[str, Any]]: """Extract JSON-LD structured data. Args: html: Raw HTML content Returns: List of JSON-LD objects found in the page """ soup = BeautifulSoup(html, "html.parser") json_ld_scripts = soup.find_all("script", type="application/ld+json") data = [] for script in json_ld_scripts: try: if script.string: parsed = json.loads(script.string) data.append(parsed) except json.JSONDecodeError: pass return data def auto_extract(self, html: str) -> dict[str, Any]: """Automatically detect and extract structured content. Args: html: Raw HTML content Returns: Dict containing all detected structured data (tables, lists, json_ld) """ results: dict[str, Any] = {"tables": [], "lists": [], "json_ld": []} # Try JSON-LD first (highest quality) json_ld = self.extract_json_ld(html) if json_ld: results["json_ld"] = json_ld # Extract tables tables = self.extract_tables(html, max_tables=3) if tables: results["tables"] = [ {"caption": t.caption, "headers": t.headers, "rows": t.rows} for t in tables ] # Extract lists lists = self.extract_lists(html, max_lists=3) if lists: results["lists"] = [{"title": li.title, "items": li.items} for li in lists] return results
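
For reference, a minimal usage sketch follows. It assumes this module is importable as `extractor` (the actual package path isn't shown on this page), and the sample HTML and CSS selectors are illustrative only:

# Usage sketch: assumes the module above is importable as `extractor`.
from extractor import DataExtractor

html = """
<html><body>
  <h2>Team Roster</h2>
  <table>
    <thead><tr><th>Name</th><th>Role</th></tr></thead>
    <tbody>
      <tr><td>Ada</td><td>Engineer</td></tr>
      <tr><td>Grace</td><td>Admiral</td></tr>
    </tbody>
  </table>
  <ul><li>First item</li><li>Second item</li></ul>
</body></html>
"""

de = DataExtractor()

# Pull everything the extractor can detect in one call.
results = de.auto_extract(html)
print(results["tables"][0]["headers"])  # ['Name', 'Role']
print(results["lists"][0]["items"])     # ['First item', 'Second item']

# Or target specific fields with CSS selectors (selectors are hypothetical).
fields = de.extract_fields(html, {"heading": "h2", "names": "tbody td:first-child"})
print(fields["heading"])  # 'Team Roster'
print(fields["names"])    # ['Ada', 'Grace']

Note that `auto_extract` caps extraction at three tables and three lists per page; calling `extract_tables` or `extract_lists` directly allows a higher `max_tables`/`max_lists`.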
