llm-tools/mcps/selenium_mcp/selenium_mcp_server.py
Gregory Gauthier 83ec950df7 first commit
2026-04-08 12:11:04 +01:00

1076 lines
36 KiB
Python

"""
selenium_mcp - MCP server for browser automation via Selenium WebDriver.
Provides tools for navigating, clicking, filling forms, taking screenshots,
extracting content, and managing browser sessions over Streamable HTTP transport.
"""
import base64
import json
import logging
import os
import sys
import time
import uuid
from contextlib import asynccontextmanager
from enum import Enum
from typing import Any, Dict, List, Optional

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from selenium import webdriver
from selenium.common.exceptions import (
    ElementClickInterceptedException,
    ElementNotInteractableException,
    NoSuchElementException,
    StaleElementReferenceException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select, WebDriverWait
# ---------------------------------------------------------------------------
# Logging & Output Redirection
# ---------------------------------------------------------------------------
# Path of the log file; overridable through the SELENIUM_MCP_LOG_FILE variable.
LOG_FILE = os.environ.get("SELENIUM_MCP_LOG_FILE", "selenium_mcp.log")


def setup_redirection():
    """Reduces noise by suppressing stderr and logging to a file.

    Points the process-level stderr descriptor at the null device and
    configures the root logger to append INFO-and-above records to
    ``LOG_FILE``.
    """
    try:
        stderr_fd = sys.stderr.fileno()
    except (AttributeError, OSError, ValueError):
        # stderr is missing or is not backed by a real descriptor (for
        # example an in-memory replacement), so there is nothing to redirect.
        stderr_fd = None

    if stderr_fd is not None:
        # dup2 duplicates the descriptor, so the temporary handle can be
        # closed straight away instead of being leaked.
        with open(os.devnull, "w") as devnull:
            os.dup2(devnull.fileno(), stderr_fd)

    # Configure logging to write to the log file directly
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        filename=LOG_FILE,
        filemode="a",
    )


# Runs at import time so the redirection is in place before anything else logs.
setup_redirection()
logger = logging.getLogger("selenium_mcp")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Default number of seconds for element waits and for the `timeout` input fields.
DEFAULT_TIMEOUT: int = 10
# Character caps applied when page HTML / visible text is returned to the caller.
MAX_PAGE_SOURCE_LENGTH: int = 200_000
MAX_TEXT_LENGTH: int = 50_000

# Everything below can be overridden through environment variables.
SCREENSHOT_DIR: str = os.getenv("SELENIUM_SCREENSHOT_DIR", "/tmp/selenium_screenshots")
CHROME_BINARY: Optional[str] = os.getenv("CHROME_BINARY")
CHROMEDRIVER_PATH: Optional[str] = os.getenv("CHROMEDRIVER_PATH")
# Any value other than true/1/yes (case-insensitive) turns headless mode off.
HEADLESS: bool = os.getenv("SELENIUM_HEADLESS", "true").lower() in {"true", "1", "yes"}
WINDOW_WIDTH: int = int(os.getenv("SELENIUM_WINDOW_WIDTH", "1920"))
WINDOW_HEIGHT: int = int(os.getenv("SELENIUM_WINDOW_HEIGHT", "1080"))
HOST: str = os.getenv("SELENIUM_MCP_HOST", "0.0.0.0")
PORT: int = int(os.getenv("SELENIUM_MCP_PORT", "8888"))
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class LocatorStrategy(str, Enum):
    """Supported element locator strategies."""

    # Members are plain strings (str mixin), so they compare equal to their
    # values and can be built from them, e.g. LocatorStrategy("css").
    CSS = "css"
    XPATH = "xpath"
    ID = "id"
    NAME = "name"
    TAG_NAME = "tag_name"
    CLASS_NAME = "class_name"
    LINK_TEXT = "link_text"
    PARTIAL_LINK_TEXT = "partial_link_text"
# Lookup from a LocatorStrategy value to the selenium `By` constant used to
# locate elements with that strategy.
LOCATOR_MAP: Dict[str, str] = dict(
    css=By.CSS_SELECTOR,
    xpath=By.XPATH,
    id=By.ID,
    name=By.NAME,
    tag_name=By.TAG_NAME,
    class_name=By.CLASS_NAME,
    link_text=By.LINK_TEXT,
    partial_link_text=By.PARTIAL_LINK_TEXT,
)
class ResponseFormat(str, Enum):
    """Output format for tool responses."""

    # String-valued members, constructible from their values.
    MARKDOWN = "markdown"
    JSON = "json"
# ---------------------------------------------------------------------------
# Session manager — one browser per session_id
# ---------------------------------------------------------------------------
class BrowserSessionManager:
    """Manages multiple browser sessions identified by session_id."""

    def __init__(self) -> None:
        # Maps session_id -> the Chrome driver backing that session.
        self._sessions: Dict[str, webdriver.Chrome] = {}

    def _create_driver(self) -> webdriver.Chrome:
        """Create a new Chrome WebDriver instance.

        Applies the module-level settings (HEADLESS, WINDOW_WIDTH/HEIGHT,
        CHROME_BINARY, CHROMEDRIVER_PATH), a fixed user agent, a 30 second
        page-load timeout and a 2 second implicit wait.

        Returns:
            The configured driver.
        """
        opts = ChromeOptions()
        if HEADLESS:
            opts.add_argument("--headless=new")
        opts.add_argument("--no-sandbox")
        opts.add_argument("--disable-dev-shm-usage")
        opts.add_argument("--disable-gpu")
        opts.add_argument(f"--window-size={WINDOW_WIDTH},{WINDOW_HEIGHT}")
        opts.add_argument("--disable-extensions")
        opts.add_argument("--disable-infobars")
        opts.add_argument(
            "--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
        )
        if CHROME_BINARY:
            opts.binary_location = CHROME_BINARY

        # Only pass an explicit chromedriver path when one was configured.
        service_kwargs: Dict[str, Any] = {}
        if CHROMEDRIVER_PATH:
            service_kwargs["executable_path"] = CHROMEDRIVER_PATH
        service = ChromeService(**service_kwargs)

        driver = webdriver.Chrome(service=service, options=opts)
        driver.set_page_load_timeout(30)
        driver.implicitly_wait(2)
        return driver

    def get_or_create(self, session_id: Optional[str] = None) -> tuple[str, webdriver.Chrome]:
        """Return (session_id, driver). Creates a new session if needed.

        Args:
            session_id: Id of an existing session to reuse. An unknown id
                creates a new session under that id; an empty or missing id
                creates one under a generated 8-character id.

        Returns:
            The session id and its driver.
        """
        if session_id and session_id in self._sessions:
            return session_id, self._sessions[session_id]

        sid = session_id
        if not sid:
            sid = str(uuid.uuid4())[:8]
            # A truncated UUID can repeat; never reuse a live id, because that
            # would overwrite (and orphan) the existing session's driver.
            while sid in self._sessions:
                sid = str(uuid.uuid4())[:8]

        driver = self._create_driver()
        self._sessions[sid] = driver
        logger.info("Created browser session %s", sid)
        return sid, driver

    def get(self, session_id: str) -> Optional[webdriver.Chrome]:
        """Return the driver registered under session_id, or None if unknown."""
        return self._sessions.get(session_id)

    def close(self, session_id: str) -> bool:
        """Remove a session and quit its driver.

        Quitting is best-effort: a failure is logged and the session is still
        treated as closed, since it has already been removed from the registry.

        Returns:
            True if the session existed, False otherwise.
        """
        driver = self._sessions.pop(session_id, None)
        if not driver:
            return False
        try:
            driver.quit()
        except Exception:
            logger.warning(
                "Error while quitting browser session %s", session_id, exc_info=True
            )
        logger.info("Closed browser session %s", session_id)
        return True

    def close_all(self) -> int:
        """Close every session and return how many were registered."""
        count = len(self._sessions)
        # Iterate over a snapshot: close() mutates the registry.
        for sid in list(self._sessions):
            self.close(sid)
        return count

    def list_sessions(self) -> List[Dict[str, str]]:
        """Return one {session_id, url, title} dict per registered session.

        url and title fall back to "unknown" when the driver cannot be queried.
        """
        result = []
        for sid, driver in self._sessions.items():
            try:
                url = driver.current_url
                title = driver.title
            except Exception:
                url = "unknown"
                title = "unknown"
            result.append({"session_id": sid, "url": url, "title": title})
        return result
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _resolve_by(strategy: LocatorStrategy) -> str:
    """Look up the LOCATOR_MAP entry for a LocatorStrategy member."""
    key = strategy.value
    return LOCATOR_MAP[key]
def _find_element(
    driver: webdriver.Chrome,
    by: LocatorStrategy,
    selector: str,
    timeout: int = DEFAULT_TIMEOUT,
) -> Any:
    """Block until one element matching the locator is present, then return it.

    The wait lasts at most `timeout` seconds; whatever the wait raises on
    expiry propagates to the caller.
    """
    locator = (_resolve_by(by), selector)
    waiter = WebDriverWait(driver, timeout)
    return waiter.until(EC.presence_of_element_located(locator))
def _find_elements(
    driver: webdriver.Chrome,
    by: LocatorStrategy,
    selector: str,
    timeout: int = DEFAULT_TIMEOUT,
) -> List[Any]:
    """Wait until at least one match is present, then return every match.

    The wait lasts at most `timeout` seconds; the full list is collected with
    a separate find_elements call once the first match has appeared.
    """
    how = _resolve_by(by)
    first_present = EC.presence_of_element_located((how, selector))
    WebDriverWait(driver, timeout).until(first_present)
    return driver.find_elements(how, selector)
def _element_info(el: Any) -> Dict[str, Any]:
"""Extract useful info from a WebElement."""
return {
"tag": el.tag_name,
"text": (el.text or "")[:500],
"visible": el.is_displayed(),
"enabled": el.is_enabled(),
"selected": el.is_selected(),
"attributes": {
attr: el.get_attribute(attr)
for attr in ("id", "name", "class", "href", "src", "value", "type", "placeholder", "aria-label")
if el.get_attribute(attr)
},
}
def _handle_selenium_error(e: Exception) -> str:
    """Produce an actionable error message.

    The specific selenium exception types are tested before the generic
    WebDriverException so they get their tailored hint; anything else falls
    through to a generic "<type> — <message>" line.

    Args:
        e: The exception raised while running a tool.

    Returns:
        A single-line message starting with "Error:".
    """
    if isinstance(e, NoSuchElementException):
        return "Error: Element not found. Check your selector and locator strategy. Try using a broader CSS or XPath selector."
    if isinstance(e, TimeoutException):
        return "Error: Timed out waiting for element. The page may still be loading, or the selector doesn't match any element. Try increasing timeout or verify the selector."
    if isinstance(e, ElementNotInteractableException):
        return "Error: Element exists but is not interactable (may be hidden or overlapped). Try scrolling to the element first, or waiting for it to become visible."
    if isinstance(e, ElementClickInterceptedException):
        return "Error: Click was intercepted by another element. Try scrolling or closing overlays/modals first."
    if isinstance(e, StaleElementReferenceException):
        return "Error: Element reference is stale (DOM changed). Re-find the element before interacting."
    if isinstance(e, WebDriverException):
        return f"Error: WebDriver error — {str(e)[:300]}"
    # Separate the type name from the message; without the separator the two
    # ran together (e.g. "KeyError'selector'").
    return f"Error: {type(e).__name__} — {str(e)[:300]}"
def _truncate(text: str, limit: int = MAX_TEXT_LENGTH) -> str:
    """Cut `text` down to `limit` characters, appending a note with the full length.

    Text that already fits within `limit` is returned unchanged.
    """
    total = len(text)
    if total > limit:
        return text[:limit] + f"\n\n... [truncated — {total} chars total]"
    return text
# ---------------------------------------------------------------------------
# Pydantic input models
# ---------------------------------------------------------------------------
class SessionInput(BaseModel):
    """Base model including optional session_id."""

    # Shared by every tool input: strings are stripped of surrounding
    # whitespace and unknown fields are rejected.
    model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")

    session_id: Optional[str] = Field(
        default=None,
        description="Browser session ID. Omit to auto-create a new session.",
        max_length=50,
    )
class NavigateInput(SessionInput):
    """Navigate to a URL."""

    # Target address.
    url: str = Field(
        ...,
        description="Full URL to navigate to (e.g. 'https://example.com')",
        min_length=1,
        max_length=4096,
    )
    # When set, navigation additionally waits for this CSS selector.
    wait_for: Optional[str] = Field(
        default=None,
        description="Optional CSS selector to wait for after navigation.",
        max_length=500,
    )
    # Bounded to 1..120 seconds.
    timeout: int = Field(
        default=DEFAULT_TIMEOUT,
        description="Seconds to wait for page/element load",
        ge=1,
        le=120,
    )
class ElementActionInput(SessionInput):
    """Input for actions on a single element (click, type, etc.)."""

    # How `selector` is interpreted; CSS unless stated otherwise.
    by: LocatorStrategy = Field(
        default=LocatorStrategy.CSS,
        description="Locator strategy",
    )
    selector: str = Field(
        ...,
        description="Element selector string",
        min_length=1,
        max_length=1000,
    )
    # Wait budget in seconds, bounded to 1..120.
    timeout: int = Field(default=DEFAULT_TIMEOUT, ge=1, le=120)
class TypeTextInput(ElementActionInput):
    """Type text into an element."""

    text: str = Field(
        ...,
        description="Text to type into the element",
        max_length=10000,
    )
    # Both flags are optional switches around the typing itself.
    clear_first: bool = Field(
        default=True,
        description="Clear existing text before typing",
    )
    press_enter: bool = Field(
        default=False,
        description="Press Enter after typing",
    )
class SelectOptionInput(ElementActionInput):
    """Select an option from a <select> element."""

    # Exactly how the option is chosen; at least one of the three is required.
    value: Optional[str] = Field(default=None, description="Option value attribute", max_length=500)
    visible_text: Optional[str] = Field(default=None, description="Visible text of the option", max_length=500)
    index: Optional[int] = Field(default=None, description="Zero-based index of the option", ge=0)

    # Checked at model level: a field validator on `index` is skipped when
    # `index` is left at its default, which is precisely the "nothing was
    # supplied" case this check exists to reject.
    @model_validator(mode="after")
    def at_least_one_selection(self) -> "SelectOptionInput":
        """Reject input that names no option at all.

        Raises:
            ValueError: If value, visible_text and index are all None.
        """
        if self.value is None and self.visible_text is None and self.index is None:
            raise ValueError("Provide at least one of: value, visible_text, or index")
        return self
class FindElementsInput(SessionInput):
    """Find elements and extract info."""

    by: LocatorStrategy = Field(
        default=LocatorStrategy.CSS,
        description="Locator strategy",
    )
    selector: str = Field(
        ...,
        description="Element selector string",
        min_length=1,
        max_length=1000,
    )
    # Wait budget in seconds, bounded to 1..120.
    timeout: int = Field(default=DEFAULT_TIMEOUT, ge=1, le=120)
    # Caps how many matches are described in the response (1..200).
    limit: int = Field(
        default=20,
        description="Max elements to return",
        ge=1,
        le=200,
    )
class ScreenshotInput(SessionInput):
    """Take a screenshot."""

    # False captures the current viewport only.
    full_page: bool = Field(
        default=False,
        description="Capture full scrollable page (not just viewport)",
    )
class ExecuteScriptInput(SessionInput):
    """Execute JavaScript in the browser."""

    script: str = Field(
        ...,
        description="JavaScript code to execute",
        min_length=1,
        max_length=50000,
    )
    # Forwarded positionally to the script when provided.
    args: Optional[List[Any]] = Field(
        default=None,
        description="Arguments to pass to the script",
    )
class GetPageContentInput(SessionInput):
    """Get page source or extracted text."""

    # Restricted by the pattern to exactly: text, html or links.
    content_type: str = Field(
        default="text",
        description="'text' for visible text, 'html' for raw HTML source, 'links' for all hyperlinks",
        pattern="^(text|html|links)$",
    )
class WaitForInput(SessionInput):
    """Wait for a condition."""

    by: LocatorStrategy = Field(
        default=LocatorStrategy.CSS,
        description="Locator strategy",
    )
    selector: str = Field(
        ...,
        description="Element selector string",
        min_length=1,
        max_length=1000,
    )
    # Restricted by the pattern to the four supported conditions.
    condition: str = Field(
        default="present",
        description="'present', 'visible', 'clickable', or 'gone'",
        pattern="^(present|visible|clickable|gone)$",
    )
    # Wait budget in seconds, bounded to 1..120.
    timeout: int = Field(default=DEFAULT_TIMEOUT, ge=1, le=120)
class FormFillInput(SessionInput):
    """Fill multiple form fields at once."""

    # Between 1 and 50 field descriptions, each a string-to-string dict.
    fields: List[Dict[str, str]] = Field(
        ...,
        description="List of {by, selector, value} dicts. 'by' defaults to 'css'.",
        min_length=1,
        max_length=50,
    )
    # Leave unset to fill the fields without submitting.
    submit_selector: Optional[str] = Field(
        default=None,
        description="Optional CSS selector of a submit button to click after filling",
        max_length=500,
    )
class ScrollInput(SessionInput):
    """Scroll the page."""

    # Restricted by the pattern to: up, down, top or bottom.
    direction: str = Field(
        default="down",
        description="'up', 'down', 'top', or 'bottom'",
        pattern="^(up|down|top|bottom)$",
    )
    # Bounded to 0..50000.
    pixels: int = Field(
        default=500,
        description="Pixels to scroll (for up/down)",
        ge=0,
        le=50000,
    )
class SessionIdInput(BaseModel):
    """Input that requires a session_id."""

    # Same config as SessionInput, but here the id is mandatory.
    model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")

    session_id: str = Field(
        ...,
        description="Browser session ID to operate on",
        min_length=1,
        max_length=50,
    )
# ---------------------------------------------------------------------------
# Lifespan — manage sessions
# ---------------------------------------------------------------------------
# Single registry of browser sessions used by every tool in this module.
sessions = BrowserSessionManager()


@asynccontextmanager
async def app_lifespan(_app):
    """Prepare the screenshot directory and hand the session registry to the app.

    Yields:
        A dict with the shared manager under the "sessions" key.
    """
    # Create the directory up front; an already existing one is fine.
    os.makedirs(SCREENSHOT_DIR, exist_ok=True)
    shared_state = {"sessions": sessions}
    yield shared_state
# ---------------------------------------------------------------------------
# MCP server
# ---------------------------------------------------------------------------
# The server instance all `@mcp.tool` registrations below attach to.
mcp = FastMCP(
    "selenium_mcp",
    # Startup hook defined above.
    lifespan=app_lifespan,
    # Bind address, taken from the environment-driven constants.
    host=HOST,
    port=PORT,
    # HTTP transport behaviour.
    json_response=True,
    stateless_http=False,
)
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
@mcp.tool(
    name="selenium_list_sessions",
    structured_output=False,
    annotations={
        "title": "List Browser Sessions",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def selenium_list_sessions() -> str:
    """List all active browser sessions with their current URLs and titles.

    Returns:
        str: JSON array of session objects, each containing session_id, url, and title.
    """
    # The manager already produces plain dicts; only serialisation is left.
    active = sessions.list_sessions()
    return json.dumps(active, indent=2)
@mcp.tool(
    name="selenium_navigate",
    structured_output=False,
    annotations={
        "title": "Navigate to URL",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def selenium_navigate(params: NavigateInput) -> str:
    """Navigate the browser to a URL. Creates a new session if session_id is omitted.
    Optionally waits for a specific element to appear after page load.

    Args:
        params (NavigateInput): Contains url, optional session_id, wait_for selector, and timeout.

    Returns:
        str: JSON with session_id, url, title, and page load status.

    Examples:
        - Navigate to a site: params={url: "https://example.com"}
        - Navigate and wait for content: params={url: "https://example.com", wait_for: "#main-content"}
    """
    sid: Optional[str] = params.session_id
    try:
        sid, driver = sessions.get_or_create(params.session_id)
        driver.get(params.url)
        if params.wait_for:
            WebDriverWait(driver, params.timeout).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, params.wait_for))
            )
        return json.dumps({
            "session_id": sid,
            "url": driver.current_url,
            "title": driver.title,
            "status": "ok",
        })
    except Exception as e:
        message = _handle_selenium_error(e)
        # The browser stays open after a failed navigation. Report its id so
        # that an auto-created session can still be reused or closed instead
        # of lingering under an id the caller never saw.
        if sid and sessions.get(sid) is not None:
            message = f"{message} (session_id: {sid})"
        return message
@mcp.tool(
    name="selenium_click",
    structured_output=False,
    annotations={
        "title": "Click Element",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def selenium_click(params: ElementActionInput) -> str:
    """Click an element on the page.
    Waits for the element to be clickable, then clicks it.

    Args:
        params (ElementActionInput): Contains session_id, locator strategy (by), selector, and timeout.

    Returns:
        str: JSON confirming the click with the element's tag and resulting URL.
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session. Use selenium_navigate first to create one."
        locator = (_resolve_by(params.by), params.selector)
        clickable = EC.element_to_be_clickable(locator)
        target = WebDriverWait(browser, params.timeout).until(clickable)
        # Grab the tag before clicking; it is part of the reported outcome.
        tag = target.tag_name
        target.click()
        outcome = {"status": "clicked", "tag": tag, "url": browser.current_url}
        return json.dumps(outcome)
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_type",
    structured_output=False,
    annotations={
        "title": "Type Text Into Element",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def selenium_type(params: TypeTextInput) -> str:
    """Type text into an input or textarea element.
    Optionally clears existing text first and/or presses Enter after typing.

    Args:
        params (TypeTextInput): Contains session_id, by, selector, text, clear_first, press_enter, timeout.

    Returns:
        str: JSON confirming the text was typed, with the element's current value.
    """
    try:
        driver = sessions.get(params.session_id or "")
        if not driver:
            return "Error: No active session. Use selenium_navigate first."
        el = _find_element(driver, params.by, params.selector, params.timeout)
        if params.clear_first:
            el.clear()
        el.send_keys(params.text)
        # Capture the value before Enter is pressed: Enter can submit a form
        # and replace the page, after which the element reference is stale.
        value = el.get_attribute("value") or ""
        if params.press_enter:
            el.send_keys(Keys.RETURN)
            try:
                value = el.get_attribute("value") or ""
            except StaleElementReferenceException:
                # The typing succeeded and the page moved on; keep the value
                # read before Enter rather than reporting a failure.
                logger.info("Element went stale after Enter; reporting pre-Enter value")
        return json.dumps({
            "status": "typed",
            "value": value,
            "url": driver.current_url,
        })
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_select",
    structured_output=False,
    annotations={
        "title": "Select Dropdown Option",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def selenium_select(params: SelectOptionInput) -> str:
    """Select an option from a <select> dropdown by value, visible text, or index.

    Args:
        params (SelectOptionInput): Contains session_id, by, selector, value/visible_text/index, timeout.

    Returns:
        str: JSON with the selected option text and value.
    """
    try:
        driver = sessions.get(params.session_id or "")
        if not driver:
            return "Error: No active session. Use selenium_navigate first."
        # Refuse to report "selected" when no option was actually requested;
        # value takes precedence over visible_text, which precedes index.
        if params.value is None and params.visible_text is None and params.index is None:
            return "Error: Provide at least one of: value, visible_text, or index."
        el = _find_element(driver, params.by, params.selector, params.timeout)
        select = Select(el)
        if params.value is not None:
            select.select_by_value(params.value)
        elif params.visible_text is not None:
            select.select_by_visible_text(params.visible_text)
        else:
            select.select_by_index(params.index)
        selected = select.first_selected_option
        return json.dumps({
            "status": "selected",
            "selected_text": selected.text,
            "selected_value": selected.get_attribute("value") or "",
        })
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_find_elements",
    structured_output=False,
    annotations={
        "title": "Find Elements",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def selenium_find_elements(params: FindElementsInput) -> str:
    """Find elements matching a selector and return their info (tag, text, attributes).

    Args:
        params (FindElementsInput): Contains session_id, by, selector, timeout, limit.

    Returns:
        str: JSON with count and array of element info objects.
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session. Use selenium_navigate first."
        matches = _find_elements(browser, params.by, params.selector, params.timeout)
        # "count" is the total number of matches; only the first `limit`
        # of them are described in detail.
        shown = matches[: params.limit]
        described = [_element_info(match) for match in shown]
        report = {
            "count": len(matches),
            "returned": len(shown),
            "elements": described,
        }
        return json.dumps(report, indent=2)
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_screenshot",
    structured_output=False,
    annotations={
        "title": "Take Screenshot",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def selenium_screenshot(params: ScreenshotInput) -> str:
    """Take a screenshot of the current page and return it as a base64-encoded PNG.

    Args:
        params (ScreenshotInput): Contains session_id and full_page flag.

    Returns:
        str: JSON with base64-encoded PNG image data, page URL, and dimensions.
    """
    try:
        driver = sessions.get(params.session_id or "")
        if not driver:
            return "Error: No active session. Use selenium_navigate first."
        resized = False
        try:
            if params.full_page:
                # Resize to full page height for a complete capture
                total_height = driver.execute_script("return document.body.scrollHeight")
                resized = True
                # `or 0` guards against a null result from the script; the
                # height is capped at 16384 pixels.
                driver.set_window_size(WINDOW_WIDTH, min((total_height or 0) + 200, 16384))
                time.sleep(0.5)
            png_bytes = driver.get_screenshot_as_png()
        finally:
            # Always put the window back, even when the capture failed, so a
            # failed full-page shot does not leave the session oversized.
            if resized:
                driver.set_window_size(WINDOW_WIDTH, WINDOW_HEIGHT)
        b64 = base64.b64encode(png_bytes).decode("utf-8")
        # One query for both dimensions (reported after any restore above).
        size = driver.get_window_size()
        return json.dumps({
            "url": driver.current_url,
            "title": driver.title,
            "image_base64": b64,
            "format": "png",
            "width": size["width"],
            "height": size["height"],
        })
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_get_page_content",
    structured_output=False,
    annotations={
        "title": "Get Page Content",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def selenium_get_page_content(params: GetPageContentInput) -> str:
    """Extract content from the current page.
    Supports three modes:
    - 'text': visible text content
    - 'html': raw HTML source
    - 'links': all hyperlinks with text and href

    Args:
        params (GetPageContentInput): Contains session_id and content_type.

    Returns:
        str: JSON with extracted content, URL, and title.
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session. Use selenium_navigate first."
        wanted = params.content_type
        payload: Dict[str, Any] = {"url": browser.current_url, "title": browser.title}
        if wanted == "text":
            page_body = browser.find_element(By.TAG_NAME, "body")
            payload["text"] = _truncate(page_body.text)
        elif wanted == "html":
            payload["html"] = _truncate(browser.page_source, MAX_PAGE_SOURCE_LENGTH)
        elif wanted == "links":
            # Only the first 500 anchors are inspected; those without an
            # href are dropped and link text is capped at 200 characters.
            found = [
                {"text": (anchor.text or "").strip()[:200], "href": target}
                for anchor in browser.find_elements(By.TAG_NAME, "a")[:500]
                if (target := anchor.get_attribute("href"))
            ]
            payload["links"] = found
            payload["count"] = len(found)
        return json.dumps(payload, indent=2)
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_execute_script",
    structured_output=False,
    annotations={
        "title": "Execute JavaScript",
        "readOnlyHint": False,
        "destructiveHint": True,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def selenium_execute_script(params: ExecuteScriptInput) -> str:
    """Execute arbitrary JavaScript in the browser and return the result.

    Args:
        params (ExecuteScriptInput): Contains session_id, script, and optional args.

    Returns:
        str: JSON with the script return value (serialised).
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session. Use selenium_navigate first."
        script_args = params.args or []
        outcome = browser.execute_script(params.script, *script_args)
        # Values json cannot encode are reported as their str() form instead.
        try:
            encoded = json.dumps(outcome)
        except (TypeError, ValueError):
            encoded = json.dumps(str(outcome))
        reply = {"status": "ok", "result": json.loads(encoded)}
        return json.dumps(reply)
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_wait_for",
    structured_output=False,
    annotations={
        "title": "Wait For Element Condition",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def selenium_wait_for(params: WaitForInput) -> str:
    """Wait for an element condition: present, visible, clickable, or gone.

    Args:
        params (WaitForInput): Contains session_id, by, selector, condition, timeout.

    Returns:
        str: JSON confirming the condition was met, or an error if it timed out.
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session. Use selenium_navigate first."
        locator = (_resolve_by(params.by), params.selector)
        # Condition name -> expected-condition factory taking the locator.
        factories = {
            "present": EC.presence_of_element_located,
            "visible": EC.visibility_of_element_located,
            "clickable": EC.element_to_be_clickable,
            "gone": EC.invisibility_of_element_located,
        }
        make_condition = factories.get(params.condition)
        if make_condition is not None:
            WebDriverWait(browser, params.timeout).until(make_condition(locator))
        confirmation = {
            "status": "ok",
            "condition": params.condition,
            "selector": params.selector,
        }
        return json.dumps(confirmation)
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_fill_form",
    structured_output=False,
    annotations={
        "title": "Fill Form Fields",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def selenium_fill_form(params: FormFillInput) -> str:
    """Fill multiple form fields in one call, then optionally click a submit button.
    Each field dict should contain: {by, selector, value}. 'by' defaults to 'css'.

    Args:
        params (FormFillInput): Contains session_id, fields array, and optional submit_selector.

    Returns:
        str: JSON with the number of fields filled and whether submit was clicked.
    """
    try:
        driver = sessions.get(params.session_id or "")
        if not driver:
            return "Error: No active session. Use selenium_navigate first."

        # Validate every field description before touching the page, so a
        # malformed entry yields a clear message and never a half-filled form.
        plan = []
        for position, field in enumerate(params.fields):
            missing = [key for key in ("selector", "value") if key not in field]
            if missing:
                return (
                    f"Error: fields[{position}] is missing required key(s): "
                    f"{', '.join(missing)}. Each field needs 'selector' and 'value' "
                    "('by' is optional and defaults to 'css')."
                )
            raw_by = field.get("by", "css")
            try:
                by = LocatorStrategy(raw_by)
            except ValueError:
                valid = ", ".join(strategy.value for strategy in LocatorStrategy)
                return (
                    f"Error: fields[{position}] has unknown 'by' value {raw_by!r}. "
                    f"Use one of: {valid}."
                )
            plan.append((by, field["selector"], field["value"]))

        filled = 0
        for by, selector, value in plan:
            el = _find_element(driver, by, selector)
            el.clear()
            el.send_keys(value)
            filled += 1

        submitted = False
        if params.submit_selector:
            btn = WebDriverWait(driver, DEFAULT_TIMEOUT).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, params.submit_selector))
            )
            btn.click()
            submitted = True
        return json.dumps({
            "status": "ok",
            "fields_filled": filled,
            "submitted": submitted,
            "url": driver.current_url,
        })
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_scroll",
    structured_output=False,
    annotations={
        "title": "Scroll Page",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def selenium_scroll(params: ScrollInput) -> str:
    """Scroll the page up, down, to top, or to bottom.

    Args:
        params (ScrollInput): Contains session_id, direction, and pixels.

    Returns:
        str: JSON with new scroll position.
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session. Use selenium_navigate first."
        amount = params.pixels
        # One JavaScript snippet per direction; `amount` only matters for
        # the relative up/down moves.
        commands = {
            "down": f"window.scrollBy(0, {amount});",
            "up": f"window.scrollBy(0, -{amount});",
            "top": "window.scrollTo(0, 0);",
            "bottom": "window.scrollTo(0, document.body.scrollHeight);",
        }
        browser.execute_script(commands[params.direction])
        # Report where the page ended up after the scroll.
        offset = browser.execute_script("return window.pageYOffset;")
        return json.dumps({"status": "ok", "scroll_y": offset})
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_back",
    structured_output=False,
    annotations={
        "title": "Go Back",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def selenium_back(params: SessionInput) -> str:
    """Navigate the browser back one page in history.

    Args:
        params (SessionInput): Contains session_id.

    Returns:
        str: JSON with the new URL and title.
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session."
        browser.back()
        # Short fixed pause before the URL and title are read.
        time.sleep(0.5)
        landed = {"url": browser.current_url, "title": browser.title}
        return json.dumps(landed)
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_forward",
    structured_output=False,
    annotations={
        "title": "Go Forward",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def selenium_forward(params: SessionInput) -> str:
    """Navigate the browser forward one page in history.

    Args:
        params (SessionInput): Contains session_id.

    Returns:
        str: JSON with the new URL and title.
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session."
        browser.forward()
        # Short fixed pause before the URL and title are read.
        time.sleep(0.5)
        landed = {"url": browser.current_url, "title": browser.title}
        return json.dumps(landed)
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_close_session",
    structured_output=False,
    annotations={
        "title": "Close Browser Session",
        "readOnlyHint": False,
        "destructiveHint": True,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def selenium_close_session(params: SessionIdInput) -> str:
    """Close a browser session and quit the WebDriver.

    Args:
        params (SessionIdInput): Contains session_id.

    Returns:
        str: JSON confirming the session was closed.
    """
    sid = params.session_id
    # The manager reports whether a session with this id existed.
    status = "closed" if sessions.close(sid) else "not_found"
    return json.dumps({"status": status, "session_id": sid})
@mcp.tool(
    name="selenium_hover",
    structured_output=False,
    annotations={
        "title": "Hover Over Element",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def selenium_hover(params: ElementActionInput) -> str:
    """Hover over an element (move mouse to element).

    Args:
        params (ElementActionInput): Contains session_id, by, selector, timeout.

    Returns:
        str: JSON confirming the hover action.
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session."
        target = _find_element(browser, params.by, params.selector, params.timeout)
        # Build the pointer move as an action chain and run it immediately.
        pointer = ActionChains(browser)
        pointer.move_to_element(target).perform()
        return json.dumps({"status": "hovered", "tag": target.tag_name})
    except Exception as e:
        return _handle_selenium_error(e)
@mcp.tool(
    name="selenium_get_attribute",
    structured_output=False,
    annotations={
        "title": "Get Element Attribute",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def selenium_get_attribute(params: ElementActionInput) -> str:
    """Get all useful attributes and properties of an element.

    Args:
        params (ElementActionInput): Contains session_id, by, selector, timeout.

    Returns:
        str: JSON with element info including tag, text, visibility, and key attributes.
    """
    try:
        browser = sessions.get(params.session_id or "")
        if not browser:
            return "Error: No active session."
        target = _find_element(browser, params.by, params.selector, params.timeout)
        # Same summary shape as the entries produced by selenium_find_elements.
        summary = _element_info(target)
        return json.dumps(summary, indent=2)
    except Exception as e:
        return _handle_selenium_error(e)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Serve over the streamable-http transport until the server stops.

    Browser sessions are closed on the way out, whether the server returned
    normally or raised.
    """
    logger.info("Starting selenium_mcp on %s:%d (headless=%s)", HOST, PORT, HEADLESS)
    transport = "streamable-http"
    try:
        mcp.run(transport=transport)
    finally:
        # Never leave browsers running after the server is gone.
        sessions.close_all()
        logger.info("All browser sessions closed.")
def main_stdio():
    """Serve over the stdio transport until the server stops.

    Browser sessions are closed on the way out, whether the server returned
    normally or raised.
    """
    logger.info("Starting selenium_mcp via stdio (headless=%s)", HEADLESS)
    transport = "stdio"
    try:
        mcp.run(transport=transport)
    finally:
        # Never leave browsers running after the server is gone.
        sessions.close_all()
        logger.info("All browser sessions closed.")


# Running the file directly starts the HTTP variant.
if __name__ == "__main__":
    main()