Skip to main content
Glama
joplin_mcp.py36.9 kB
#!/usr/bin/env python3 """ MCP Server for Joplin Notes. This server provides tools to interact with the Joplin Data API, including note management, notebooks, tags, and search. Requirements: - Joplin desktop or CLI must be running with the Web Clipper service enabled - API runs on localhost:41184 by default Environment variables: JOPLIN_TOKEN: (Required) API token from Joplin's Web Clipper settings JOPLIN_PORT: (Optional) API port, defaults to 41184 JOPLIN_AUTO_LAUNCH: (Optional) Set to 'true' (default) to auto-launch Joplin desktop if not running. Set to 'false' to disable. On connection failure, will attempt to launch Joplin and retry once after a 2 second wait. """ import asyncio import json import os import shutil import subprocess import time from datetime import datetime from enum import Enum from typing import Any, Optional from dotenv import load_dotenv load_dotenv() # Load .env file if present import httpx from mcp.server.fastmcp import FastMCP from pydantic import BaseModel, ConfigDict, Field, field_validator # Initialize the MCP server mcp = FastMCP("joplin_mcp") # Constants DEFAULT_PORT = 41184 CHARACTER_LIMIT = 25000 AUTO_LAUNCH_ENABLED = os.environ.get("JOPLIN_AUTO_LAUNCH", "true").lower() == "true" LAUNCH_WAIT_SECONDS = 2.0 MAX_LAUNCH_RETRIES = 1 # Only retry once to avoid masking other issues ENSURE_RUNNING_TIMEOUT = 25.0 # Max seconds to wait for Joplin to become ready (AppImage can be slow) ENSURE_RUNNING_POLL_INTERVAL = 1.0 # Seconds between API readiness checks # ============================================================================= # Auto-Launch Utilities # ============================================================================= def _is_joplin_running() -> bool: """Check if Joplin desktop is running.""" try: result = subprocess.run( ["pgrep", "-f", "joplin"], capture_output=True, timeout=5, ) return result.returncode == 0 except Exception: return False def _launch_joplin() -> bool: """ Attempt to launch Joplin desktop application. Returns True if launch command was issued successfully. """ # Try common Joplin executable locations home = os.path.expanduser("~") joplin_commands = [ f"{home}/.joplin/Joplin.AppImage", # Default AppImage location "joplin-desktop", # Standard Linux package "joplin", # Alternative name "/usr/bin/joplin-desktop", "/usr/bin/joplin", "/snap/bin/joplin-desktop", # Snap package "/opt/Joplin/joplin", # Manual AppImage install ] # Also check for flatpak flatpak_cmd = ["flatpak", "run", "net.cozic.joplin_desktop"] # Build environment with DISPLAY for GUI apps on Linux env = os.environ.copy() if "DISPLAY" not in env: env["DISPLAY"] = ":0" # Default X11 display # Also set WAYLAND_DISPLAY if available for Wayland systems if "WAYLAND_DISPLAY" not in env and os.path.exists("/run/user/1000/wayland-0"): env["WAYLAND_DISPLAY"] = "wayland-0" for cmd in joplin_commands: if shutil.which(cmd) or os.path.isfile(cmd): try: subprocess.Popen( [cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, env=env, ) return True except Exception: continue # Try flatpak as fallback if shutil.which("flatpak"): try: subprocess.Popen( flatpak_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, env=env, ) return True except Exception: pass return False async def _wait_for_joplin_api_ready(timeout: float = ENSURE_RUNNING_TIMEOUT) -> bool: """ Poll until Joplin API is responsive. Args: timeout: Maximum seconds to wait for API readiness. Returns: True if API became ready, False if timeout exceeded. """ port = os.environ.get("JOPLIN_PORT", DEFAULT_PORT) token = os.environ.get("JOPLIN_TOKEN", "") base_url = f"http://localhost:{port}" start = time.time() while time.time() - start < timeout: try: async with httpx.AsyncClient() as client: # Use /ping endpoint to check API readiness resp = await client.get( f"{base_url}/ping", params={"token": token}, timeout=2.0, ) if resp.status_code == 200: return True except (httpx.ConnectError, httpx.ConnectTimeout): pass await asyncio.sleep(ENSURE_RUNNING_POLL_INTERVAL) return False # ============================================================================= # Enums # ============================================================================= class ResponseFormat(str, Enum): """Output format for tool responses.""" MARKDOWN = "markdown" JSON = "json" class NotesSortField(str, Enum): """Fields to sort notes by.""" UPDATED_TIME = "updated_time" CREATED_TIME = "created_time" TITLE = "title" ORDER = "order" # ============================================================================= # Pydantic Input Models # ============================================================================= class ListNotebooksInput(BaseModel): """Input model for listing notebooks.""" model_config = ConfigDict(str_strip_whitespace=True) response_format: ResponseFormat = Field( default=ResponseFormat.MARKDOWN, description="Output format: 'markdown' or 'json'", ) class ListNotesInput(BaseModel): """Input model for listing notes.""" model_config = ConfigDict(str_strip_whitespace=True) notebook_id: Optional[str] = Field( default=None, description="Filter by notebook ID. If not set, lists all notes.", ) limit: int = Field( default=50, description="Maximum notes to return", ge=1, le=100 ) order_by: NotesSortField = Field( default=NotesSortField.UPDATED_TIME, description="Field to sort by", ) order_desc: bool = Field( default=True, description="Sort descending (newest first)" ) response_format: ResponseFormat = Field( default=ResponseFormat.MARKDOWN, description="Output format: 'markdown' or 'json'", ) class GetNoteInput(BaseModel): """Input model for getting a single note.""" model_config = ConfigDict(str_strip_whitespace=True) note_id: str = Field( ..., description="The note ID", min_length=1 ) include_body: bool = Field( default=True, description="Include the full note body/content" ) response_format: ResponseFormat = Field( default=ResponseFormat.MARKDOWN, description="Output format: 'markdown' or 'json'", ) class CreateNoteInput(BaseModel): """Input model for creating a note.""" model_config = ConfigDict(str_strip_whitespace=True) title: str = Field( ..., description="Note title", min_length=1, max_length=500, ) body: str = Field( default="", description="Note content in Markdown format", max_length=100000, ) notebook_id: Optional[str] = Field( default=None, description="Notebook ID to create note in. Uses default notebook if not specified.", ) tags: Optional[list[str]] = Field( default=None, description="List of tag names to apply (will be created if they don't exist)", ) is_todo: bool = Field( default=False, description="Create as a to-do item instead of a regular note", ) class UpdateNoteInput(BaseModel): """Input model for updating a note.""" model_config = ConfigDict(str_strip_whitespace=True) note_id: str = Field(..., description="The note ID to update", min_length=1) title: Optional[str] = Field( default=None, description="New note title", max_length=500, ) body: Optional[str] = Field( default=None, description="New note content in Markdown", max_length=100000, ) notebook_id: Optional[str] = Field( default=None, description="Move note to different notebook", ) is_todo: Optional[bool] = Field( default=None, description="Convert to/from to-do item", ) todo_completed: Optional[bool] = Field( default=None, description="Mark to-do as completed/incomplete", ) class DeleteNoteInput(BaseModel): """Input model for deleting a note.""" model_config = ConfigDict(str_strip_whitespace=True) note_id: str = Field(..., description="The note ID to delete", min_length=1) class SearchNotesInput(BaseModel): """Input model for searching notes.""" model_config = ConfigDict(str_strip_whitespace=True) query: str = Field( ..., description="Search query. Supports: title:, body:, tag:, notebook:, created:, updated:, type: prefixes", min_length=1, max_length=500, ) limit: int = Field( default=20, description="Maximum results to return", ge=1, le=100 ) response_format: ResponseFormat = Field( default=ResponseFormat.MARKDOWN, description="Output format: 'markdown' or 'json'", ) class ListTagsInput(BaseModel): """Input model for listing tags.""" model_config = ConfigDict(str_strip_whitespace=True) response_format: ResponseFormat = Field( default=ResponseFormat.MARKDOWN, description="Output format: 'markdown' or 'json'", ) class CreateNotebookInput(BaseModel): """Input model for creating a notebook.""" model_config = ConfigDict(str_strip_whitespace=True) title: str = Field( ..., description="Notebook title", min_length=1, max_length=200, ) parent_id: Optional[str] = Field( default=None, description="Parent notebook ID for creating a sub-notebook", ) class TagNoteInput(BaseModel): """Input model for adding a tag to a note.""" model_config = ConfigDict(str_strip_whitespace=True) note_id: str = Field(..., description="The note ID to tag", min_length=1) tag: str = Field( ..., description="Tag name to add (will be created if it doesn't exist)", min_length=1, max_length=100, ) # ============================================================================= # Shared Utilities # ============================================================================= def _get_api_config() -> tuple[str, str]: """Get API base URL and token.""" token = os.environ.get("JOPLIN_TOKEN") if not token: raise ValueError( "JOPLIN_TOKEN environment variable not set. " "Get your token from: Joplin → Tools → Options → Web Clipper" ) port = os.environ.get("JOPLIN_PORT", DEFAULT_PORT) base_url = f"http://localhost:{port}" return base_url, token async def _make_api_request( endpoint: str, method: str = "GET", json_data: Optional[dict] = None, params: Optional[dict] = None, _retry_count: int = 0, ) -> dict | list | None: """Make request to Joplin API with auto-launch retry.""" base_url, token = _get_api_config() # Add token to params if params is None: params = {} params["token"] = token try: async with httpx.AsyncClient() as client: response = await client.request( method, f"{base_url}/{endpoint}", json=json_data, params=params, timeout=30.0, ) response.raise_for_status() if response.status_code == 204 or not response.content: return None return response.json() except (httpx.ConnectError, httpx.ConnectTimeout) as e: # Auto-launch logic: only retry once if AUTO_LAUNCH_ENABLED and _retry_count < MAX_LAUNCH_RETRIES: if not _is_joplin_running(): launched = _launch_joplin() if launched: # Wait for Joplin to start and enable Web Clipper await asyncio.sleep(LAUNCH_WAIT_SECONDS) # Retry the request once return await _make_api_request( endpoint, method, json_data, params, _retry_count=_retry_count + 1, ) # Re-raise if auto-launch disabled, already retried, or launch failed raise async def _get_all_paginated( endpoint: str, params: Optional[dict] = None, limit: int = 100, ) -> list: """Fetch all items with pagination.""" if params is None: params = {} items = [] page = 1 while True: params["page"] = page params["limit"] = min(limit, 100) result = await _make_api_request(endpoint, params=params) if isinstance(result, dict) and "items" in result: items.extend(result["items"]) if not result.get("has_more", False): break elif isinstance(result, list): items.extend(result) if len(result) < params["limit"]: break else: break page += 1 # Safety limit if page > 50: break return items[:limit] if limit else items def _handle_error(e: Exception) -> str: """Format errors with actionable messages.""" error_str = str(e).lower() if "connection refused" in error_str or "connect" in error_str: auto_launch_note = "" if AUTO_LAUNCH_ENABLED: auto_launch_note = "\n\nNote: Auto-launch was attempted but Joplin may not have started in time." else: auto_launch_note = "\n\nTip: Set JOPLIN_AUTO_LAUNCH=true to auto-start Joplin." return ( "Error: Cannot connect to Joplin. Make sure:\n" "1. Joplin desktop is running\n" "2. Web Clipper service is enabled (Tools → Options → Web Clipper)\n" f"3. The API port matches JOPLIN_PORT (default: 41184){auto_launch_note}" ) elif "401" in error_str or "unauthorized" in error_str or "forbidden" in error_str: return "Error: Invalid API token. Check JOPLIN_TOKEN is correct." elif "404" in error_str: return "Error: Resource not found. Check the ID is correct." elif "timeout" in error_str: return "Error: Request timed out. Joplin may be busy or unresponsive." return f"Error: {type(e).__name__}: {str(e)}" def _format_timestamp(ts: Optional[int]) -> str: """Format Unix timestamp (ms) to readable string.""" if not ts: return "Unknown" try: dt = datetime.fromtimestamp(ts / 1000) return dt.strftime("%Y-%m-%d %H:%M") except Exception: return str(ts) def _truncate_response(result: str, item_count: int) -> str: """Truncate response if it exceeds character limit.""" if len(result) > CHARACTER_LIMIT: truncated = result[: CHARACTER_LIMIT - 200] truncated += f"\n\n---\n**Response truncated** ({item_count} items). Use filters to narrow results." return truncated return result # ============================================================================= # System Tools # ============================================================================= @mcp.tool( name="joplin_ensure_running", annotations={ "title": "Ensure Joplin is Running", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def joplin_ensure_running() -> str: """ Ensure API ready. Launches Joplin if needed, waits for connection. Use proactively before batch operations to avoid cold-start delays. Returns immediately if already running. Useful for session pre-warming. Returns: Status message: 'already_running', 'launched', or error details. """ # Check if already running and API responsive if _is_joplin_running(): # Verify API is actually ready (Web Clipper enabled) if await _wait_for_joplin_api_ready(timeout=2.0): return "✅ Joplin is already running and API is ready." # Not running or API not ready - attempt launch if not AUTO_LAUNCH_ENABLED: return ( "❌ Joplin is not running and auto-launch is disabled. " "Please start Joplin manually and enable Web Clipper." ) launched = _launch_joplin() if not launched: return ( "❌ Failed to launch Joplin. Could not find Joplin executable. " "Please start Joplin manually." ) # Wait for API to become ready if await _wait_for_joplin_api_ready(): return "✅ Joplin launched successfully and API is ready." return ( "⚠️ Joplin was launched but API did not become ready within " f"{ENSURE_RUNNING_TIMEOUT} seconds. Please check that Web Clipper " "is enabled in Joplin (Tools → Options → Web Clipper)." ) # ============================================================================= # Notebook Tools # ============================================================================= @mcp.tool( name="joplin_list_notebooks", annotations={ "title": "List Joplin Notebooks", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def joplin_list_notebooks(params: ListNotebooksInput) -> str: """ List notebooks with IDs and hierarchy. Use to find notebook_id for filtering. Returns tree structure showing parent/child relationships. Always list notebooks first before creating new ones to avoid duplicates. Args: params: ListNotebooksInput containing: - response_format: 'markdown' or 'json' Returns: List of notebooks with their IDs and structure. """ try: notebooks = await _get_all_paginated( "folders", params={"fields": "id,title,parent_id"}, ) if not notebooks: return "No notebooks found." if params.response_format == ResponseFormat.JSON: return json.dumps(notebooks, indent=2) # Build tree structure for markdown def build_tree(parent_id: str = "", level: int = 0) -> list[str]: lines = [] for nb in notebooks: if nb.get("parent_id", "") == parent_id: indent = " " * level lines.append(f"{indent}- **{nb['title']}**") lines.append(f"{indent} ID: `{nb['id']}`") lines.extend(build_tree(nb["id"], level + 1)) return lines lines = ["# Joplin Notebooks", ""] lines.extend(build_tree()) return "\n".join(lines) except Exception as e: return _handle_error(e) @mcp.tool( name="joplin_create_notebook", annotations={ "title": "Create Joplin Notebook", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False, }, ) async def joplin_create_notebook(params: CreateNotebookInput) -> str: """ Create notebook or return existing. Checks for duplicates by title first. ⚠️ IMPORTANT: Always searches for existing notebook with same title before creating. Returns existing notebook ID if found to prevent duplicates. Args: params: CreateNotebookInput containing: - title: Notebook name - parent_id: Optional parent notebook for sub-notebook Returns: Notebook details with ID (existing or newly created). """ try: # First, check if notebook with same title already exists existing_notebooks = await _get_all_paginated( "folders", params={"fields": "id,title,parent_id"}, ) # Search for exact title match (case-insensitive) at the same parent level target_parent = params.parent_id or "" for nb in existing_notebooks: nb_parent = nb.get("parent_id", "") or "" if (nb.get("title", "").lower() == params.title.lower() and nb_parent == target_parent): return ( f"📁 Notebook **{nb['title']}** already exists " f"(ID: `{nb['id']}`). Using existing notebook." ) # No duplicate found, create new notebook data: dict[str, Any] = {"title": params.title} if params.parent_id: data["parent_id"] = params.parent_id notebook = await _make_api_request("folders", method="POST", json_data=data) return f"✅ Created notebook **{notebook['title']}** (ID: `{notebook['id']}`)" except Exception as e: return _handle_error(e) # ============================================================================= # Note Tools # ============================================================================= @mcp.tool( name="joplin_list_notes", annotations={ "title": "List Joplin Notes", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def joplin_list_notes(params: ListNotesInput) -> str: """ List notes with IDs, titles, dates. Filter by notebook_id, sort by date/title. Returns note metadata (not content). Use get_note for full content. Supports to-do status indicators in output. Args: params: ListNotesInput containing: - notebook_id: Filter by notebook (optional) - limit: Maximum notes to return (default 50) - order_by: Sort field (updated_time, created_time, title) - order_desc: Sort descending (default true) - response_format: 'markdown' or 'json' Returns: List of notes with titles, dates, and IDs. """ try: request_params = { "fields": "id,title,parent_id,updated_time,created_time,is_todo,todo_completed", "order_by": params.order_by.value, "order_dir": "DESC" if params.order_desc else "ASC", } if params.notebook_id: endpoint = f"folders/{params.notebook_id}/notes" else: endpoint = "notes" notes = await _get_all_paginated(endpoint, params=request_params, limit=params.limit) if not notes: return "No notes found." if params.response_format == ResponseFormat.JSON: return json.dumps(notes, indent=2) # Markdown format lines = ["# Joplin Notes", f"*Showing {len(notes)} notes*", ""] for note in notes: # Todo indicator if note.get("is_todo"): status = "✅" if note.get("todo_completed") else "⬜" lines.append(f"### {status} {note['title']}") else: lines.append(f"### {note['title']}") lines.append(f"- **ID**: `{note['id']}`") lines.append(f"- **Updated**: {_format_timestamp(note.get('updated_time'))}") lines.append("") return _truncate_response("\n".join(lines), len(notes)) except Exception as e: return _handle_error(e) @mcp.tool( name="joplin_get_note", annotations={ "title": "Get Joplin Note", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def joplin_get_note(params: GetNoteInput) -> str: """ Get note by ID with full Markdown content. Includes metadata and body. Use after list_notes or search_notes to retrieve full content. Set include_body=false for metadata only. Args: params: GetNoteInput containing: - note_id: The note ID (required) - include_body: Include full content (default true) - response_format: 'markdown' or 'json' Returns: Note details including content if requested. """ try: fields = "id,title,parent_id,updated_time,created_time,is_todo,todo_completed,source_url" if params.include_body: fields += ",body" note = await _make_api_request( f"notes/{params.note_id}", params={"fields": fields}, ) if params.response_format == ResponseFormat.JSON: return json.dumps(note, indent=2) # Markdown format lines = [f"# {note['title']}", ""] if note.get("is_todo"): status = "Completed ✅" if note.get("todo_completed") else "Pending ⬜" lines.append(f"**Status**: {status}") lines.extend([ f"- **ID**: `{note['id']}`", f"- **Notebook**: `{note.get('parent_id', 'Unknown')}`", f"- **Created**: {_format_timestamp(note.get('created_time'))}", f"- **Updated**: {_format_timestamp(note.get('updated_time'))}", ]) if note.get("source_url"): lines.append(f"- **Source**: {note['source_url']}") if params.include_body and note.get("body"): lines.extend(["", "---", "", note["body"]]) return "\n".join(lines) except Exception as e: return _handle_error(e) @mcp.tool( name="joplin_create_note", annotations={ "title": "Create Joplin Note", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False, }, ) async def joplin_create_note(params: CreateNoteInput) -> str: """ Create note with Markdown body, optional tags, to-do support. Specify notebook_id to target specific notebook (list notebooks first). Tags are created automatically if they don't exist. Args: params: CreateNoteInput containing: - title: Note title (required) - body: Markdown content (default empty) - notebook_id: Target notebook ID (uses default if not set) - tags: List of tag names (auto-created if needed) - is_todo: Create as to-do item (default false) Returns: Created note details with ID. """ try: data: dict[str, Any] = { "title": params.title, "body": params.body, } if params.notebook_id: data["parent_id"] = params.notebook_id if params.is_todo: data["is_todo"] = 1 note = await _make_api_request("notes", method="POST", json_data=data) # Add tags if specified if params.tags: for tag_name in params.tags: try: # Search for existing tag tags = await _make_api_request( "search", params={"query": tag_name, "type": "tag"}, ) tag_id = None if isinstance(tags, dict) and tags.get("items"): for t in tags["items"]: if t.get("title", "").lower() == tag_name.lower(): tag_id = t["id"] break # Create tag if not found if not tag_id: new_tag = await _make_api_request( "tags", method="POST", json_data={"title": tag_name}, ) tag_id = new_tag["id"] # Add tag to note await _make_api_request( f"tags/{tag_id}/notes", method="POST", json_data={"id": note["id"]}, ) except Exception: pass # Continue even if tagging fails note_type = "to-do" if params.is_todo else "note" return f"✅ Created {note_type} **{note['title']}** (ID: `{note['id']}`)" except Exception as e: return _handle_error(e) @mcp.tool( name="joplin_update_note", annotations={ "title": "Update Joplin Note", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def joplin_update_note(params: UpdateNoteInput) -> str: """ Update note title, body, or move to different notebook. Partial updates OK. Only provided fields are changed. Can convert to/from to-do, mark complete, or move between notebooks. Args: params: UpdateNoteInput containing: - note_id: The note ID to update (required) - title: New title (optional) - body: New Markdown content (optional) - notebook_id: Move to different notebook (optional) - is_todo: Convert to/from to-do (optional) - todo_completed: Mark to-do complete (optional) Returns: Confirmation that the note was updated. """ try: data: dict[str, Any] = {} if params.title is not None: data["title"] = params.title if params.body is not None: data["body"] = params.body if params.notebook_id is not None: data["parent_id"] = params.notebook_id if params.is_todo is not None: data["is_todo"] = 1 if params.is_todo else 0 if params.todo_completed is not None: data["todo_completed"] = int(datetime.now().timestamp() * 1000) if params.todo_completed else 0 if not data: return "Error: No fields to update. Provide at least one field to change." note = await _make_api_request( f"notes/{params.note_id}", method="PUT", json_data=data, ) title = params.title or note.get("title", "Note") return f"✅ Updated note **{title}** (ID: `{params.note_id}`)" except Exception as e: return _handle_error(e) @mcp.tool( name="joplin_delete_note", annotations={ "title": "Delete Joplin Note", "readOnlyHint": False, "destructiveHint": True, "idempotentHint": True, "openWorldHint": False, }, ) async def joplin_delete_note(params: DeleteNoteInput) -> str: """ ⚠️ Delete note permanently. Cannot be undone. Args: params: DeleteNoteInput containing: - note_id: The note ID to delete (required) Returns: Confirmation that the note was deleted. """ try: await _make_api_request(f"notes/{params.note_id}", method="DELETE") return f"🗑️ Deleted note (ID: `{params.note_id}`)" except Exception as e: return _handle_error(e) @mcp.tool( name="joplin_search_notes", annotations={ "title": "Search Joplin Notes", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def joplin_search_notes(params: SearchNotesInput) -> str: """ Search notes. Supports title:, body:, tag:, notebook:, type: prefixes. Examples: "tag:work type:todo", "title:meeting", "notebook:Projects". Also supports created:, updated: date filters and iscompleted:1/0. Args: params: SearchNotesInput containing: - query: Search query with optional prefixes - limit: Maximum results (default 20) - response_format: 'markdown' or 'json' Returns: Matching notes with their details. """ try: result = await _make_api_request( "search", params={ "query": params.query, "type": "note", "fields": "id,title,parent_id,updated_time,is_todo,todo_completed", "limit": params.limit, }, ) items = result.get("items", []) if isinstance(result, dict) else result if not items: return f"No notes found matching '{params.query}'." if params.response_format == ResponseFormat.JSON: return json.dumps(items, indent=2) # Markdown format lines = [ f"# Search Results: '{params.query}'", f"*Found {len(items)} notes*", "", ] for note in items: if note.get("is_todo"): status = "✅" if note.get("todo_completed") else "⬜" lines.append(f"### {status} {note['title']}") else: lines.append(f"### {note['title']}") lines.append(f"- **ID**: `{note['id']}`") lines.append(f"- **Updated**: {_format_timestamp(note.get('updated_time'))}") lines.append("") return _truncate_response("\n".join(lines), len(items)) except Exception as e: return _handle_error(e) # ============================================================================= # Tag Tools # ============================================================================= @mcp.tool( name="joplin_list_tags", annotations={ "title": "List Joplin Tags", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def joplin_list_tags(params: ListTagsInput) -> str: """ List all tags with IDs. Use for tag: search prefix or tag_note operations. Returns alphabetically sorted list. Tags are reusable across notes. Args: params: ListTagsInput containing: - response_format: 'markdown' or 'json' Returns: List of tags with IDs. """ try: tags = await _get_all_paginated( "tags", params={"fields": "id,title"}, ) if not tags: return "No tags found." if params.response_format == ResponseFormat.JSON: return json.dumps(tags, indent=2) # Markdown format lines = ["# Joplin Tags", ""] for tag in sorted(tags, key=lambda t: t.get("title", "").lower()): lines.append(f"- **{tag['title']}** (ID: `{tag['id']}`)") return "\n".join(lines) except Exception as e: return _handle_error(e) @mcp.tool( name="joplin_tag_note", annotations={ "title": "Tag Joplin Note", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def joplin_tag_note(params: TagNoteInput) -> str: """ Add tag to note. Creates tag automatically if it doesn't exist. Idempotent: adding existing tag has no effect. Case-insensitive matching. Args: params: TagNoteInput containing: - note_id: The note ID to tag (required) - tag: Tag name to add (auto-created if needed) Returns: Confirmation that the tag was added. """ try: # Search for existing tag tags = await _make_api_request( "search", params={"query": params.tag, "type": "tag"}, ) tag_id = None if isinstance(tags, dict) and tags.get("items"): for t in tags["items"]: if t.get("title", "").lower() == params.tag.lower(): tag_id = t["id"] break # Create tag if not found if not tag_id: new_tag = await _make_api_request( "tags", method="POST", json_data={"title": params.tag}, ) tag_id = new_tag["id"] # Add tag to note await _make_api_request( f"tags/{tag_id}/notes", method="POST", json_data={"id": params.note_id}, ) return f"✅ Added tag **{params.tag}** to note `{params.note_id}`" except Exception as e: return _handle_error(e) # ============================================================================= # Main Entry Point # ============================================================================= if __name__ == "__main__": mcp.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/IAMSamuelRodda/joplin-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server