llm-tools/mcps/dicom_mcp/test_dicom_mcp.py
Gregory Gauthier 83ec950df7 first commit
2026-04-08 12:11:04 +01:00

1884 lines
71 KiB
Python

#!/usr/bin/env python3
"""
Pytest suite for DICOM MCP Server
Run with: pytest test_dicom_mcp.py -v
"""
import json
import sys
from pathlib import Path
import numpy as np
import pydicom
import pytest
from PIL import Image
from pydicom.dataset import FileMetaDataset
from pydicom.sequence import Sequence as DicomSequence
from pydicom.uid import (
ExplicitVRLittleEndian,
MRImageStorage,
SegmentationStorage,
UID,
generate_uid,
)
# ---------------------------------------------------------------------------
# Shared DICOM factory helpers
# ---------------------------------------------------------------------------
def _make_base_dataset(
    *,
    sop_class=MRImageStorage,
    sop_instance_uid=None,
    study_instance_uid=None,
    series_instance_uid=None,
    rows=4,
    columns=4,
    bits_stored=12,
    pixel_data=None,
    **attrs,
):
    """Build a minimal in-memory DICOM ``Dataset`` for tests.

    Takes care of the repetitive parts: file meta information, transfer
    syntax, UID generation, image-pixel module attributes and pixel
    encoding. Nothing is written to disk.

    Args:
        sop_class: SOP Class UID written to both the file meta and the dataset.
        sop_instance_uid: Explicit SOP Instance UID; a new UID is generated
            when this is falsy.
        study_instance_uid: Explicit Study Instance UID; generated when falsy.
        series_instance_uid: Explicit Series Instance UID; generated when falsy.
        rows: Value stored in ``Rows`` (and the zero-image height).
        columns: Value stored in ``Columns`` (and the zero-image width).
        bits_stored: Value stored in ``BitsStored``; ``HighBit`` is set to
            ``bits_stored - 1``.
        pixel_data: Optional NumPy array; it is cast to ``uint16`` and
            serialised. When ``None`` an all-zero ``rows x columns`` image
            is used instead.
        **attrs: Extra DICOM keywords set on the dataset last
            (e.g. ``Modality="MR"``, ``Manufacturer="SIEMENS"``).

    Returns:
        The populated ``pydicom.Dataset``.
    """

    def _uid_or_new(candidate):
        # Any falsy value (None, "") means "generate a fresh UID for me".
        return UID(candidate) if candidate else generate_uid()

    ds = pydicom.Dataset()
    ds.file_meta = FileMetaDataset()
    meta = ds.file_meta
    meta.TransferSyntaxUID = ExplicitVRLittleEndian

    # The same instance UID must appear in the file meta and in the dataset.
    instance_uid = _uid_or_new(sop_instance_uid)
    meta.MediaStorageSOPClassUID = sop_class
    meta.MediaStorageSOPInstanceUID = instance_uid
    ds.SOPClassUID = sop_class
    ds.SOPInstanceUID = instance_uid
    ds.StudyInstanceUID = _uid_or_new(study_instance_uid)
    ds.SeriesInstanceUID = _uid_or_new(series_instance_uid)

    # Image-pixel attributes describing a single-sample unsigned 16-bit image.
    image_attributes = {
        "Rows": rows,
        "Columns": columns,
        "BitsAllocated": 16,
        "BitsStored": bits_stored,
        "HighBit": bits_stored - 1,
        "PixelRepresentation": 0,
        "SamplesPerPixel": 1,
        "PhotometricInterpretation": "MONOCHROME2",
    }
    for keyword, value in image_attributes.items():
        setattr(ds, keyword, value)

    if pixel_data is None:
        frame = np.zeros((rows, columns), dtype=np.uint16)
    else:
        frame = pixel_data.astype(np.uint16)
    ds.PixelData = frame.tobytes()

    # Caller-supplied attributes go last so they can override the defaults above.
    for keyword, value in attrs.items():
        setattr(ds, keyword, value)
    return ds
def _save_dicom(ds, file_path):
    """Write *ds* to *file_path* in DICOM file format and return the path.

    Missing parent directories are created first. The path is returned
    unchanged so callers can chain the call.
    """
    parent_dir = Path(file_path).parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    destination = str(file_path)
    ds.save_as(destination, enforce_file_format=True)
    return file_path
def _make_pii_dataset():
    """Return a Dataset carrying known patient-identifying values.

    Shared by the PII test classes so they all check against the same
    name, ID, birth date and sex.
    """
    series_fields = {
        "Modality": "MR",
        "Manufacturer": "SIEMENS",
        "SeriesDescription": "T1_VIBE_Dixon",
        "SeriesNumber": 1,
    }
    patient_fields = {
        "PatientName": "Doe^Jane",
        "PatientID": "PII-TEST-001",
        "PatientBirthDate": "19850315",
        "PatientSex": "F",
    }
    return _make_base_dataset(**series_fields, **patient_fields)
# ---------------------------------------------------------------------------
# Module-scoped fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def dicom_mcp_module():
    """Import the ``dicom_mcp`` module once per test module and return it."""
    # Make the directory holding this test file importable so that
    # ``dicom_mcp`` resolves regardless of the working directory.
    test_dir = str(Path(__file__).parent)
    if test_dir not in sys.path:
        sys.path.insert(0, test_dir)

    import dicom_mcp

    return dicom_mcp
# ---------------------------------------------------------------------------
# Test classes
# ---------------------------------------------------------------------------
class TestImports:
    """Smoke tests confirming the server's dependencies are importable."""

    def test_mcp_import(self):
        """The ``mcp`` package is installed."""
        # Imported locally so a missing dependency fails only this test.
        import mcp as mcp_package

        assert mcp_package is not None

    def test_pydicom_import(self):
        """``pydicom`` was imported at module level."""
        assert pydicom is not None

    def test_pydantic_import(self):
        """The ``pydantic`` package is installed."""
        # Imported locally so a missing dependency fails only this test.
        import pydantic as pydantic_package

        assert pydantic_package is not None

    def test_fastmcp_import(self):
        """``FastMCP`` is reachable at ``mcp.server.fastmcp``."""
        # Imported locally to validate the submodule path itself.
        from mcp.server.fastmcp import FastMCP as fast_mcp_class

        assert fast_mcp_class is not None
class TestServerModule:
    """Checks on the imported ``dicom_mcp`` server module itself."""

    def test_module_import(self, dicom_mcp_module):
        """The fixture yields an imported module object."""
        module = dicom_mcp_module
        assert module is not None

    def test_mcp_instance_exists(self, dicom_mcp_module):
        """The module exposes an ``mcp`` attribute."""
        has_server = hasattr(dicom_mcp_module, "mcp")
        assert has_server

    def test_mcp_instance_name(self, dicom_mcp_module):
        """The ``mcp`` instance is named ``dicom_mcp``."""
        server = dicom_mcp_module.mcp
        assert server.name == "dicom_mcp"
class TestToolRegistration:
    """Verify that every expected tool is exposed by the module."""

    # Names of the tool functions the module is expected to define.
    EXPECTED_TOOLS = [
        "dicom_list_files",
        "dicom_get_metadata",
        "dicom_compare_headers",
        "dicom_find_dixon_series",
        "dicom_validate_sequence",
        "dicom_analyze_series",
        "dicom_summarize_directory",
        "dicom_query",
        "dicom_search",
        "dicom_query_philips_private",
        "dicom_read_pixels",
        "dicom_compute_snr",
        "dicom_render_image",
        "dicom_dump_tree",
        "dicom_compare_uids",
        "dicom_verify_segmentations",
        "dicom_analyze_ti",
    ]

    def test_all_tools_exist(self, dicom_mcp_module):
        """Each expected tool name is an attribute of the module."""
        for tool_name in self.EXPECTED_TOOLS:
            is_present = hasattr(dicom_mcp_module, tool_name)
            assert is_present, f"Tool {tool_name} not found in module"

    def test_tool_count(self, dicom_mcp_module):
        """The number of expected tools found equals the number expected."""
        expected_total = len(self.EXPECTED_TOOLS)
        found = [
            name for name in self.EXPECTED_TOOLS if hasattr(dicom_mcp_module, name)
        ]
        registered_count = len(found)
        assert (
            registered_count == expected_total
        ), f"Expected {len(self.EXPECTED_TOOLS)} tools, found {registered_count}"

    @pytest.mark.parametrize("tool_name", EXPECTED_TOOLS)
    def test_tool_is_callable(self, dicom_mcp_module, tool_name):
        """Each expected tool is a callable object."""
        candidate = getattr(dicom_mcp_module, tool_name)
        assert callable(candidate), f"{tool_name} is not callable"
class TestHelperFunctions:
    """Existence checks for the module's private helper functions."""

    @staticmethod
    def _assert_defined(module, attribute_name):
        """Assert that *module* exposes *attribute_name*."""
        assert hasattr(module, attribute_name)

    def test_safe_get_tag_exists(self, dicom_mcp_module):
        """``_safe_get_tag`` is defined."""
        self._assert_defined(dicom_mcp_module, "_safe_get_tag")

    def test_identify_sequence_type_exists(self, dicom_mcp_module):
        """``_identify_sequence_type`` is defined."""
        self._assert_defined(dicom_mcp_module, "_identify_sequence_type")

    def test_find_dicom_files_exists(self, dicom_mcp_module):
        """``_find_dicom_files`` is defined."""
        self._assert_defined(dicom_mcp_module, "_find_dicom_files")

    def test_format_markdown_table_exists(self, dicom_mcp_module):
        """``_format_markdown_table`` is defined."""
        self._assert_defined(dicom_mcp_module, "_format_markdown_table")

    def test_resolve_tag_exists(self, dicom_mcp_module):
        """``_resolve_tag`` is defined."""
        self._assert_defined(dicom_mcp_module, "_resolve_tag")

    def test_parse_filter_exists(self, dicom_mcp_module):
        """``_parse_filter`` is defined."""
        self._assert_defined(dicom_mcp_module, "_parse_filter")

    def test_apply_filter_exists(self, dicom_mcp_module):
        """``_apply_filter`` is defined."""
        self._assert_defined(dicom_mcp_module, "_apply_filter")

    def test_get_pixel_array_exists(self, dicom_mcp_module):
        """``_get_pixel_array`` is defined."""
        self._assert_defined(dicom_mcp_module, "_get_pixel_array")

    def test_extract_roi_exists(self, dicom_mcp_module):
        """``_extract_roi`` is defined."""
        self._assert_defined(dicom_mcp_module, "_extract_roi")

    def test_compute_stats_exists(self, dicom_mcp_module):
        """``_compute_stats`` is defined."""
        self._assert_defined(dicom_mcp_module, "_compute_stats")

    def test_apply_windowing_exists(self, dicom_mcp_module):
        """``_apply_windowing`` is defined."""
        self._assert_defined(dicom_mcp_module, "_apply_windowing")

    def test_resolve_philips_private_tag_exists(self, dicom_mcp_module):
        """``_resolve_philips_private_tag`` is defined."""
        self._assert_defined(dicom_mcp_module, "_resolve_philips_private_tag")

    def test_list_philips_private_creators_exists(self, dicom_mcp_module):
        """``_list_philips_private_creators`` is defined."""
        self._assert_defined(dicom_mcp_module, "_list_philips_private_creators")
class TestEnums:
    """Checks that the module's enums exist and carry the expected members."""

    def test_response_format_enum_exists(self, dicom_mcp_module):
        """``ResponseFormat`` is defined on the module."""
        assert hasattr(dicom_mcp_module, "ResponseFormat")

    def test_response_format_values(self, dicom_mcp_module):
        """``ResponseFormat`` offers both output formats."""
        response_format = dicom_mcp_module.ResponseFormat
        for member in ("MARKDOWN", "JSON"):
            assert hasattr(response_format, member)

    def test_sequence_type_enum_exists(self, dicom_mcp_module):
        """``SequenceType`` is defined on the module."""
        assert hasattr(dicom_mcp_module, "SequenceType")

    def test_sequence_type_values(self, dicom_mcp_module):
        """``SequenceType`` includes the liver-analysis and generic members."""
        sequence_type = dicom_mcp_module.SequenceType
        expected_members = (
            "DIXON",
            "T1_MAPPING",
            "MULTI_ECHO_GRE",
            "SPIN_ECHO_IR",
            "T1",
            "T2",
            "FLAIR",
            "DWI",
            "LOCALIZER",
            "UNKNOWN",
        )
        for member in expected_members:
            assert hasattr(sequence_type, member)
class TestConstants:
    """Checks on module-level constants."""

    def test_common_tags_exists(self, dicom_mcp_module):
        """``COMMON_TAGS`` is defined on the module."""
        assert hasattr(dicom_mcp_module, "COMMON_TAGS")

    def test_common_tags_structure(self, dicom_mcp_module):
        """``COMMON_TAGS`` is a dict mapping each expected group to a list."""
        tag_groups = dicom_mcp_module.COMMON_TAGS
        assert isinstance(tag_groups, dict)

        required_groups = (
            "patient_info",
            "study_info",
            "series_info",
            "image_info",
            "acquisition",
            "manufacturer",
        )
        for group in required_groups:
            assert group in tag_groups, f"Missing tag group: {group}"
            group_tags = tag_groups[group]
            assert isinstance(group_tags, list)
class TestNewImports:
    """Smoke tests for the dependencies used by the pixel tools."""

    def test_numpy_import(self):
        """NumPy was imported at module level."""
        assert np is not None

    def test_pillow_import(self):
        """Pillow's ``Image``, ``ImageDraw`` and ``ImageFont`` are importable."""
        # ImageDraw / ImageFont are imported locally; Image comes from module scope.
        from PIL import ImageDraw, ImageFont

        for pillow_module in (Image, ImageDraw, ImageFont):
            assert pillow_module is not None
class TestExtractRoi:
    """Exercise ``_extract_roi`` against synthetic arrays.

    ROIs are passed as ``[x, y, width, height]``.
    """

    def test_valid_roi_centre(self, dicom_mcp_module):
        """A central ROI has the right shape and the right top-left value."""
        image = np.arange(100).reshape(10, 10).astype(np.float64)
        region = dicom_mcp_module._extract_roi(image, [2, 3, 4, 5])
        # height 5, width 4
        assert region.shape == (5, 4)
        # Top-left of the ROI sits at row=3, col=2 -> 3*10 + 2 = 32
        assert region[0, 0] == 32.0

    def test_valid_roi_full_image(self, dicom_mcp_module):
        """An ROI spanning the whole image returns the full shape."""
        image = np.ones((8, 12), dtype=np.float64)
        region = dicom_mcp_module._extract_roi(image, [0, 0, 12, 8])
        assert region.shape == (8, 12)

    def test_roi_out_of_bounds(self, dicom_mcp_module):
        """An ROI reaching past the image edge raises ``ValueError``."""
        image = np.zeros((10, 10), dtype=np.float64)
        oversized = [8, 8, 5, 5]
        with pytest.raises(ValueError, match="exceeds image bounds"):
            dicom_mcp_module._extract_roi(image, oversized)

    def test_roi_zero_width(self, dicom_mcp_module):
        """A zero-width ROI raises ``ValueError``."""
        image = np.zeros((10, 10), dtype=np.float64)
        zero_width = [0, 0, 0, 5]
        with pytest.raises(ValueError, match="must be positive"):
            dicom_mcp_module._extract_roi(image, zero_width)

    def test_roi_negative_height(self, dicom_mcp_module):
        """A negative-height ROI raises ``ValueError``."""
        image = np.zeros((10, 10), dtype=np.float64)
        negative_height = [0, 0, 5, -3]
        with pytest.raises(ValueError, match="must be positive"):
            dicom_mcp_module._extract_roi(image, negative_height)

    def test_roi_wrong_length(self, dicom_mcp_module):
        """An ROI without exactly four elements raises ``ValueError``."""
        image = np.zeros((10, 10), dtype=np.float64)
        three_elements = [0, 0, 5]
        with pytest.raises(ValueError, match="must be \\[x, y, width, height\\]"):
            dicom_mcp_module._extract_roi(image, three_elements)
class TestComputeStats:
    """Exercise ``_compute_stats`` on arrays with known distributions."""

    def test_uniform_array(self, dicom_mcp_module):
        """A constant array has identical location stats and zero spread."""
        constant = np.full((10, 10), 42.0)
        result = dicom_mcp_module._compute_stats(constant)
        for key in ("min", "max", "mean"):
            assert result[key] == 42.0
        assert result["std"] == 0.0
        assert result["median"] == 42.0
        assert result["pixel_count"] == 100

    def test_known_sequence(self, dicom_mcp_module):
        """Stats for the integers 0..99 match their closed-form values."""
        # 0 through 99 inclusive
        ramp = np.arange(100).reshape(10, 10).astype(np.float64)
        result = dicom_mcp_module._compute_stats(ramp)
        assert result["min"] == 0.0
        assert result["max"] == 99.0
        for key in ("mean", "median"):
            assert result[key] == pytest.approx(49.5)
        assert result["pixel_count"] == 100
        # Standard deviation of a uniform discrete distribution over 0..99
        assert result["std"] == pytest.approx(28.866, abs=0.01)

    def test_all_expected_keys(self, dicom_mcp_module):
        """The result dict contains exactly the documented statistic keys."""
        result = dicom_mcp_module._compute_stats(np.ones((5, 5), dtype=np.float64))
        wanted = {
            "min",
            "max",
            "mean",
            "std",
            "median",
            "p5",
            "p25",
            "p75",
            "p95",
            "pixel_count",
        }
        assert set(result.keys()) == wanted

    def test_percentile_ordering(self, dicom_mcp_module):
        """min <= p5 <= p25 <= median <= p75 <= p95 <= max."""
        samples = np.random.RandomState(42).rand(50, 50).astype(np.float64)
        result = dicom_mcp_module._compute_stats(samples)
        ascending = ["min", "p5", "p25", "median", "p75", "p95", "max"]
        # Compare each statistic with its immediate successor.
        for lower, upper in zip(ascending, ascending[1:]):
            assert result[lower] <= result[upper]
class TestApplyWindowing:
    """Test that ``_apply_windowing`` produces the expected 8-bit output.

    Every call passes ``(pixels, window_center, window_width)``.
    """

    def test_full_range(self, dicom_mcp_module):
        """Centre 100 / width 200 maps 0 to 0 and 200 to 255."""
        pixels = np.array([[0.0, 50.0], [100.0, 200.0]])
        result = dicom_mcp_module._apply_windowing(pixels, 100.0, 200.0)
        assert result.dtype == np.uint8
        assert result[0, 0] == 0  # at lower bound
        assert result[1, 1] == 255  # at upper bound
        assert result[0, 1] == 63  # 50/200 * 255 = 63.75, truncated to 63

    def test_clipping_below(self, dicom_mcp_module):
        """Values below the window are clipped to 0."""
        pixels = np.array([[-100.0, -50.0]])
        result = dicom_mcp_module._apply_windowing(pixels, 100.0, 100.0)
        assert result[0, 0] == 0
        assert result[0, 1] == 0

    def test_clipping_above(self, dicom_mcp_module):
        """Values above the window are clipped to 255."""
        pixels = np.array([[500.0, 1000.0]])
        result = dicom_mcp_module._apply_windowing(pixels, 100.0, 100.0)
        assert result[0, 0] == 255
        assert result[0, 1] == 255

    def test_narrow_window(self, dicom_mcp_module):
        """A very narrow window (high contrast) still maps its bounds and centre."""
        pixels = np.array([[99.0, 100.0, 101.0]])
        result = dicom_mcp_module._apply_windowing(pixels, 100.0, 2.0)
        assert result[0, 0] == 0
        assert result[0, 1] == 127  # centre maps to 127.5, truncated to 127
        assert result[0, 2] == 255

    def test_output_shape_preserved(self, dicom_mcp_module):
        """The output has the same shape as the input."""
        # Seeded local generator: keeps the input reproducible and leaves the
        # global NumPy RNG untouched, matching the other tests in this file.
        rng = np.random.RandomState(42)
        pixels = rng.rand(64, 128).astype(np.float64) * 1000
        result = dicom_mcp_module._apply_windowing(pixels, 500.0, 800.0)
        assert result.shape == (64, 128)
class TestGetPixelArray:
    """Exercise ``_get_pixel_array`` with in-memory synthetic datasets."""

    def _make_dataset(self, pixel_values, slope=1.0, intercept=0.0):
        """Wrap *pixel_values* in a 16-bit dataset with the given rescale tags."""
        height = pixel_values.shape[0]
        width = pixel_values.shape[1]
        return _make_base_dataset(
            rows=height,
            columns=width,
            bits_stored=16,
            pixel_data=pixel_values,
            RescaleSlope=slope,
            RescaleIntercept=intercept,
        )

    def test_identity_rescale(self, dicom_mcp_module):
        """Slope 1 / intercept 0 returns the raw values as float64."""
        stored = np.array([[100, 200], [300, 400]], dtype=np.uint16)
        dataset = self._make_dataset(stored)
        rescaled = dicom_mcp_module._get_pixel_array(dataset)
        assert rescaled.dtype == np.float64
        np.testing.assert_array_equal(rescaled, stored.astype(np.float64))

    def test_slope_applied(self, dicom_mcp_module):
        """``RescaleSlope`` multiplies the stored values."""
        stored = np.array([[10, 20]], dtype=np.uint16)
        dataset = self._make_dataset(stored, slope=2.5, intercept=0.0)
        rescaled = dicom_mcp_module._get_pixel_array(dataset)
        expected = [[25.0, 50.0]]
        np.testing.assert_array_almost_equal(rescaled, expected)

    def test_intercept_applied(self, dicom_mcp_module):
        """``RescaleIntercept`` is added to the stored values."""
        stored = np.array([[100, 200]], dtype=np.uint16)
        dataset = self._make_dataset(stored, slope=1.0, intercept=-100.0)
        rescaled = dicom_mcp_module._get_pixel_array(dataset)
        expected = [[0.0, 100.0]]
        np.testing.assert_array_almost_equal(rescaled, expected)

    def test_slope_and_intercept_combined(self, dicom_mcp_module):
        """Both are applied together as ``value * slope + intercept``."""
        stored = np.array([[10, 20]], dtype=np.uint16)
        dataset = self._make_dataset(stored, slope=2.0, intercept=5.0)
        rescaled = dicom_mcp_module._get_pixel_array(dataset)
        # 10*2+5=25, 20*2+5=45
        expected = [[25.0, 45.0]]
        np.testing.assert_array_almost_equal(rescaled, expected)
class TestPixelToolsWithSyntheticDicom:
    """Functional tests for pixel tools using synthetic DICOM files on disk.

    Creates temporary DICOM files with known pixel data to verify the
    full tool pipeline including file I/O.
    """

    @pytest.fixture
    def synthetic_dicom(self, tmp_path):
        """Create a synthetic DICOM file with a gradient pattern.

        The image is 64x64 with values 0-4095 (12-bit range), arranged
        so that pixel intensity increases linearly left-to-right, with
        a known signal region (centre) and noise region (top-left corner).

        Returns:
            A ``(file_path, pixels)`` tuple: the saved file and the array
            that was written into it.
        """
        # Gradient pattern: each row is the same, columns go 0..4095
        row = np.linspace(0, 4095, 64).astype(np.uint16)
        pixels = np.tile(row, (64, 1))
        # Add a bright "signal" block in centre (rows 24-40, cols 24-40)
        pixels[24:40, 24:40] = 3000
        # Add a low "noise" block in top-left (rows 0-16, cols 0-16)
        # with small random variation around a low mean
        rng = np.random.RandomState(42)
        pixels[0:16, 0:16] = (50 + rng.randint(0, 20, size=(16, 16))).astype(np.uint16)
        ds = _make_base_dataset(
            rows=64,
            columns=64,
            pixel_data=pixels,
            FrameOfReferenceUID=generate_uid(),
            Modality="MR",
            Manufacturer="TestVendor",
            SeriesDescription="LMS MOST Test",
            SeriesNumber=1,
            InstanceNumber=1,
            ImageType=["ORIGINAL", "PRIMARY", "M_FFE"],
            RescaleSlope=1.0,
            RescaleIntercept=0.0,
            WindowCenter=500,
            WindowWidth=1000,
        )
        file_path = tmp_path / "test_image.dcm"
        _save_dicom(ds, file_path)
        return file_path, pixels

    @pytest.mark.asyncio
    async def test_read_pixels_full_image(self, dicom_mcp_module, synthetic_dicom):
        """Test dicom_read_pixels returns correct stats for full image."""
        file_path, _ = synthetic_dicom
        result = await dicom_mcp_module.dicom_read_pixels(
            file_path=str(file_path),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert data["image_info"]["rows"] == 64
        assert data["image_info"]["columns"] == 64
        assert data["region"] == "Full image"
        assert data["statistics"]["pixel_count"] == 64 * 64
        assert data["statistics"]["min"] >= 0
        assert data["statistics"]["max"] <= 4095

    @pytest.mark.asyncio
    async def test_read_pixels_with_roi(self, dicom_mcp_module, synthetic_dicom):
        """Test dicom_read_pixels with ROI restricts to correct region."""
        file_path, _ = synthetic_dicom
        # Signal block: cols 24-40, rows 24-40 -> all 3000
        result = await dicom_mcp_module.dicom_read_pixels(
            file_path=str(file_path),
            roi=[24, 24, 16, 16],
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert data["statistics"]["pixel_count"] == 16 * 16
        assert data["statistics"]["mean"] == 3000.0
        assert data["statistics"]["std"] == 0.0

    @pytest.mark.asyncio
    async def test_read_pixels_with_histogram(self, dicom_mcp_module, synthetic_dicom):
        """Test dicom_read_pixels includes histogram when requested."""
        file_path, _ = synthetic_dicom
        result = await dicom_mcp_module.dicom_read_pixels(
            file_path=str(file_path),
            include_histogram=True,
            histogram_bins=10,
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert "histogram" in data
        assert data["histogram"]["bins"] == 10
        assert len(data["histogram"]["counts"]) == 10
        # N bins are delimited by N + 1 edges.
        assert len(data["histogram"]["bin_edges"]) == 11
        assert sum(data["histogram"]["counts"]) == 64 * 64

    @pytest.mark.asyncio
    async def test_read_pixels_invalid_roi(self, dicom_mcp_module, synthetic_dicom):
        """Test dicom_read_pixels returns error for out-of-bounds ROI."""
        file_path, _ = synthetic_dicom
        result = await dicom_mcp_module.dicom_read_pixels(
            file_path=str(file_path),
            roi=[60, 60, 10, 10],
        )
        assert "Error" in result
        assert "exceeds image bounds" in result

    @pytest.mark.asyncio
    async def test_read_pixels_file_not_found(self, dicom_mcp_module, tmp_path):
        """Test dicom_read_pixels returns error for missing file."""
        result = await dicom_mcp_module.dicom_read_pixels(
            file_path=str(tmp_path / "nonexistent.dcm"),
        )
        assert "Error" in result
        assert "not found" in result

    @pytest.mark.asyncio
    async def test_read_pixels_markdown_format(self, dicom_mcp_module, synthetic_dicom):
        """Test dicom_read_pixels markdown output contains expected sections."""
        file_path, _ = synthetic_dicom
        result = await dicom_mcp_module.dicom_read_pixels(
            file_path=str(file_path),
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )
        assert "# Pixel Statistics" in result
        assert "## Statistics" in result
        assert "Mean" in result
        assert "Std Dev" in result

    @pytest.mark.asyncio
    async def test_compute_snr_known_values(self, dicom_mcp_module, synthetic_dicom):
        """Test dicom_compute_snr with signal block and noise block."""
        file_path, _ = synthetic_dicom
        result = await dicom_mcp_module.dicom_compute_snr(
            file_path=str(file_path),
            signal_roi=[24, 24, 16, 16],  # bright block, mean=3000, std=0
            noise_roi=[0, 0, 16, 16],  # noisy low block
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert data["signal_roi"]["statistics"]["mean"] == 3000.0
        assert data["noise_roi"]["statistics"]["mean"] < 100.0
        assert data["noise_roi"]["statistics"]["std"] > 0
        # SNR should be high (3000 / small_std)
        assert isinstance(data["snr"], (int, float))
        assert data["snr"] > 100

    @pytest.mark.asyncio
    async def test_compute_snr_zero_noise(self, dicom_mcp_module, synthetic_dicom):
        """Test dicom_compute_snr handles zero noise std gracefully."""
        file_path, _ = synthetic_dicom
        # Use the signal block as both regions (std=0 in constant block)
        result = await dicom_mcp_module.dicom_compute_snr(
            file_path=str(file_path),
            signal_roi=[24, 24, 8, 8],
            noise_roi=[28, 28, 8, 8],  # also in the constant block
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert data["snr"] == "infinite"
        assert "snr_note" in data

    @pytest.mark.asyncio
    async def test_compute_snr_invalid_roi(self, dicom_mcp_module, synthetic_dicom):
        """Test dicom_compute_snr returns error for bad ROI."""
        file_path, _ = synthetic_dicom
        result = await dicom_mcp_module.dicom_compute_snr(
            file_path=str(file_path),
            signal_roi=[0, 0, 10, 10],
            noise_roi=[60, 60, 10, 10],  # out of bounds
        )
        assert "Error in noise ROI" in result

    @pytest.mark.asyncio
    async def test_compute_snr_markdown_format(self, dicom_mcp_module, synthetic_dicom):
        """Test dicom_compute_snr markdown output."""
        file_path, _ = synthetic_dicom
        result = await dicom_mcp_module.dicom_compute_snr(
            file_path=str(file_path),
            signal_roi=[24, 24, 16, 16],
            noise_roi=[0, 0, 16, 16],
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )
        assert "# SNR Analysis" in result
        assert "## Signal ROI" in result
        assert "## Noise ROI" in result
        assert "## Result" in result
        assert "SNR =" in result

    @pytest.mark.asyncio
    async def test_render_image_default_windowing(
        self, dicom_mcp_module, synthetic_dicom, tmp_path
    ):
        """Test dicom_render_image creates a PNG with DICOM header windowing."""
        file_path, _ = synthetic_dicom
        output = tmp_path / "rendered.png"
        result = await dicom_mcp_module.dicom_render_image(
            file_path=str(file_path),
            output_path=str(output),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert output.exists()
        assert data["windowing"]["source"] == "DICOM header"
        assert data["windowing"]["center"] == 500.0
        assert data["windowing"]["width"] == 1000.0
        # Verify it's a valid image; the context manager releases the file
        # handle so tmp_path can be cleaned up afterwards.
        with Image.open(output) as img:
            assert img.size == (64, 64)

    @pytest.mark.asyncio
    async def test_render_image_auto_windowing(
        self, dicom_mcp_module, synthetic_dicom, tmp_path
    ):
        """Test dicom_render_image auto windowing uses percentiles."""
        file_path, _ = synthetic_dicom
        output = tmp_path / "auto.png"
        result = await dicom_mcp_module.dicom_render_image(
            file_path=str(file_path),
            output_path=str(output),
            auto_window=True,
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert output.exists()
        assert data["windowing"]["source"] == "auto (p5-p95)"

    @pytest.mark.asyncio
    async def test_render_image_manual_windowing(
        self, dicom_mcp_module, synthetic_dicom, tmp_path
    ):
        """Test dicom_render_image manual windowing overrides header."""
        file_path, _ = synthetic_dicom
        output = tmp_path / "manual.png"
        result = await dicom_mcp_module.dicom_render_image(
            file_path=str(file_path),
            output_path=str(output),
            window_center=2000.0,
            window_width=4000.0,
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert data["windowing"]["source"] == "manual"
        assert data["windowing"]["center"] == 2000.0
        assert data["windowing"]["width"] == 4000.0

    @pytest.mark.asyncio
    async def test_render_image_with_roi_overlays(
        self, dicom_mcp_module, synthetic_dicom, tmp_path
    ):
        """Test dicom_render_image draws ROI overlays."""
        file_path, _ = synthetic_dicom
        output = tmp_path / "overlays.png"
        result = await dicom_mcp_module.dicom_render_image(
            file_path=str(file_path),
            output_path=str(output),
            auto_window=True,
            overlay_rois=[
                {"roi": [24, 24, 16, 16], "label": "Signal", "color": "green"},
                {"roi": [0, 0, 16, 16], "label": "Noise", "color": "red"},
            ],
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert data["overlays"] == 2
        with Image.open(output) as img:
            # With overlays, image should be RGB
            assert img.mode == "RGB"

    @pytest.mark.asyncio
    async def test_render_image_no_info_bar(
        self, dicom_mcp_module, synthetic_dicom, tmp_path
    ):
        """Test dicom_render_image without info bar stays greyscale."""
        file_path, _ = synthetic_dicom
        output = tmp_path / "no_info.png"
        await dicom_mcp_module.dicom_render_image(
            file_path=str(file_path),
            output_path=str(output),
            show_info=False,
            auto_window=True,
        )
        with Image.open(output) as img:
            assert img.mode == "L"  # greyscale, no overlays or info text

    @pytest.mark.asyncio
    async def test_render_image_file_not_found(self, dicom_mcp_module, tmp_path):
        """Test dicom_render_image returns error for missing file."""
        result = await dicom_mcp_module.dicom_render_image(
            file_path=str(tmp_path / "nonexistent.dcm"),
            output_path=str(tmp_path / "out.png"),
        )
        assert "Error" in result
        assert "not found" in result

    @pytest.mark.asyncio
    async def test_render_image_creates_parent_dirs(
        self, dicom_mcp_module, synthetic_dicom, tmp_path
    ):
        """Test dicom_render_image creates output directories if needed."""
        file_path, _ = synthetic_dicom
        output = tmp_path / "sub" / "dir" / "image.png"
        await dicom_mcp_module.dicom_render_image(
            file_path=str(file_path),
            output_path=str(output),
            auto_window=True,
            show_info=False,
        )
        assert output.exists()
class TestPhilipsPrivateTagHelpers:
    """Exercise the Philips private-tag helpers against synthetic datasets."""

    def _make_philips_dataset(self):
        """Build a Philips-like dataset with three private creators in group 0x2005."""
        dataset = _make_base_dataset(
            bits_stored=16,
            Manufacturer="Philips Medical Systems",
            ManufacturerModelName="Ingenia",
            SeriesDescription="Test Series",
        )
        # Private Creator slots: 0x10 -> DD 001, 0x11 -> DD 002, 0x12 -> DD 004
        creator_slots = (
            (0x0010, "Philips MR Imaging DD 001"),
            (0x0011, "Philips MR Imaging DD 002"),
            (0x0012, "Philips MR Imaging DD 004"),
        )
        for element, creator in creator_slots:
            dataset.add_new((0x2005, element), "LO", creator)

        # Private data elements; the element number is (slot << 8) | offset.
        private_values = (
            (0x1085, "ShimValue_42"),  # DD 001, block 0x10, offset 0x85
            (0x1063, "StackID_1"),  # DD 001, block 0x10, offset 0x63
            (0x1130, "DD002_Value"),  # DD 002, block 0x11, offset 0x30
        )
        for element, value in private_values:
            dataset.add_new((0x2005, element), "LO", value)
        return dataset

    def test_resolve_known_tag(self, dicom_mcp_module):
        """DD 001 offset 0x85 resolves to (2005,1085)."""
        dataset = self._make_philips_dataset()
        tag, creator, value = dicom_mcp_module._resolve_philips_private_tag(
            dataset, dd_number=1, element_offset=0x85
        )
        assert tag == (0x2005, 0x1085)
        assert "DD 001" in creator
        assert value == "ShimValue_42"

    def test_resolve_different_dd(self, dicom_mcp_module):
        """DD 002 offset 0x30 resolves to (2005,1130)."""
        dataset = self._make_philips_dataset()
        tag, creator, value = dicom_mcp_module._resolve_philips_private_tag(
            dataset, dd_number=2, element_offset=0x30
        )
        assert tag == (0x2005, 0x1130)
        assert "DD 002" in creator
        assert value == "DD002_Value"

    def test_resolve_missing_dd(self, dicom_mcp_module):
        """An unknown DD number yields ``None`` for all three results."""
        dataset = self._make_philips_dataset()
        tag, creator, value = dicom_mcp_module._resolve_philips_private_tag(
            dataset, dd_number=99, element_offset=0x85
        )
        assert tag is None
        assert creator is None
        assert value is None

    def test_resolve_tag_no_value(self, dicom_mcp_module):
        """A valid DD with no element at the offset resolves with a ``None`` value."""
        dataset = self._make_philips_dataset()
        # DD 004 exists at slot 0x12, but no element at offset 0xFF
        tag, creator, value = dicom_mcp_module._resolve_philips_private_tag(
            dataset, dd_number=4, element_offset=0xFF
        )
        assert tag == (0x2005, 0x12FF)
        assert "DD 004" in creator
        assert value is None

    def test_resolve_second_offset_same_dd(self, dicom_mcp_module):
        """DD 001 offset 0x63 resolves to (2005,1063)."""
        dataset = self._make_philips_dataset()
        tag, _creator, value = dicom_mcp_module._resolve_philips_private_tag(
            dataset, dd_number=1, element_offset=0x63
        )
        assert tag == (0x2005, 0x1063)
        assert value == "StackID_1"

    def test_list_creators(self, dicom_mcp_module):
        """All three Private Creator entries are listed."""
        dataset = self._make_philips_dataset()
        creators = dicom_mcp_module._list_philips_private_creators(dataset)
        assert len(creators) == 3
        listed_numbers = [entry["dd_number"] for entry in creators]
        for expected_number in (1, 2, 4):
            assert expected_number in listed_numbers

    def test_list_creators_empty(self, dicom_mcp_module):
        """A dataset with no private tags lists no creators."""
        empty_dataset = pydicom.Dataset()
        creators = dicom_mcp_module._list_philips_private_creators(empty_dataset)
        assert creators == []

    def test_list_creators_slots(self, dicom_mcp_module):
        """Each listed creator reports the slot it was stored in."""
        dataset = self._make_philips_dataset()
        creators = dicom_mcp_module._list_philips_private_creators(dataset)
        slot_by_dd = {entry["dd_number"]: entry["slot"] for entry in creators}
        for dd_number, expected_slot in ((1, 0x10), (2, 0x11), (4, 0x12)):
            assert slot_by_dd[dd_number] == expected_slot

    def test_resolve_custom_group(self, dicom_mcp_module):
        """Private tags in a non-default group resolve via ``private_group``."""
        dataset = pydicom.Dataset()
        # Use group 0x2001 instead of 0x2005
        custom_group = 0x2001
        dataset.add_new((custom_group, 0x0010), "LO", "Philips MR Imaging DD 005")
        dataset.add_new((custom_group, 0x1042), "LO", "CustomGroupValue")
        tag, _creator, value = dicom_mcp_module._resolve_philips_private_tag(
            dataset, dd_number=5, element_offset=0x42, private_group=0x2001
        )
        assert tag == (0x2001, 0x1042)
        assert value == "CustomGroupValue"
class TestPhilipsPrivateToolWithSyntheticDicom:
    """Functional checks of dicom_query_philips_private on a synthetic file."""

    @pytest.fixture
    def philips_dicom(self, tmp_path):
        """Save a synthetic Philips MR file with private tags; return its path."""
        dataset = _make_base_dataset(
            bits_stored=16,
            Modality="MR",
            Manufacturer="Philips Medical Systems",
            ManufacturerModelName="Ingenia",
            SeriesDescription="Philips Test Series",
            SeriesNumber=1,
            InstanceNumber=1,
            FrameOfReferenceUID=generate_uid(),
        )
        # Private Creator elements first, then the private data elements.
        private_elements = (
            ((0x2005, 0x0010), "Philips MR Imaging DD 001"),
            ((0x2005, 0x0011), "Philips MR Imaging DD 002"),
            ((0x2005, 0x1085), "ShimCalcValue"),
            ((0x2005, 0x1130), "DD002_TestVal"),
        )
        for tag, value in private_elements:
            dataset.add_new(tag, "LO", value)

        target = tmp_path / "philips_test.dcm"
        _save_dicom(dataset, target)
        return target

    @pytest.mark.asyncio
    async def test_list_creators_tool(self, dicom_mcp_module, philips_dicom):
        """list_creators=True with JSON output reports DD 001 and DD 002."""
        raw = await dicom_mcp_module.dicom_query_philips_private(
            file_path=str(philips_dicom),
            list_creators=True,
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        creators = json.loads(raw)["creators"]

        assert len(creators) == 2
        dd_numbers = [entry["dd_number"] for entry in creators]
        for expected_dd in (1, 2):
            assert expected_dd in dd_numbers

    @pytest.mark.asyncio
    async def test_resolve_tag_tool(self, dicom_mcp_module, philips_dicom):
        """Resolving DD 001 offset 0x85 returns tag, value and creator."""
        raw = await dicom_mcp_module.dicom_query_philips_private(
            file_path=str(philips_dicom),
            dd_number=1,
            element_offset=0x85,
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        payload = json.loads(raw)

        assert payload["resolution"]["resolved_tag"] == "(2005,1085)"
        assert payload["resolution"]["value"] == "ShimCalcValue"
        assert "DD 001" in payload["resolution"]["creator_string"]

    @pytest.mark.asyncio
    async def test_resolve_missing_dd_tool(self, dicom_mcp_module, philips_dicom):
        """A DD number absent from the file yields an error mentioning it."""
        message = await dicom_mcp_module.dicom_query_philips_private(
            file_path=str(philips_dicom),
            dd_number=99,
            element_offset=0x85,
        )

        for fragment in ("Error", "DD 099"):
            assert fragment in message

    @pytest.mark.asyncio
    async def test_resolve_tag_markdown(self, dicom_mcp_module, philips_dicom):
        """Markdown lookup output carries the heading, tag and value."""
        report = await dicom_mcp_module.dicom_query_philips_private(
            file_path=str(philips_dicom),
            dd_number=1,
            element_offset=0x85,
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )

        for fragment in ("Philips Private Tag Lookup", "(2005,1085)", "ShimCalcValue"):
            assert fragment in report

    @pytest.mark.asyncio
    async def test_list_creators_markdown(self, dicom_mcp_module, philips_dicom):
        """Markdown creator listing names the heading and both DD blocks."""
        report = await dicom_mcp_module.dicom_query_philips_private(
            file_path=str(philips_dicom),
            list_creators=True,
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )

        for fragment in ("Philips Private Creators", "DD 001", "DD 002"):
            assert fragment in report

    @pytest.mark.asyncio
    async def test_no_args_error(self, dicom_mcp_module, philips_dicom):
        """Calling with only file_path returns an error naming list_creators."""
        message = await dicom_mcp_module.dicom_query_philips_private(
            file_path=str(philips_dicom),
        )

        for fragment in ("Error", "list_creators"):
            assert fragment in message

    @pytest.mark.asyncio
    async def test_file_not_found(self, dicom_mcp_module, tmp_path):
        """A path that was never created is reported as not found."""
        missing = tmp_path / "nonexistent.dcm"
        message = await dicom_mcp_module.dicom_query_philips_private(
            file_path=str(missing),
            list_creators=True,
        )

        for fragment in ("Error", "not found"):
            assert fragment in message
class TestGetMetadataPhilipsPrivate:
    """Checks of dicom_get_metadata when philips_private_tags is supplied."""

    @pytest.fixture
    def philips_dicom(self, tmp_path):
        """Save a synthetic Philips MR file with one private tag; return its path."""
        dataset = _make_base_dataset(
            bits_stored=16,
            Modality="MR",
            Manufacturer="Philips Medical Systems",
            SeriesDescription="Test",
            SeriesNumber=1,
            PatientName="TestPatient",
            PatientID="12345",
        )
        # One Private Creator element followed by one private data element.
        dataset.add_new((0x2005, 0x0010), "LO", "Philips MR Imaging DD 001")
        dataset.add_new((0x2005, 0x1085), "LO", "ShimTestValue")

        target = tmp_path / "philips_meta_test.dcm"
        _save_dicom(dataset, target)
        return target

    @pytest.mark.asyncio
    async def test_metadata_with_philips_private(self, dicom_mcp_module, philips_dicom):
        """A resolvable private tag appears under metadata.philips_private."""
        raw = await dicom_mcp_module.dicom_get_metadata(
            file_path=str(philips_dicom),
            tag_groups=["manufacturer"],
            philips_private_tags=[{"dd_number": 1, "element_offset": 0x85}],
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        metadata = json.loads(raw)["metadata"]

        assert "philips_private" in metadata
        resolved = metadata["philips_private"]
        # Entries are keyed as DD001_0x85.
        assert "DD001_0x85" in resolved
        assert resolved["DD001_0x85"]["value"] == "ShimTestValue"
        assert resolved["DD001_0x85"]["resolved_tag"] == "(2005,1085)"

    @pytest.mark.asyncio
    async def test_metadata_philips_private_missing_dd(
        self, dicom_mcp_module, philips_dicom
    ):
        """An unknown DD number produces an entry that carries an error key."""
        raw = await dicom_mcp_module.dicom_get_metadata(
            file_path=str(philips_dicom),
            tag_groups=["manufacturer"],
            philips_private_tags=[{"dd_number": 99, "element_offset": 0x01}],
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        metadata = json.loads(raw)["metadata"]

        assert "philips_private" in metadata
        resolved = metadata["philips_private"]
        assert "DD099_0x01" in resolved
        assert "error" in resolved["DD099_0x01"]

    @pytest.mark.asyncio
    async def test_metadata_philips_private_markdown(
        self, dicom_mcp_module, philips_dicom
    ):
        """Markdown output includes the private section, tag and value."""
        report = await dicom_mcp_module.dicom_get_metadata(
            file_path=str(philips_dicom),
            tag_groups=["manufacturer"],
            philips_private_tags=[{"dd_number": 1, "element_offset": 0x85}],
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )

        for fragment in ("Philips Private", "(2005,1085)", "ShimTestValue"):
            assert fragment in report
class TestPIIFiltering:
    """Checks of metadata output while DICOM_MCP_PII_FILTER is set to true."""

    @pytest.fixture
    def pii_dicom(self, tmp_path):
        """Save the shared PII dataset (plus a StudyDate); return its path."""
        dataset = _make_pii_dataset()
        dataset.StudyDate = "20240101"
        target = tmp_path / "pii_test.dcm"
        _save_dicom(dataset, target)
        return target

    @pytest.fixture(autouse=True)
    def enable_pii_filter(self, monkeypatch):
        """Set DICOM_MCP_PII_FILTER=true and reload the config for each test.

        After the test the variable is removed and the config is reloaded
        again.
        """
        from importlib import reload

        import dicom_mcp.config as config_module

        env_name = "DICOM_MCP_PII_FILTER"
        monkeypatch.setenv(env_name, "true")
        reload(config_module)
        yield
        monkeypatch.delenv(env_name, raising=False)
        reload(config_module)

    @pytest.mark.asyncio
    async def test_get_metadata_redacts_patient_info(self, dicom_mcp_module, pii_dicom):
        """Each patient_info tag must come back as the redaction marker."""
        raw = await dicom_mcp_module.dicom_get_metadata(
            file_path=str(pii_dicom),
            tag_groups=["patient_info"],
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        patient = json.loads(raw)["metadata"]["patient_info"]

        for tag in ("PatientName", "PatientID", "PatientBirthDate", "PatientSex"):
            assert patient[tag] == "[REDACTED]"

    @pytest.mark.asyncio
    async def test_get_metadata_does_not_redact_non_pii(
        self, dicom_mcp_module, pii_dicom
    ):
        """Manufacturer tags must stay readable while the filter is on."""
        raw = await dicom_mcp_module.dicom_get_metadata(
            file_path=str(pii_dicom),
            tag_groups=["manufacturer"],
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        manufacturer = json.loads(raw)["metadata"]["manufacturer"]

        assert manufacturer["Manufacturer"] == "SIEMENS"
        assert "[REDACTED]" not in json.dumps(manufacturer)

    @pytest.mark.asyncio
    async def test_get_metadata_redacts_in_markdown(self, dicom_mcp_module, pii_dicom):
        """Markdown output shows the marker and none of the raw patient values."""
        report = await dicom_mcp_module.dicom_get_metadata(
            file_path=str(pii_dicom),
            tag_groups=["patient_info"],
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )

        assert "[REDACTED]" in report
        for raw_value in ("Doe^Jane", "PII-TEST-001"):
            assert raw_value not in report
class TestPIIFilteringDisabled:
    """Checks of metadata output while DICOM_MCP_PII_FILTER is unset."""

    @pytest.fixture
    def pii_dicom(self, tmp_path):
        """Save the shared PII dataset to disk; return its path."""
        target = tmp_path / "pii_disabled_test.dcm"
        _save_dicom(_make_pii_dataset(), target)
        return target

    @pytest.fixture(autouse=True)
    def disable_pii_filter(self, monkeypatch):
        """Remove DICOM_MCP_PII_FILTER and reload the config for each test.

        The config is reloaded once more after the test has run.
        """
        from importlib import reload

        import dicom_mcp.config as config_module

        monkeypatch.delenv("DICOM_MCP_PII_FILTER", raising=False)
        reload(config_module)
        yield
        reload(config_module)

    @pytest.mark.asyncio
    async def test_get_metadata_shows_patient_data(self, dicom_mcp_module, pii_dicom):
        """The raw patient values must be returned unmodified."""
        raw = await dicom_mcp_module.dicom_get_metadata(
            file_path=str(pii_dicom),
            tag_groups=["patient_info"],
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        patient = json.loads(raw)["metadata"]["patient_info"]

        expected_values = {
            "PatientName": "Doe^Jane",
            "PatientID": "PII-TEST-001",
            "PatientBirthDate": "19850315",
            "PatientSex": "F",
        }
        for tag, expected in expected_values.items():
            assert patient[tag] == expected

    @pytest.mark.asyncio
    async def test_no_redaction_markers_present(self, dicom_mcp_module, pii_dicom):
        """The redaction marker must not occur anywhere in the JSON output."""
        raw = await dicom_mcp_module.dicom_get_metadata(
            file_path=str(pii_dicom),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )

        assert "[REDACTED]" not in raw
class TestDicomDumpTree:
    """Tests for the dicom_dump_tree tool."""

    @pytest.fixture
    def tree_dicom(self, tmp_path):
        """Save a synthetic MR file with nested sequences and a private tag.

        Layout: ReferencedStudySequence holds one item, which in turn holds
        a PurposeOfReferenceCodeSequence with a single code item. A private
        creator (0009,0010) and one private element (0009,1001) are added
        on top.

        Returns:
            Path of the saved ``tree_test.dcm`` file.
        """
        ds = _make_base_dataset(
            Modality="MR",
            PatientName="TreeTest^Patient",
            PatientID="TREE-001",
            SeriesDescription="Tree Test Series",
        )

        # Innermost level: a coded entry built from standard DICOM tags.
        code_item = pydicom.Dataset()
        code_item.CodeValue = "12345"
        code_item.CodingSchemeDesignator = "DCM"
        code_item.CodeMeaning = "Test Code"

        # Middle level: a reference item that nests the coded entry.
        ref_item = pydicom.Dataset()
        ref_item.ReferencedSOPClassUID = MRImageStorage
        ref_item.ReferencedSOPInstanceUID = generate_uid()
        ref_item.PurposeOfReferenceCodeSequence = DicomSequence([code_item])

        ds.ReferencedStudySequence = DicomSequence([ref_item])

        # Private creator plus one private value, used by the show_private tests.
        ds.add_new((0x0009, 0x0010), "LO", "TEST PRIVATE")
        ds.add_new((0x0009, 0x1001), "LO", "PrivateValue42")

        file_path = tmp_path / "tree_test.dcm"
        _save_dicom(ds, file_path)
        return file_path

    @pytest.mark.asyncio
    async def test_tree_markdown_output(self, dicom_mcp_module, tree_dicom):
        """Markdown output contains the heading and the expected elements."""
        result = await dicom_mcp_module.dicom_dump_tree(
            file_path=str(tree_dicom),
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )
        assert "# DICOM Tree:" in result
        assert "PatientName" in result
        assert "TreeTest^Patient" in result
        assert "SeriesDescription" in result

    @pytest.mark.asyncio
    async def test_tree_json_output(self, dicom_mcp_module, tree_dicom):
        """JSON output has a non-empty ``tree`` list and the file name."""
        result = await dicom_mcp_module.dicom_dump_tree(
            file_path=str(tree_dicom),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        assert "tree" in data
        assert isinstance(data["tree"], list)
        assert data["file"] == "tree_test.dcm"
        assert len(data["tree"]) > 0

    @pytest.mark.asyncio
    async def test_tree_includes_sequences(self, dicom_mcp_module, tree_dicom):
        """Sequence elements appear in the tree with their items expanded."""
        result = await dicom_mcp_module.dicom_dump_tree(
            file_path=str(tree_dicom),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        # Collect the top-level sequence (VR "SQ") nodes.
        seq_nodes = [n for n in data["tree"] if n["vr"] == "SQ"]
        assert len(seq_nodes) > 0
        # The first sequence must carry at least one expanded item.
        assert "items" in seq_nodes[0]
        assert len(seq_nodes[0]["items"]) > 0

    @pytest.mark.asyncio
    async def test_tree_max_depth(self, dicom_mcp_module, tree_dicom):
        """max_depth=0 lists sequences but does not expand their items."""
        result = await dicom_mcp_module.dicom_dump_tree(
            file_path=str(tree_dicom),
            max_depth=0,
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        data = json.loads(result)
        seq_nodes = [n for n in data["tree"] if n["vr"] == "SQ"]
        # Guard against a vacuous pass: the fixture has a top-level sequence,
        # so at least one SQ node is expected to be listed.
        assert seq_nodes, "expected at least one SQ node in the tree"
        for node in seq_nodes:
            assert "items" not in node

    @pytest.mark.asyncio
    async def test_tree_hide_private(self, dicom_mcp_module, tree_dicom):
        """show_private=False leaves the private value out of the output."""
        result = await dicom_mcp_module.dicom_dump_tree(
            file_path=str(tree_dicom),
            show_private=False,
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )
        assert "PrivateValue42" not in result
        # Standard tags must still be present.
        assert "PatientName" in result

    @pytest.mark.asyncio
    async def test_tree_show_private(self, dicom_mcp_module, tree_dicom):
        """show_private=True includes the private value in the output."""
        result = await dicom_mcp_module.dicom_dump_tree(
            file_path=str(tree_dicom),
            show_private=True,
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )
        assert "PrivateValue42" in result

    @pytest.mark.asyncio
    async def test_tree_file_not_found(self, dicom_mcp_module):
        """A non-existent file path yields an error message."""
        result = await dicom_mcp_module.dicom_dump_tree(
            file_path="/nonexistent/file.dcm",
        )
        assert "Error" in result
class TestDicomCompareUids:
    """Checks of the dicom_compare_uids tool on two temporary directories."""

    def _make_dicom_file(self, tmp_path, subdir, filename, series_uid, study_uid=None):
        """Save a minimal MR file with the given UIDs under ``tmp_path/subdir``.

        Returns the path of the saved file.
        """
        dataset = _make_base_dataset(
            series_instance_uid=series_uid,
            study_instance_uid=study_uid,
            Modality="MR",
            FrameOfReferenceUID=generate_uid(),
        )
        target = tmp_path / subdir / filename
        _save_dicom(dataset, target)
        return target

    @pytest.mark.asyncio
    async def test_identical_uids(self, dicom_mcp_module, tmp_path):
        """One shared SeriesInstanceUID in both directories is a full match."""
        series_uid = "1.2.3.4.5.6.7.8.9"
        self._make_dicom_file(tmp_path, "dir1", "a.dcm", series_uid)
        self._make_dicom_file(tmp_path, "dir2", "b.dcm", series_uid)

        raw = await dicom_mcp_module.dicom_compare_uids(
            directory1=str(tmp_path / "dir1"),
            directory2=str(tmp_path / "dir2"),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        summary = json.loads(raw)

        assert summary["match"] is True
        assert summary["shared"] == 1
        assert summary["only_in_directory1"] == 0
        assert summary["only_in_directory2"] == 0

    @pytest.mark.asyncio
    async def test_disjoint_uids(self, dicom_mcp_module, tmp_path):
        """Different UIDs on each side are reported as one-sided entries."""
        self._make_dicom_file(tmp_path, "dir1", "a.dcm", "1.2.3.4.5")
        self._make_dicom_file(tmp_path, "dir2", "b.dcm", "9.8.7.6.5")

        raw = await dicom_mcp_module.dicom_compare_uids(
            directory1=str(tmp_path / "dir1"),
            directory2=str(tmp_path / "dir2"),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        summary = json.loads(raw)

        assert summary["match"] is False
        assert summary["shared"] == 0
        assert summary["only_in_directory1"] == 1
        assert summary["only_in_directory2"] == 1

    @pytest.mark.asyncio
    async def test_partial_overlap(self, dicom_mcp_module, tmp_path):
        """One shared UID plus one unique UID per side is not a match."""
        common_uid = "1.2.3.4.5"
        layout = (
            ("dir1", "a.dcm", common_uid),
            ("dir1", "b.dcm", "1.2.3.4.6"),
            ("dir2", "c.dcm", common_uid),
            ("dir2", "d.dcm", "9.8.7.6.5"),
        )
        for subdir, filename, series_uid in layout:
            self._make_dicom_file(tmp_path, subdir, filename, series_uid)

        raw = await dicom_mcp_module.dicom_compare_uids(
            directory1=str(tmp_path / "dir1"),
            directory2=str(tmp_path / "dir2"),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        summary = json.loads(raw)

        assert summary["match"] is False
        assert summary["shared"] == 1
        assert summary["only_in_directory1"] == 1
        assert summary["only_in_directory2"] == 1

    @pytest.mark.asyncio
    async def test_custom_compare_tag(self, dicom_mcp_module, tmp_path):
        """compare_tag="StudyInstanceUID" matches on the study UID."""
        common_study = "1.2.3.100"
        self._make_dicom_file(tmp_path, "dir1", "a.dcm", "1.2.3.101", common_study)
        self._make_dicom_file(tmp_path, "dir2", "b.dcm", "1.2.3.102", common_study)

        raw = await dicom_mcp_module.dicom_compare_uids(
            directory1=str(tmp_path / "dir1"),
            directory2=str(tmp_path / "dir2"),
            compare_tag="StudyInstanceUID",
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        summary = json.loads(raw)

        assert summary["match"] is True
        assert summary["shared"] == 1

    @pytest.mark.asyncio
    async def test_markdown_output(self, dicom_mcp_module, tmp_path):
        """Markdown output carries the report heading and the MATCH text."""
        for subdir, filename in (("dir1", "a.dcm"), ("dir2", "b.dcm")):
            self._make_dicom_file(tmp_path, subdir, filename, "1.2.3")

        report = await dicom_mcp_module.dicom_compare_uids(
            directory1=str(tmp_path / "dir1"),
            directory2=str(tmp_path / "dir2"),
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )

        for fragment in ("# UID Comparison", "MATCH"):
            assert fragment in report

    @pytest.mark.asyncio
    async def test_directory_not_found(self, dicom_mcp_module, tmp_path):
        """A non-existent first directory yields an error message."""
        message = await dicom_mcp_module.dicom_compare_uids(
            directory1="/nonexistent/dir1",
            directory2=str(tmp_path),
        )

        assert "Error" in message

    @pytest.mark.asyncio
    async def test_empty_directory(self, dicom_mcp_module, tmp_path):
        """Two existing but empty directories report that no files were found."""
        first, second = tmp_path / "dir1", tmp_path / "dir2"
        first.mkdir()
        second.mkdir()

        message = await dicom_mcp_module.dicom_compare_uids(
            directory1=str(first),
            directory2=str(second),
        )

        assert "No DICOM files" in message
class TestDicomVerifySegmentations:
    """Checks of the dicom_verify_segmentations tool."""

    def _make_source_dicom(self, dir_path, filename, sop_uid):
        """Save a minimal MR file with the given SOPInstanceUID; return its path."""
        target = dir_path / filename
        _save_dicom(
            _make_base_dataset(sop_instance_uid=sop_uid, Modality="MR"),
            target,
        )
        return target

    def _make_seg_dicom(self, dir_path, filename, ref_sop_uids):
        """Save a minimal SEG file whose SourceImageSequence lists the UIDs.

        Returns the path of the saved file.
        """

        def as_reference(uid):
            # One SourceImageSequence item pointing at an MR source instance.
            item = pydicom.Dataset()
            item.ReferencedSOPClassUID = MRImageStorage
            item.ReferencedSOPInstanceUID = UID(uid)
            return item

        dataset = _make_base_dataset(
            sop_class=SegmentationStorage,
            SeriesDescription="Segmentation",
            Modality="SEG",
        )
        dataset.SourceImageSequence = DicomSequence(
            [as_reference(uid) for uid in ref_sop_uids]
        )

        target = dir_path / filename
        _save_dicom(dataset, target)
        return target

    @pytest.mark.asyncio
    async def test_all_matched(self, dicom_mcp_module, tmp_path):
        """A segmentation referencing an existing source is fully matched."""
        source_uid = "1.2.3.4.5.100"
        folder = tmp_path / "seg_test"
        self._make_source_dicom(folder, "source.dcm", source_uid)
        self._make_seg_dicom(folder, "seg.dcm", [source_uid])

        raw = await dicom_mcp_module.dicom_verify_segmentations(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        summary = json.loads(raw)

        assert summary["all_matched"] is True
        assert summary["matched_references"] == 1
        assert summary["unmatched_references"] == 0

    @pytest.mark.asyncio
    async def test_unmatched_reference(self, dicom_mcp_module, tmp_path):
        """A reference to a UID with no source file is counted as unmatched."""
        folder = tmp_path / "seg_test"
        self._make_source_dicom(folder, "source.dcm", "1.2.3.200")
        self._make_seg_dicom(folder, "seg.dcm", ["1.2.3.999"])

        raw = await dicom_mcp_module.dicom_verify_segmentations(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        summary = json.loads(raw)

        assert summary["all_matched"] is False
        assert summary["unmatched_references"] == 1

    @pytest.mark.asyncio
    async def test_multiple_references(self, dicom_mcp_module, tmp_path):
        """Two references to two existing sources are both matched."""
        source_uids = ["1.2.3.4.5.201", "1.2.3.4.5.202"]
        folder = tmp_path / "seg_test"
        for index, uid in enumerate(source_uids, 1):
            self._make_source_dicom(folder, f"source{index}.dcm", uid)
        self._make_seg_dicom(folder, "seg.dcm", source_uids)

        raw = await dicom_mcp_module.dicom_verify_segmentations(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        summary = json.loads(raw)

        assert summary["all_matched"] is True
        assert summary["matched_references"] == 2

    @pytest.mark.asyncio
    async def test_no_segmentations(self, dicom_mcp_module, tmp_path):
        """A directory holding only a source file reports no segmentations."""
        folder = tmp_path / "seg_test"
        self._make_source_dicom(folder, "source.dcm", "1.2.3.4.5")

        message = await dicom_mcp_module.dicom_verify_segmentations(
            directory=str(folder),
        )

        assert "No segmentation files" in message

    @pytest.mark.asyncio
    async def test_markdown_output(self, dicom_mcp_module, tmp_path):
        """Markdown output has the heading and the all-matched banner."""
        folder = tmp_path / "seg_test"
        self._make_source_dicom(folder, "source.dcm", "1.2.3.400")
        self._make_seg_dicom(folder, "seg.dcm", ["1.2.3.400"])

        report = await dicom_mcp_module.dicom_verify_segmentations(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )

        for fragment in ("# Segmentation Verification", "ALL MATCHED"):
            assert fragment in report

    @pytest.mark.asyncio
    async def test_markdown_unmatched_details(self, dicom_mcp_module, tmp_path):
        """Markdown output flags a dangling reference."""
        folder = tmp_path / "seg_test"
        self._make_source_dicom(folder, "source.dcm", "1.2.3.200")
        self._make_seg_dicom(folder, "seg.dcm", ["1.2.3.999"])

        report = await dicom_mcp_module.dicom_verify_segmentations(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )

        for fragment in ("UNMATCHED REFERENCES FOUND", "NOT FOUND"):
            assert fragment in report

    @pytest.mark.asyncio
    async def test_directory_not_found(self, dicom_mcp_module):
        """A non-existent directory yields an error message."""
        message = await dicom_mcp_module.dicom_verify_segmentations(
            directory="/nonexistent/dir",
        )

        assert "Error" in message
class TestDicomAnalyzeTi:
    """Checks of the dicom_analyze_ti tool on synthetic MOLLI series."""

    def _make_molli_file(
        self,
        dir_path,
        filename,
        instance_num,
        inversion_time=None,
        series_num=601,
        series_desc="LMS MOLLI",
        manufacturer="SIEMENS",
        philips_private_ti=None,
    ):
        """Save one synthetic MOLLI file under ``dir_path``; return its path.

        ``inversion_time`` is written to the standard InversionTime tag when
        given. ``philips_private_ti`` is written to private tag (2005,1572)
        only when the manufacturer string contains "philips" (any case).
        """
        dataset = _make_base_dataset(
            Modality="MR",
            Manufacturer=manufacturer,
            SeriesNumber=series_num,
            SeriesDescription=series_desc,
            InstanceNumber=instance_num,
            ScanningSequence="GR",
            SequenceVariant="MP",
            MRAcquisitionType="2D",
            RepetitionTime=2.42,
            EchoTime=1.05,
            FlipAngle=35,
        )
        if inversion_time is not None:
            dataset.InversionTime = inversion_time

        wants_private_ti = (
            philips_private_ti is not None and "philips" in manufacturer.lower()
        )
        if wants_private_ti:
            # DD 006 private creator in slot 0x15 of group 2005 ...
            dataset.add_new((0x2005, 0x0015), "LO", "Philips MR Imaging DD 006")
            # ... and the TI value at offset 0x72 inside that block.
            dataset.add_new((0x2005, 0x1572), "FL", philips_private_ti)

        target = dir_path / filename
        _save_dicom(dataset, target)
        return target

    def _write_ti_series(self, dir_path, inversion_times):
        """Save one file per TI, named img01.dcm, img02.dcm, ... in order."""
        for number, ti in enumerate(inversion_times, 1):
            self._make_molli_file(
                dir_path, f"img{number:02d}.dcm", number, inversion_time=ti
            )

    @pytest.mark.asyncio
    async def test_siemens_standard_ti(self, dicom_mcp_module, tmp_path):
        """TIs stored in the standard InversionTime tag are picked up."""
        folder = tmp_path / "molli"
        self._write_ti_series(folder, [100.0, 200.0, 350.0, 500.0, 750.0])

        raw = await dicom_mcp_module.dicom_analyze_ti(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        report = json.loads(raw)

        assert report["t1_mapping_files"] == 5
        assert report["series_count"] == 1
        statistics = report["series"][0]["statistics"]
        assert statistics["non_zero_ti_count"] == 5
        assert statistics["ti_min"] == 100.0
        assert statistics["ti_max"] == 750.0

    @pytest.mark.asyncio
    async def test_philips_private_ti(self, dicom_mcp_module, tmp_path):
        """TIs stored only in the Philips private tag are picked up."""
        folder = tmp_path / "molli_philips"
        private_tis = [78.0, 159.0, 856.0, 1034.0, 1656.0]
        for number, ti in enumerate(private_tis, 1):
            self._make_molli_file(
                folder,
                f"img{number:02d}.dcm",
                number,
                manufacturer="Philips Medical Systems",
                philips_private_ti=ti,
            )

        raw = await dicom_mcp_module.dicom_analyze_ti(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        report = json.loads(raw)

        assert report["t1_mapping_files"] == 5
        statistics = report["series"][0]["statistics"]
        assert statistics["non_zero_ti_count"] == 5
        assert statistics["ti_min"] == 78.0
        assert statistics["ti_max"] == 1656.0

    @pytest.mark.asyncio
    async def test_zero_ti_filtering(self, dicom_mcp_module, tmp_path):
        """A zero TI is counted separately from the non-zero ones."""
        folder = tmp_path / "molli"
        self._make_molli_file(folder, "img01.dcm", 1, inversion_time=100.0)
        self._make_molli_file(folder, "img02.dcm", 2, inversion_time=200.0)
        self._make_molli_file(folder, "img03.dcm", 3, inversion_time=0.0)

        raw = await dicom_mcp_module.dicom_analyze_ti(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        statistics = json.loads(raw)["series"][0]["statistics"]

        assert statistics["non_zero_ti_count"] == 2
        assert statistics["zero_ti_count"] == 1

    @pytest.mark.asyncio
    async def test_gap_threshold_exceeded(self, dicom_mcp_module, tmp_path):
        """A large TI gap sets the gap_threshold_exceeded flag."""
        folder = tmp_path / "molli"
        # 200 -> 3500 is a gap of 3300; the test relies on the tool's default
        # threshold being lower (2500 according to the original note).
        self._write_ti_series(folder, [100.0, 200.0, 3500.0])

        raw = await dicom_mcp_module.dicom_analyze_ti(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        statistics = json.loads(raw)["series"][0]["statistics"]

        assert statistics["gap_threshold_exceeded"] is True
        assert statistics["max_gap"] == 3300.0

    @pytest.mark.asyncio
    async def test_gap_threshold_not_exceeded(self, dicom_mcp_module, tmp_path):
        """Small TI gaps leave the gap_threshold_exceeded flag unset."""
        folder = tmp_path / "molli"
        self._write_ti_series(folder, [100.0, 200.0, 350.0])

        raw = await dicom_mcp_module.dicom_analyze_ti(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        statistics = json.loads(raw)["series"][0]["statistics"]

        assert statistics["gap_threshold_exceeded"] is False

    @pytest.mark.asyncio
    async def test_custom_gap_threshold(self, dicom_mcp_module, tmp_path):
        """A caller-supplied gap_threshold of 200 is exceeded by a gap of 300."""
        folder = tmp_path / "molli"
        self._write_ti_series(folder, [100.0, 200.0, 500.0])

        raw = await dicom_mcp_module.dicom_analyze_ti(
            directory=str(folder),
            gap_threshold=200.0,
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )
        statistics = json.loads(raw)["series"][0]["statistics"]

        assert statistics["gap_threshold_exceeded"] is True

    @pytest.mark.asyncio
    async def test_no_t1_mapping_files(self, dicom_mcp_module, tmp_path):
        """A directory with only an "LMS IDEAL" series reports no T1 mapping."""
        folder = tmp_path / "not_molli"
        other_series = _make_base_dataset(
            Modality="MR",
            Manufacturer="SIEMENS",
            SeriesDescription="LMS IDEAL",
            ScanningSequence="GR",
            SequenceVariant="SP",
        )
        _save_dicom(other_series, folder / "ideal.dcm")

        message = await dicom_mcp_module.dicom_analyze_ti(
            directory=str(folder),
        )

        assert "No T1 mapping" in message

    @pytest.mark.asyncio
    async def test_markdown_output(self, dicom_mcp_module, tmp_path):
        """Markdown output has the heading, series description and a TI value."""
        folder = tmp_path / "molli"
        self._make_molli_file(folder, "img01.dcm", 1, inversion_time=100.0)
        self._make_molli_file(folder, "img02.dcm", 2, inversion_time=200.0)

        report = await dicom_mcp_module.dicom_analyze_ti(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.MARKDOWN,
        )

        for fragment in ("# Inversion Time Analysis", "LMS MOLLI", "100.0"):
            assert fragment in report

    @pytest.mark.asyncio
    async def test_multiple_series(self, dicom_mcp_module, tmp_path):
        """Two series numbers in one directory are reported as two series."""
        folder = tmp_path / "molli"
        series_layout = (
            ("s601_01.dcm", 601, "LMS MOLLI"),
            ("s801_01.dcm", 801, "LMS MOLLI simulated"),
        )
        for filename, number, description in series_layout:
            self._make_molli_file(
                folder,
                filename,
                1,
                inversion_time=100.0,
                series_num=number,
                series_desc=description,
            )

        raw = await dicom_mcp_module.dicom_analyze_ti(
            directory=str(folder),
            response_format=dicom_mcp_module.ResponseFormat.JSON,
        )

        assert json.loads(raw)["series_count"] == 2

    @pytest.mark.asyncio
    async def test_directory_not_found(self, dicom_mcp_module):
        """A non-existent directory yields an error message."""
        message = await dicom_mcp_module.dicom_analyze_ti(
            directory="/nonexistent/dir",
        )

        assert "Error" in message
if __name__ == "__main__":
    # pytest.main() returns the run's exit code; pass it on so that running
    # this file as a script reports failures to the calling shell or CI job.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))