Skip to main content
Glama

Scenic MCP

server.ex13.4 kB
defmodule ScenicMcp.Server do use GenServer require Logger import ScenicMcp.Probes @tcp_port 9999 def start_link(opts) do port = Keyword.get(opts, :port, @tcp_port) GenServer.start_link(__MODULE__, port, name: __MODULE__) end def init(port) do case :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true]) do {:ok, listen_socket} -> Logger.info("ScenicMCP TCP server listening on port #{port}") {:ok, %{listen_socket: listen_socket, port: port}, {:continue, :accept}} {:error, reason} -> Logger.error("Failed to start TCP server on port #{port}: #{inspect(reason)}") {:stop, reason} end end def handle_continue(:accept, %{listen_socket: listen_socket} = state) do case :gen_tcp.accept(listen_socket) do {:ok, client} -> Logger.info("Client connected to ScenicMCP") {:noreply, Map.put(state, :client, client), {:continue, :loop}} {:error, reason} -> Logger.error("Failed to accept connection: #{inspect(reason)}") {:noreply, state, {:continue, :accept}} end end def handle_continue(:loop, %{client: client} = state) do case :gen_tcp.recv(client, 0) do {:ok, data} -> Logger.debug("Received: #{inspect(data)}") # Parse the incoming data response = handle_command(String.trim(data)) json_response = Jason.encode!(response) <> "\n" :gen_tcp.send(client, json_response) {:noreply, state, {:continue, :loop}} {:error, :closed} -> Logger.info("Client disconnected") {:noreply, Map.delete(state, :client), {:continue, :accept}} {:error, reason} -> Logger.error("Error receiving data: #{inspect(reason)}") :gen_tcp.close(client) {:noreply, Map.delete(state, :client), {:continue, :accept}} end end def handle_info(msg, state) do Logger.warning("#{__MODULE__} - Unexpected message: #{inspect(msg)}") {:noreply, state} end # Handle different commands defp handle_command(json_string) do case Jason.decode(json_string) do {:ok, %{"action" => "get_scenic_graph"} = command} -> handle_get_scenic_graph(command) {:ok, %{"action" => "send_keys"} = command} -> handle_send_keys(command) {:ok, %{"action" => "send_mouse_move"} = command} -> handle_mouse_move(command) {:ok, %{"action" => "send_mouse_click"} = command} -> handle_mouse_click(command) {:ok, %{"action" => "take_screenshot"} = command} -> handle_take_screenshot(command) {:ok, command} -> Logger.error "#{__MODULE__} recv'd an unknown command: #{inspect command}" %{error: "Unknown command", command: command} {:error, _} -> Logger.error "#{__MODULE__} recv'd some invalid JSON" %{error: "Invalid JSON"} end end defp handle_get_scenic_graph(_args) do try do # Get viewport state to access script table vp_state = viewport_state() case vp_state do %{script_table: script_table} when script_table != nil -> # Read all scripts from the ETS table scripts = :ets.tab2list(script_table) # Build a visual description of the scene visual_description = build_scene_description(scripts) %{ status: "ok", script_count: length(scripts), visual_description: visual_description, raw_scripts: Enum.map(scripts, fn {id, _compiled} -> id end) } _ -> %{error: "No script table found in viewport state"} end rescue e -> %{error: "Failed to get scenic graph", details: inspect(e)} end end defp build_scene_description(scripts) do scripts |> Enum.map(fn {id, _compiled_script} -> # Extract component name from ID component_name = case id do {name, _uid} when is_atom(name) -> Atom.to_string(name) name when is_atom(name) -> Atom.to_string(name) _ -> inspect(id) end %{ id: inspect(id), component: component_name } end) |> Enum.group_by(& &1.component) |> Enum.map(fn {component, items} -> "#{component} (#{length(items)} instances)" end) |> Enum.join(", ") end # Visual feedback functions commented out to avoid warnings # These will be re-enabled when visual feedback is implemented # defp get_visual_feedback(detail_level) do # # Implementation commented out # end # defp build_visual_description(scripts, detail_level) do # # Implementation commented out # end # defp decode_script(compiled) when is_binary(compiled) do # # Implementation commented out # end # defp analyze_operations(ops) do # # Implementation commented out # end # defp describe_color(r, g, b) do # # Implementation commented out # end # defp humanize_name(name) do # # Implementation commented out # end # Input handling defp handle_send_keys(%{"text" => text}) when is_binary(text) do try do driver_state = driver_state() String.graphemes(text) |> Enum.each(fn char -> # Use codepoint for text input instead of key events event = {:codepoint, {char, []}} Scenic.Driver.send_input(driver_state, event) Process.sleep(10) end) %{status: "ok", message: "Sent text: #{text}"} rescue _ -> %{error: "No driver found"} end end defp handle_send_keys(%{"key" => key} = params) when is_binary(key) do try do driver_state = driver_state() key_atom = normalize_key(key) modifiers = parse_modifiers(Map.get(params, "modifiers", [])) # Send key press Scenic.Driver.send_input(driver_state, {:key, {key_atom, 1, modifiers}}) Process.sleep(10) # Send key release Scenic.Driver.send_input(driver_state, {:key, {key_atom, 0, modifiers}}) %{status: "ok", message: "Sent key: #{key} with modifiers: #{inspect(modifiers)}"} rescue _ -> %{error: "No driver found"} end end defp handle_mouse_move(%{"x" => x, "y" => y}) do try do driver_state = driver_state() Scenic.Driver.send_input(driver_state, {:cursor_pos, {x, y}}) %{status: "ok", message: "Mouse moved to (#{x}, #{y})"} rescue _ -> %{error: "No driver found"} end end defp handle_mouse_click(%{"x" => x, "y" => y} = params) do try do driver_state = driver_state() button = parse_button(Map.get(params, "button", "left")) # Move to position first Scenic.Driver.send_input(driver_state, {:cursor_pos, {x, y}}) Process.sleep(10) # Press and release Scenic.Driver.send_input(driver_state, {:cursor_button, {button, 1, [], {x, y}}}) Process.sleep(10) Scenic.Driver.send_input(driver_state, {:cursor_button, {button, 0, [], {x, y}}}) %{status: "ok", message: "Clicked #{button} at (#{x}, #{y})"} rescue _ -> %{error: "No driver found"} end end defp normalize_key(key) do case String.downcase(key) do "enter" -> :key_enter "escape" -> :key_escape "tab" -> :key_tab "backspace" -> :key_backspace "delete" -> :key_delete "space" -> :key_space "up" -> :key_up "down" -> :key_down "left" -> :key_left "right" -> :key_right "home" -> :key_home "end" -> :key_end "page_up" -> :key_pageup "page_down" -> :key_pagedown "f1" -> :key_f1 "f2" -> :key_f2 "f3" -> :key_f3 "f4" -> :key_f4 "f5" -> :key_f5 "f6" -> :key_f6 "f7" -> :key_f7 "f8" -> :key_f8 "f9" -> :key_f9 "f10" -> :key_f10 "f11" -> :key_f11 "f12" -> :key_f12 other -> String.to_atom("key_" <> other) end end defp parse_modifiers(modifiers) when is_list(modifiers) do modifiers |> Enum.map(&String.to_atom/1) |> Enum.filter(&(&1 in [:shift, :ctrl, :alt, :cmd, :meta])) end defp parse_modifiers(_), do: [] defp parse_button(button) when is_binary(button) do case String.downcase(button) do "left" -> :btn_left "right" -> :btn_right "middle" -> :btn_middle _ -> :btn_left end end defp parse_button(_), do: :btn_left # Screenshot functionality defp handle_take_screenshot(params) do Logger.info("Handling screenshot request: #{inspect(params)}") # Get optional parameters format = Map.get(params, "format", "path") # "path" or "base64" filename = Map.get(params, "filename") # Generate filename if not provided path = if filename do if String.ends_with?(filename, ".png") do filename else filename <> ".png" end else timestamp = DateTime.utc_now() |> DateTime.to_string() |> String.replace(~r/[:\s]/, "_") "/tmp/scenic_screenshot_#{timestamp}.png" end # Find viewport and get driver viewport = viewport_pid_safe() if viewport do try do # Get the viewport's state to find drivers state = :sys.get_state(viewport, 5000) Logger.info("Getting driver_pids from viewport state...") # Get driver pids from the viewport state driver_pids = Map.get(state, :driver_pids, []) Logger.info("Final driver_pids: #{inspect(driver_pids)}") if length(driver_pids) > 0 do driver_pid = hd(driver_pids) Logger.info("Using driver_pid: #{inspect(driver_pid)}") # Take the screenshot using the driver Scenic.Driver.Local.screenshot(driver_pid, path) Logger.info("Screenshot saved to: #{path}") # Return response based on format case format do "base64" -> # Read file and encode to base64 case File.read(path) do {:ok, data} -> base64_data = Base.encode64(data) %{ status: "ok", format: "base64", data: base64_data, path: path, size: byte_size(data) } {:error, reason} -> %{error: "Failed to read screenshot file", reason: inspect(reason)} end _ -> # Just return the path %{ status: "ok", format: "path", path: path } end else Logger.error("No driver PIDs found in viewport state") # Try waiting a bit for drivers to register, then retry once Logger.info("Waiting 2 seconds for drivers to register...") :timer.sleep(2000) updated_state = :sys.get_state(viewport, 5000) updated_driver_pids = Map.get(updated_state, :driver_pids, []) Logger.info("After waiting, found driver_pids: #{inspect(updated_driver_pids)}") if length(updated_driver_pids) > 0 do driver_pid = hd(updated_driver_pids) Logger.info("Using driver_pid after wait: #{inspect(driver_pid)}") # Take the screenshot using the driver Scenic.Driver.Local.screenshot(driver_pid, path) Logger.info("Screenshot saved to: #{path}") # Return response based on format case format do "base64" -> # Read file and encode to base64 case File.read(path) do {:ok, data} -> base64_data = Base.encode64(data) %{ status: "ok", format: "base64", data: base64_data, path: path, size: byte_size(data) } {:error, reason} -> %{error: "Failed to read screenshot file", reason: inspect(reason)} end _ -> # Just return the path %{ status: "ok", format: "path", path: path } end else %{error: "No driver found in viewport after waiting", viewport_keys: Map.keys(updated_state)} end end rescue e -> error_msg = try do case e do %KeyError{key: key} -> "KeyError - missing key: #{inspect(key)}" _ -> "#{Exception.format(:error, e)}" end rescue _ -> "Unknown error occurred" end Logger.error("Screenshot failed: #{error_msg}") %{error: "Screenshot failed", details: error_msg} end else %{error: "No viewport found"} end end # Process management now handled by TypeScript MCP server directly # Find viewport - made public for the API function def find_scenic_driver do Logger.info("[DEBUG] Starting driver search...") # Look for the scenic driver process result = Process.list() |> Enum.find(fn pid -> case Process.info(pid, [:registered_name]) do [{:registered_name, :scenic}] -> true _ -> false end end) Logger.info("[DEBUG] Driver search result: #{inspect(result)}") result end end

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/scenic-contrib/scenic_mcp_experimental'

If you have feedback or need assistance with the MCP directory API, please join our Discord server