llm-tools/mcps/dicom_mcp/dicom_mcp/tools/pixels.py
Gregory Gauthier 83ec950df7 first commit
2026-04-08 12:11:04 +01:00

512 lines
18 KiB
Python

"""Pixel analysis tools: read_pixels, compute_snr, render_image."""
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
import numpy as np
import pydicom
from PIL import Image, ImageDraw, ImageFont
from pydicom.errors import InvalidDicomError
from mcp.types import ToolAnnotations
from dicom_mcp.server import mcp
from dicom_mcp.constants import ResponseFormat
from dicom_mcp.helpers.tags import _safe_get_tag, _format_markdown_table
from dicom_mcp.helpers.pixels import (
_get_pixel_array,
_extract_roi,
_compute_stats,
_apply_windowing,
)
@mcp.tool(
name="dicom_read_pixels",
annotations=ToolAnnotations(
title="Read DICOM Pixel Statistics",
readOnlyHint=True,
destructiveHint=False,
idempotentHint=True,
openWorldHint=False,
),
)
async def dicom_read_pixels(
file_path: str,
roi: Optional[List[int]] = None,
include_histogram: bool = False,
histogram_bins: int = 50,
response_format: ResponseFormat = ResponseFormat.MARKDOWN,
) -> str:
"""Extract pixel statistics from a DICOM file.
Reads pixel data and computes descriptive statistics (min, max, mean,
standard deviation, median, percentiles). Optionally restricts analysis
to a rectangular ROI.
Pixel values are rescaled using RescaleSlope and RescaleIntercept
when present (standard for Philips, common on Siemens/GE).
Args:
file_path: Path to the DICOM file
roi: Optional region of interest as [x, y, width, height] where
x,y is the top-left corner in pixel coordinates.
If omitted, statistics cover the entire image.
include_histogram: If True, include a binned histogram of pixel values
histogram_bins: Number of histogram bins (default 50)
response_format: Output format (markdown or json)
"""
try:
fp = Path(file_path)
if not fp.exists():
return f"Error: File not found: {file_path}"
ds = await asyncio.to_thread(pydicom.dcmread, fp)
if not hasattr(ds, "pixel_array"):
return f"Error: No pixel data in file: {file_path}"
pixels = _get_pixel_array(ds)
rows, cols = pixels.shape[:2]
image_info = {
"rows": rows,
"columns": cols,
"bits_allocated": int(getattr(ds, "BitsAllocated", 0)),
"bits_stored": int(getattr(ds, "BitsStored", 0)),
"rescale_slope": float(getattr(ds, "RescaleSlope", 1.0) or 1.0),
"rescale_intercept": float(getattr(ds, "RescaleIntercept", 0.0) or 0.0),
}
if pixels.ndim > 2:
return (
f"Error: Multi-frame image with shape {pixels.shape}. "
"This tool supports single-frame 2D images only."
)
roi_desc = "Full image"
if roi:
try:
pixels = _extract_roi(pixels, roi)
roi_desc = f"ROI [{roi[0]},{roi[1]}] {roi[2]}x{roi[3]}"
except ValueError as e:
return f"Error: {str(e)}"
stats = _compute_stats(pixels)
result = {
"file": fp.name,
"series_description": _safe_get_tag(ds, (0x0008, 0x103E)),
"image_info": image_info,
"region": roi_desc,
"statistics": stats,
}
if include_histogram:
counts, bin_edges = np.histogram(pixels.ravel(), bins=histogram_bins)
result["histogram"] = {
"bins": histogram_bins,
"counts": counts.tolist(),
"bin_edges": [round(float(e), 2) for e in bin_edges.tolist()],
}
if response_format == ResponseFormat.JSON:
return json.dumps(result, indent=2)
else:
output = [
f"# Pixel Statistics: {fp.name}\n",
f"**Series**: {result['series_description']}",
f"**Image size**: {rows} x {cols}",
f"**Region**: {roi_desc}",
f"**Rescale**: slope={image_info['rescale_slope']}, "
f"intercept={image_info['rescale_intercept']}\n",
"## Statistics\n",
]
headers = ["Metric", "Value"]
stat_rows = [
["Min", f"{stats['min']:.2f}"],
["Max", f"{stats['max']:.2f}"],
["Mean", f"{stats['mean']:.2f}"],
["Std Dev", f"{stats['std']:.2f}"],
["Median", f"{stats['median']:.2f}"],
["5th %ile", f"{stats['p5']:.2f}"],
["25th %ile", f"{stats['p25']:.2f}"],
["75th %ile", f"{stats['p75']:.2f}"],
["95th %ile", f"{stats['p95']:.2f}"],
["Pixel count", str(stats["pixel_count"])],
]
output.append(_format_markdown_table(headers, stat_rows))
if include_histogram and "histogram" in result:
output.append("\n## Histogram\n")
hist = result["histogram"]
max_count = max(hist["counts"]) if hist["counts"] else 1
for i, count in enumerate(hist["counts"]):
low = hist["bin_edges"][i]
high = hist["bin_edges"][i + 1]
bar_len = int(40 * count / max_count) if max_count > 0 else 0
bar = "#" * bar_len
output.append(f" {low:8.1f} - {high:8.1f} | {bar} ({count})")
return "\n".join(output)
except InvalidDicomError:
return f"Error: Not a valid DICOM file: {file_path}"
except Exception as e:
return f"Error reading pixel data: {str(e)}"
@mcp.tool(
name="dicom_compute_snr",
annotations=ToolAnnotations(
title="Compute SNR from ROIs",
readOnlyHint=True,
destructiveHint=False,
idempotentHint=True,
openWorldHint=False,
),
)
async def dicom_compute_snr(
file_path: str,
signal_roi: List[int],
noise_roi: List[int],
response_format: ResponseFormat = ResponseFormat.MARKDOWN,
) -> str:
"""Compute signal-to-noise ratio from two ROIs in a DICOM image.
Places a signal ROI (typically in tissue of interest) and a noise ROI
(typically in background air or a uniform region) and computes:
- SNR = mean(signal) / std(noise)
- Individual statistics for both ROIs
This is the single-image method SNR. For a more robust estimate,
use two identical acquisitions and compute SNR from the difference
image, but this gives a practical per-image metric.
Args:
file_path: Path to the DICOM file
signal_roi: Signal region as [x, y, width, height]
noise_roi: Noise/background region as [x, y, width, height]
response_format: Output format (markdown or json)
Tip: Use dicom_render_image first to visualise the image and identify
appropriate ROI coordinates, then use this tool to measure SNR.
"""
try:
fp = Path(file_path)
if not fp.exists():
return f"Error: File not found: {file_path}"
ds = await asyncio.to_thread(pydicom.dcmread, fp)
if not hasattr(ds, "pixel_array"):
return f"Error: No pixel data in file: {file_path}"
pixels = _get_pixel_array(ds)
if pixels.ndim > 2:
return (
f"Error: Multi-frame image with shape {pixels.shape}. "
"This tool supports single-frame 2D images only."
)
rows, cols = pixels.shape
try:
signal_pixels = _extract_roi(pixels, signal_roi)
except ValueError as e:
return f"Error in signal ROI: {str(e)}"
try:
noise_pixels = _extract_roi(pixels, noise_roi)
except ValueError as e:
return f"Error in noise ROI: {str(e)}"
signal_stats = _compute_stats(signal_pixels)
noise_stats = _compute_stats(noise_pixels)
noise_std = noise_stats["std"]
if noise_std == 0 or noise_std < 1e-10:
snr = float("inf")
snr_note = "Noise std is zero or near-zero; check ROI placement"
else:
snr = signal_stats["mean"] / noise_std
snr_note = None
result = {
"file": fp.name,
"series_description": _safe_get_tag(ds, (0x0008, 0x103E)),
"image_size": {"rows": rows, "columns": cols},
"signal_roi": {
"position": signal_roi,
"description": f"[{signal_roi[0]},{signal_roi[1]}] {signal_roi[2]}x{signal_roi[3]}",
"statistics": signal_stats,
},
"noise_roi": {
"position": noise_roi,
"description": f"[{noise_roi[0]},{noise_roi[1]}] {noise_roi[2]}x{noise_roi[3]}",
"statistics": noise_stats,
},
"snr": round(snr, 2) if snr != float("inf") else "infinite",
}
if snr_note:
result["snr_note"] = snr_note
if response_format == ResponseFormat.JSON:
return json.dumps(result, indent=2)
else:
output = [
f"# SNR Analysis: {fp.name}\n",
f"**Series**: {result['series_description']}",
f"**Image size**: {rows} x {cols}\n",
"## Signal ROI\n",
f"**Position**: {result['signal_roi']['description']}",
]
sig_headers = ["Metric", "Value"]
sig_rows = [
["Mean", f"{signal_stats['mean']:.2f}"],
["Std", f"{signal_stats['std']:.2f}"],
["Min", f"{signal_stats['min']:.2f}"],
["Max", f"{signal_stats['max']:.2f}"],
["Pixels", str(signal_stats["pixel_count"])],
]
output.append(_format_markdown_table(sig_headers, sig_rows))
output.append("\n## Noise ROI\n")
output.append(f"**Position**: {result['noise_roi']['description']}")
noise_headers = ["Metric", "Value"]
noise_rows = [
["Mean", f"{noise_stats['mean']:.2f}"],
["Std", f"{noise_stats['std']:.2f}"],
["Min", f"{noise_stats['min']:.2f}"],
["Max", f"{noise_stats['max']:.2f}"],
["Pixels", str(noise_stats["pixel_count"])],
]
output.append(_format_markdown_table(noise_headers, noise_rows))
output.append("\n## Result\n")
output.append(f"**SNR = {result['snr']}**")
output.append(
f"(mean_signal / std_noise = "
f"{signal_stats['mean']:.2f} / {noise_stats['std']:.2f})"
)
if snr_note:
output.append(f"\nWarning: {snr_note}")
return "\n".join(output)
except InvalidDicomError:
return f"Error: Not a valid DICOM file: {file_path}"
except Exception as e:
return f"Error computing SNR: {str(e)}"
@mcp.tool(
name="dicom_render_image",
annotations=ToolAnnotations(
title="Render DICOM Image to PNG",
readOnlyHint=False,
destructiveHint=False,
idempotentHint=True,
openWorldHint=False,
),
)
async def dicom_render_image(
file_path: str,
output_path: str,
window_center: Optional[float] = None,
window_width: Optional[float] = None,
auto_window: bool = False,
overlay_rois: Optional[List[Dict[str, Any]]] = None,
show_info: bool = True,
response_format: ResponseFormat = ResponseFormat.MARKDOWN,
) -> str:
"""Render a DICOM image to PNG with configurable windowing.
Applies a window/level transform and saves the result as an 8-bit
greyscale PNG. Optionally overlays ROI rectangles for visual
verification of ROI placement used in other tools.
Windowing priority:
1. Explicit window_center and window_width parameters
2. auto_window=True: uses 5th-95th percentile range
3. DICOM header WindowCenter/WindowWidth values
4. Fallback: full pixel range (min to max)
Args:
file_path: Path to the DICOM file
output_path: Path where the PNG will be saved
window_center: Optional manual window center value
window_width: Optional manual window width value
auto_window: If True, auto-calculate window from 5th-95th percentile
overlay_rois: Optional list of ROI overlays. Each dict has:
- roi: [x, y, width, height]
- label: Optional text label (e.g. "Signal", "Noise")
- color: Optional colour name ("red", "green", "blue",
"yellow", "cyan", "magenta", "white"; default "red")
show_info: If True, burn in series description and window values
response_format: Output format (markdown or json)
"""
try:
fp = Path(file_path)
if not fp.exists():
return f"Error: File not found: {file_path}"
out = Path(output_path)
out.parent.mkdir(parents=True, exist_ok=True)
ds = await asyncio.to_thread(pydicom.dcmread, fp)
if not hasattr(ds, "pixel_array"):
return f"Error: No pixel data in file: {file_path}"
pixels = _get_pixel_array(ds)
if pixels.ndim > 2:
return (
f"Error: Multi-frame image with shape {pixels.shape}. "
"This tool supports single-frame 2D images only."
)
rows, cols = pixels.shape
# --- Determine windowing ---
wc, ww = None, None
window_source = "none"
if window_center is not None and window_width is not None:
wc, ww = float(window_center), float(window_width)
window_source = "manual"
elif auto_window:
p5 = float(np.percentile(pixels, 5))
p95 = float(np.percentile(pixels, 95))
ww = p95 - p5
wc = p5 + ww / 2.0
window_source = "auto (p5-p95)"
else:
try:
header_wc = ds.get((0x0028, 0x1050))
header_ww = ds.get((0x0028, 0x1051))
if header_wc is not None and header_ww is not None:
hc = header_wc.value
hw = header_ww.value
if hasattr(hc, "__iter__") and not isinstance(hc, str):
hc = hc[0]
if hasattr(hw, "__iter__") and not isinstance(hw, str):
hw = hw[0]
wc, ww = float(hc), float(hw)
window_source = "DICOM header"
except Exception:
pass
if wc is None or ww is None:
pmin, pmax = float(np.min(pixels)), float(np.max(pixels))
ww = pmax - pmin if pmax > pmin else 1.0
wc = pmin + ww / 2.0
window_source = "full range"
display = _apply_windowing(pixels, wc, ww)
img = Image.fromarray(display, mode="L")
# --- Overlays ---
colour_map = {
"red": (255, 0, 0),
"green": (0, 255, 0),
"blue": (0, 100, 255),
"yellow": (255, 255, 0),
"cyan": (0, 255, 255),
"magenta": (255, 0, 255),
"white": (255, 255, 255),
}
if overlay_rois or show_info:
img = img.convert("RGB")
draw = ImageDraw.Draw(img)
try:
font = ImageFont.truetype("/System/Library/Fonts/Menlo.ttc", 12)
except (OSError, IOError):
try:
font = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf", 12
)
except (OSError, IOError):
font = ImageFont.load_default()
if overlay_rois:
for roi_overlay in overlay_rois:
roi = roi_overlay.get("roi")
if not roi or len(roi) != 4:
continue
label = roi_overlay.get("label", "")
colour_name = roi_overlay.get("color", "red").lower()
colour = colour_map.get(colour_name, (255, 0, 0))
x, y, w, h = int(roi[0]), int(roi[1]), int(roi[2]), int(roi[3])
for offset in range(2):
draw.rectangle(
[
x + offset,
y + offset,
x + w - 1 - offset,
y + h - 1 - offset,
],
outline=colour,
)
if label:
draw.text(
(x, max(0, y - 15)),
label,
fill=colour,
font=font,
)
if show_info:
series_desc = _safe_get_tag(ds, (0x0008, 0x103E))
info_text = f"{series_desc} WC:{wc:.0f} WW:{ww:.0f} ({window_source})"
bbox = draw.textbbox((0, 0), info_text, font=font)
text_w = bbox[2] - bbox[0]
text_h = bbox[3] - bbox[1]
draw.rectangle(
[0, rows - text_h - 6, text_w + 8, rows],
fill=(0, 0, 0),
)
draw.text(
(4, rows - text_h - 3),
info_text,
fill=(255, 255, 255),
font=font,
)
await asyncio.to_thread(img.save, str(out), "PNG")
result = {
"file": fp.name,
"output_path": str(out),
"series_description": _safe_get_tag(ds, (0x0008, 0x103E)),
"image_size": {"rows": rows, "columns": cols},
"windowing": {
"center": round(wc, 2),
"width": round(ww, 2),
"source": window_source,
},
"overlays": len(overlay_rois) if overlay_rois else 0,
}
if response_format == ResponseFormat.JSON:
return json.dumps(result, indent=2)
else:
output = [
f"# Rendered Image: {fp.name}\n",
f"**Series**: {result['series_description']}",
f"**Image size**: {rows} x {cols}",
f"**Windowing**: WC={wc:.1f}, WW={ww:.1f} ({window_source})",
f"**Output**: `{out}`",
]
if overlay_rois:
output.append(f"**ROI overlays**: {len(overlay_rois)}")
return "\n".join(output)
except InvalidDicomError:
return f"Error: Not a valid DICOM file: {file_path}"
except Exception as e:
return f"Error rendering image: {str(e)}"