Skip to main content
Glama
unstructured.py11.1 kB
"""Document processor using Unstructured.io API.""" import io import logging import time from collections.abc import Awaitable, Callable from typing import Any, Optional import anyio import httpx from .base import DocumentProcessor, ProcessingResult, ProcessorError logger = logging.getLogger(__name__) class UnstructuredProcessor(DocumentProcessor): """Document processor using Unstructured.io API. The Unstructured API provides document parsing capabilities for various formats including PDF, DOCX, images with OCR, and more. API Documentation: https://docs.unstructured.io/api-reference/api-services/api-parameters """ # Supported MIME types for Unstructured SUPPORTED_TYPES = { "application/pdf", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/msword", "application/vnd.openxmlformats-officedocument.presentationml.presentation", "application/vnd.ms-powerpoint", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.ms-excel", "application/rtf", "text/rtf", "application/vnd.oasis.opendocument.text", "application/epub+zip", "message/rfc822", "application/vnd.ms-outlook", "image/jpeg", "image/png", "image/tiff", "image/bmp", } def __init__( self, api_url: str, timeout: int = 120, default_strategy: str = "auto", default_languages: Optional[list[str]] = None, progress_interval: int = 10, ): """Initialize Unstructured processor. Args: api_url: Unstructured API endpoint timeout: Request timeout in seconds (default: 120) default_strategy: Default parsing strategy - "auto", "fast", or "hi_res" default_languages: Default OCR language codes (e.g., ["eng", "deu"]) progress_interval: Seconds between progress updates (default: 10) """ self.api_url = api_url self.timeout = timeout self.default_strategy = default_strategy self.default_languages = default_languages or ["eng"] self.progress_interval = progress_interval logger.info( f"Initialized UnstructuredProcessor: {api_url}, " f"strategy={default_strategy}, languages={self.default_languages}, " f"progress_interval={progress_interval}s" ) @property def name(self) -> str: return "unstructured" @property def supported_mime_types(self) -> set[str]: return self.SUPPORTED_TYPES async def _run_progress_poller( self, stop_event: anyio.Event, progress_callback: Callable[ [float, Optional[float], Optional[str]], Awaitable[None] ], start_time: float, ): """Run progress poller that reports status every N seconds. Args: stop_event: Event to signal when processing is complete progress_callback: Async callback to report progress start_time: Time when processing started (from time.time()) """ logger.debug("Starting progress poller") while not stop_event.is_set(): try: # Wait for the event to be set, with a timeout equal to progress_interval with anyio.fail_after(self.progress_interval): await stop_event.wait() # If wait() finished, the event was set (processing complete) break except TimeoutError: # Timeout occurred - time to send a progress update if not stop_event.is_set(): # Double-check in case of race condition elapsed = int(time.time() - start_time) message = ( f"Processing document with unstructured... ({elapsed}s elapsed)" ) try: await progress_callback( # type: ignore progress=float(elapsed), # type: ignore total=None, # Unknown total duration # type: ignore message=message, # type: ignore ) logger.debug(f"Progress update sent: {elapsed}s elapsed") except Exception as e: logger.warning(f"Failed to send progress update: {e}") logger.debug("Progress poller stopped") async def _make_api_request( self, content: bytes, content_type: str, filename: Optional[str], strategy: str, languages: list[str], extract_image_block_types: Optional[list[str]], ) -> ProcessingResult: """Make the actual API request to Unstructured. Args: content: Document bytes content_type: MIME type filename: Optional filename strategy: Processing strategy languages: OCR languages extract_image_block_types: Image element types to extract Returns: ProcessingResult with extracted text and metadata Raises: ProcessorError: If processing fails """ # Prepare multipart request files = { "files": ( filename or "document", io.BytesIO(content), content_type or "application/octet-stream", ) } data = { "strategy": strategy, "languages": ",".join(languages), } if extract_image_block_types: data["extract_image_block_types"] = ",".join(extract_image_block_types) logger.debug( f"Processing with Unstructured API: strategy={strategy}, languages={languages}" ) try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( f"{self.api_url}/general/v0/general", files=files, data=data, ) response.raise_for_status() # Parse response elements = response.json() # Extract text and metadata texts = [] element_types: dict[str, int] = {} for element in elements: if "text" in element and element["text"]: texts.append(element["text"]) el_type = element.get("type", "unknown") element_types[el_type] = element_types.get(el_type, 0) + 1 parsed_text = "\n\n".join(texts) metadata = { "element_count": len(elements), "text_length": len(parsed_text), "element_types": element_types, "strategy": strategy, "languages": languages, } logger.debug( f"Successfully processed: {len(elements)} elements, " f"{len(parsed_text)} characters" ) return ProcessingResult( text=parsed_text, metadata=metadata, processor=self.name, success=True, ) except httpx.HTTPError as e: logger.error(f"Unstructured API HTTP error: {e}") raise ProcessorError(f"HTTP error: {str(e)}") from e except Exception as e: logger.error(f"Unstructured API processing failed: {e}") raise ProcessorError(f"Processing failed: {str(e)}") from e async def process( self, content: bytes, content_type: str, filename: Optional[str] = None, options: Optional[dict[str, Any]] = None, progress_callback: Optional[ Callable[[float, Optional[float], Optional[str]], Awaitable[None]] ] = None, ) -> ProcessingResult: """Process document via Unstructured API. Args: content: Document bytes content_type: MIME type filename: Optional filename for format detection options: Processing options: - strategy: "auto", "fast", or "hi_res" (default: from init) - languages: List of language codes (default: from init) - extract_image_block_types: Types of image elements to extract progress_callback: Optional async callback for progress updates Returns: ProcessingResult with extracted text and metadata Raises: ProcessorError: If processing fails """ options = options or {} # Extract options with defaults strategy = options.get("strategy", self.default_strategy) languages = options.get("languages", self.default_languages) extract_image_block_types = options.get("extract_image_block_types") # If no progress callback, just make the request directly if progress_callback is None: return await self._make_api_request( content=content, content_type=content_type, filename=filename, strategy=strategy, languages=languages, extract_image_block_types=extract_image_block_types, ) # With progress callback: run API request + progress poller concurrently stop_event = anyio.Event() start_time = time.time() result = None async def capture_result(): nonlocal result try: result = await self._make_api_request( content=content, content_type=content_type, filename=filename, strategy=strategy, languages=languages, extract_image_block_types=extract_image_block_types, ) finally: # Signal poller to stop after API request completes stop_event.set() # Run both tasks concurrently using anyio task groups async with anyio.create_task_group() as tg: tg.start_soon(capture_result) tg.start_soon( self._run_progress_poller, stop_event, progress_callback, start_time ) return result # type: ignore async def health_check(self) -> bool: """Check if Unstructured API is available. Returns: True if API is healthy, False otherwise """ try: async with httpx.AsyncClient(timeout=5) as client: response = await client.get(f"{self.api_url}/healthcheck") return response.status_code == 200 except Exception as e: logger.warning(f"Unstructured health check failed: {e}") return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/No-Smoke/nextcloud-mcp-comprehensive'

If you have feedback or need assistance with the MCP directory API, please join our Discord server