548 lines
18 KiB
Python
548 lines
18 KiB
Python
"""Tests for mcp-image-gen server — all ComfyUI HTTP calls mocked via respx."""
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import httpx
|
|
import pytest
|
|
import respx
|
|
|
|
# Import the server module (sys.path set by conftest.py)
|
|
import server
|
|
from server import (
|
|
ComfyUIClient,
|
|
build_flux_workflow,
|
|
generate_image,
|
|
get_generation_status,
|
|
get_output_directory,
|
|
list_available_models,
|
|
)
|
|
|
|
COMFYUI_BASE = "http://test-comfyui:8188"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# build_flux_workflow — pure function, no mocking needed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_build_flux_workflow_structure():
|
|
"""Verify build_flux_workflow returns a dict with correct node types."""
|
|
wf = build_flux_workflow(
|
|
prompt="a red cat",
|
|
neg_prompt="ugly",
|
|
width=512,
|
|
height=768,
|
|
steps=8,
|
|
seed=42,
|
|
model="flux1-schnell.safetensors",
|
|
)
|
|
assert wf["6"]["class_type"] == "CLIPTextEncode"
|
|
assert wf["8"]["class_type"] == "VAEDecode"
|
|
assert wf["9"]["class_type"] == "SaveImage"
|
|
assert wf["13"]["class_type"] == "KSampler"
|
|
assert wf["27"]["class_type"] == "EmptySD3LatentImage"
|
|
assert wf["30"]["class_type"] == "CheckpointLoaderSimple"
|
|
assert wf["33"]["class_type"] == "CLIPTextEncode"
|
|
|
|
|
|
def test_build_flux_workflow_params_injected():
|
|
"""Verify all parameters are injected into correct nodes."""
|
|
wf = build_flux_workflow(
|
|
prompt="a blue whale",
|
|
neg_prompt="cartoonish",
|
|
width=512,
|
|
height=768,
|
|
steps=8,
|
|
seed=12345,
|
|
model="sdxl.safetensors",
|
|
)
|
|
assert wf["6"]["inputs"]["text"] == "a blue whale"
|
|
assert wf["33"]["inputs"]["text"] == "cartoonish"
|
|
assert wf["27"]["inputs"]["width"] == 512
|
|
assert wf["27"]["inputs"]["height"] == 768
|
|
assert wf["13"]["inputs"]["steps"] == 8
|
|
assert wf["13"]["inputs"]["seed"] == 12345
|
|
assert wf["30"]["inputs"]["ckpt_name"] == "sdxl.safetensors"
|
|
|
|
|
|
def test_negative_prompt_included():
|
|
"""Verify negative prompt appears in workflow node 33 when provided."""
|
|
wf = build_flux_workflow(
|
|
prompt="forest",
|
|
neg_prompt="blurry, dark",
|
|
width=1024,
|
|
height=1024,
|
|
steps=4,
|
|
seed=1,
|
|
model="flux1-schnell.safetensors",
|
|
)
|
|
assert wf["33"]["inputs"]["text"] == "blurry, dark"
|
|
|
|
|
|
def test_random_seed_generated():
|
|
"""seed=-1 generates a random seed each call."""
|
|
wf1 = build_flux_workflow("cat", "", 512, 512, 4, -1, "flux1-schnell.safetensors")
|
|
wf2 = build_flux_workflow("cat", "", 512, 512, 4, -1, "flux1-schnell.safetensors")
|
|
seed1 = wf1["_meta"]["actual_seed"]
|
|
seed2 = wf2["_meta"]["actual_seed"]
|
|
# Both are valid integers
|
|
assert isinstance(seed1, int)
|
|
assert 0 <= seed1 < 2**32
|
|
# With overwhelming probability they differ
|
|
# (1/2^32 chance of collision — negligible for a test)
|
|
# We just verify _meta is populated
|
|
assert "_meta" in wf1
|
|
assert "_meta" in wf2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_available_models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_list_available_models():
|
|
"""Mock /object_info, verify model list is returned."""
|
|
mock_response = {
|
|
"CheckpointLoaderSimple": {
|
|
"input": {
|
|
"required": {
|
|
"ckpt_name": [
|
|
["flux1-schnell.safetensors", "sdxl.safetensors"],
|
|
{},
|
|
]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/object_info/CheckpointLoaderSimple").mock(
|
|
return_value=httpx.Response(200, json=mock_response)
|
|
)
|
|
|
|
result = await list_available_models()
|
|
assert "flux1-schnell.safetensors" in result
|
|
assert "sdxl.safetensors" in result
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_list_available_models_comfyui_offline():
|
|
"""When ComfyUI is unreachable, list_available_models returns error message."""
|
|
respx.get(f"{COMFYUI_BASE}/object_info/CheckpointLoaderSimple").mock(
|
|
side_effect=httpx.ConnectError("connection refused")
|
|
)
|
|
|
|
result = await list_available_models()
|
|
assert len(result) == 1
|
|
assert "not reachable" in result[0].lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_generation_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_get_generation_status_pending(queue_with_pending):
|
|
"""prompt_id in queue_pending → status is 'pending'."""
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_with_pending)
|
|
)
|
|
|
|
result = await get_generation_status("test-uuid-1234")
|
|
assert result["status"] == "pending"
|
|
assert result["prompt_id"] == "test-uuid-1234"
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_get_generation_status_running(queue_with_running):
|
|
"""prompt_id in queue_running → status is 'running'."""
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_with_running)
|
|
)
|
|
|
|
result = await get_generation_status("test-uuid-1234")
|
|
assert result["status"] == "running"
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_get_generation_status_complete(queue_empty, mock_history_response):
|
|
"""prompt_id not in queue + found in history → status is 'completed'."""
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-uuid-1234").mock(
|
|
return_value=httpx.Response(200, json=mock_history_response)
|
|
)
|
|
|
|
result = await get_generation_status("test-uuid-1234")
|
|
assert result["status"] == "completed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_output_directory
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_get_output_directory_default(monkeypatch):
|
|
"""No IMAGE_OUTPUT_DIR env var → returns expanded ~/Pictures/mcp-generated."""
|
|
monkeypatch.delenv("IMAGE_OUTPUT_DIR", raising=False)
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", "~/Pictures/mcp-generated")
|
|
|
|
result = get_output_directory()
|
|
assert result == str(Path("~/Pictures/mcp-generated").expanduser().resolve())
|
|
assert "~" not in result # expanded
|
|
|
|
|
|
def test_get_output_directory_custom(monkeypatch, tmp_path):
|
|
"""IMAGE_OUTPUT_DIR set → returns that path."""
|
|
custom = str(tmp_path / "custom-output")
|
|
monkeypatch.setenv("IMAGE_OUTPUT_DIR", custom)
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", custom)
|
|
|
|
result = get_output_directory()
|
|
assert result == str(Path(custom).expanduser().resolve())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# generate_image
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_success(
|
|
tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch
|
|
):
|
|
"""Mock full lifecycle: queue → poll done → history → view. Verify outputs."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
# 1. POST /api/prompt → prompt_id
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-uuid-1234"})
|
|
)
|
|
# 2. GET /api/queue → empty (job done immediately)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
# 3. GET /api/history/test-uuid-1234
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-uuid-1234").mock(
|
|
return_value=httpx.Response(200, json=mock_history_response)
|
|
)
|
|
# 4. GET /api/view → image bytes
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(
|
|
prompt="a red cat",
|
|
output_dir=str(tmp_path),
|
|
)
|
|
|
|
# Should return [TextContent, ImageContent]
|
|
assert len(result) == 2
|
|
text_content = result[0]
|
|
image_content = result[1]
|
|
|
|
# TextContent has path info
|
|
assert "Generated:" in text_content.text
|
|
assert str(tmp_path) in text_content.text
|
|
|
|
# ImageContent has valid base64 PNG
|
|
assert image_content.type == "image"
|
|
assert image_content.mimeType == "image/png"
|
|
decoded = base64.b64decode(image_content.data)
|
|
assert decoded[:8] == b"\x89PNG\r\n\x1a\n" # PNG magic bytes
|
|
|
|
# File was actually saved
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 1
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_comfyui_unavailable():
|
|
"""ComfyUI unreachable → returns graceful error message as single TextContent."""
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
side_effect=httpx.ConnectError("connection refused")
|
|
)
|
|
|
|
result = await generate_image(prompt="a cat")
|
|
|
|
assert len(result) == 1
|
|
assert "not reachable" in result[0].text.lower()
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_timeout(monkeypatch, queue_with_pending):
|
|
"""Poll loop never completes within timeout → returns timeout error."""
|
|
monkeypatch.setattr(server, "COMFYUI_TIMEOUT", 0) # instant timeout
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-uuid-1234"})
|
|
)
|
|
# Queue always shows job pending → never finishes
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_with_pending)
|
|
)
|
|
|
|
result = await generate_image(prompt="slow image")
|
|
|
|
assert len(result) == 1
|
|
assert "timed out" in result[0].text.lower()
|
|
assert "test-uuid-1234" in result[0].text
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_empty_prompt(tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch):
|
|
"""Empty prompt → workflow has empty text in positive node, but generation succeeds."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-empty-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_empty = {
|
|
"test-empty-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00001_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-empty-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_empty)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(prompt="", output_dir=str(tmp_path))
|
|
|
|
assert len(result) == 2
|
|
text_content = result[0]
|
|
image_content = result[1]
|
|
assert "Generated:" in text_content.text
|
|
assert str(tmp_path) in text_content.text
|
|
# Verify workflow was built with empty prompt (indirectly via success)
|
|
assert image_content.mimeType == "image/png"
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_long_prompt(tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch):
|
|
"""Very long prompt → passed as-is to workflow without truncation."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
long_prompt = "a " + "very long descriptive prompt " * 50 # ~500 chars
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-long-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_long = {
|
|
"test-long-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00001_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-long-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_long)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(prompt=long_prompt, output_dir=str(tmp_path))
|
|
|
|
assert len(result) == 2
|
|
# Success implies long prompt was accepted (ComfyUI handles it)
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 1
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_invalid_model(tmp_path, monkeypatch):
|
|
"""Invalid model → ComfyUI /prompt returns 500 or 404, tool returns error TextContent."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(404, json={"error": "Model not found"})
|
|
)
|
|
|
|
result = await generate_image(
|
|
prompt="a cat",
|
|
model="nonexistent-model.safetensors",
|
|
output_dir=str(tmp_path)
|
|
)
|
|
|
|
assert len(result) == 1
|
|
assert "404" in result[0].text
|
|
assert "Model not found" in result[0].text
|
|
# No file saved
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 0
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_custom_output_dir(tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch):
|
|
"""Custom output_dir → image saved there, path reflects it."""
|
|
custom_dir = tmp_path / "custom"
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path)) # Base for default
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-custom-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_custom = {
|
|
"test-custom-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00001_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-custom-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_custom)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(
|
|
prompt="a dog",
|
|
output_dir=str(custom_dir),
|
|
)
|
|
|
|
assert len(result) == 2
|
|
text_content = result[0]
|
|
assert str(custom_dir) in text_content.text
|
|
# Directory was created
|
|
assert custom_dir.exists()
|
|
saved_files = list(custom_dir.glob("*.png"))
|
|
assert len(saved_files) == 1
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_random_seed_variance(tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch):
|
|
"""seed=-1 → different actual_seed each call, reflected in filename."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
# First generation
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "seed1-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_seed1 = {
|
|
"seed1-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00001_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/seed1-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_seed1)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result1 = await generate_image(prompt="cat", seed=-1, output_dir=str(tmp_path))
|
|
seed1 = [line for line in result1[0].text.split("\n") if "Seed:" in line][0].split(": ")[1]
|
|
filename1 = Path(result1[0].text.split("Generated: ")[1].split("\n")[0]).name
|
|
assert "Seed:" in result1[0].text
|
|
assert int(seed1) != 0 # Not default
|
|
|
|
# Reset mocks for second call
|
|
respx.reset()
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "seed2-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_seed2 = {
|
|
"seed2-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00002_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/seed2-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_seed2)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result2 = await generate_image(prompt="cat", seed=-1, output_dir=str(tmp_path))
|
|
seed2 = [line for line in result2[0].text.split("\n") if "Seed:" in line][0].split(": ")[1]
|
|
filename2 = Path(result2[0].text.split("Generated: ")[1].split("\n")[0]).name
|
|
|
|
# Different seeds and filenames
|
|
assert seed1 != seed2
|
|
assert filename1 != filename2
|
|
# Both saved
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 2
|