1d8849cb41
- CLIPLoader clip_name: qwen_3_4b_klein.safetensors (from Comfy-Org/vae-text-encorder-for-flux-klein-4b) - VAE: flux2-vae.safetensors (321MB, same repo) - Live test confirmed: 2.1MB photorealistic 1024x1024 PNG in 52.43s on RX 7900 XTX - Test: assert clip_name == qwen_3_4b_klein.safetensors - 37/37 tests pass
920 lines
32 KiB
Python
920 lines
32 KiB
Python
"""Tests for mcp-image-gen server — all ComfyUI HTTP calls mocked via respx."""
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import httpx
|
|
import pytest
|
|
import respx
|
|
|
|
# Import the server module (sys.path set by conftest.py)
|
|
import server
|
|
from server import (
|
|
ComfyUIClient,
|
|
_build_filename,
|
|
_sanitize_name,
|
|
build_flux_workflow,
|
|
generate_image,
|
|
get_generation_status,
|
|
get_output_directory,
|
|
list_available_models,
|
|
)
|
|
|
|
COMFYUI_BASE = "http://test-comfyui:8188"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# build_flux_workflow — pure function, no mocking needed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_build_flux_workflow_structure():
|
|
"""Verify build_flux_workflow returns a dict with correct node types for default model."""
|
|
wf = build_flux_workflow(
|
|
prompt="a red cat",
|
|
neg_prompt="ugly",
|
|
width=512,
|
|
height=768,
|
|
steps=8,
|
|
seed=42,
|
|
model="flux1-schnell.safetensors",
|
|
)
|
|
assert wf["6"]["class_type"] == "CLIPTextEncode"
|
|
assert wf["8"]["class_type"] == "VAEDecode"
|
|
assert wf["9"]["class_type"] == "SaveImage"
|
|
assert wf["13"]["class_type"] == "KSampler"
|
|
assert wf["27"]["class_type"] == "EmptySD3LatentImage"
|
|
assert wf["30"]["class_type"] == "DualCLIPLoader"
|
|
assert wf["31"]["class_type"] == "VAELoader"
|
|
assert wf["32"]["class_type"] == "UNETLoader"
|
|
assert wf["33"]["class_type"] == "CLIPTextEncode"
|
|
|
|
|
|
def test_build_flux_workflow_heretic_model():
|
|
"""Verify FLUX.2 Klein 4B with Heretic Qwen3-4B encoder uses correct nodes."""
|
|
wf = build_flux_workflow(
|
|
prompt="a red cat",
|
|
neg_prompt="ugly",
|
|
width=1024,
|
|
height=1024,
|
|
steps=4,
|
|
seed=42,
|
|
model="flux-2-klein-4b.safetensors",
|
|
)
|
|
# New FLUX.2 workflow uses different node IDs and types
|
|
assert wf["1"]["class_type"] == "CLIPLoader" # Qwen3-4B uses single CLIPLoader
|
|
assert wf["1"]["inputs"]["type"] == "flux2" # correct type for FLUX.2
|
|
assert wf["1"]["inputs"]["device"] == "default" # required for FLUX.2 CLIPLoader
|
|
assert wf["1"]["inputs"]["clip_name"] == "qwen_3_4b_klein.safetensors" # Comfy-Org/vae-text-encorder-for-flux-klein-4b
|
|
assert wf["2"]["class_type"] == "CLIPTextEncode" # standard CLIP encode (not Flux-specific)
|
|
assert wf["4"]["class_type"] == "UNETLoader"
|
|
assert wf["4"]["inputs"]["unet_name"] == "flux-2-klein-4b.safetensors"
|
|
assert wf["4"]["inputs"]["weight_dtype"] == "default" # not fp8 — avoids dimension errors
|
|
assert wf["6"]["class_type"] == "EmptyFlux2LatentImage" # FLUX.2-specific latent
|
|
assert wf["8"]["class_type"] == "CFGGuider" # CFGGuider replaces FluxDisableGuidance+BasicGuider
|
|
assert wf["8"]["inputs"]["cfg"] == 5 # cfg=5 for FLUX.2 Klein
|
|
assert wf["11"]["class_type"] == "SamplerCustomAdvanced" # FLUX.2 sampler (node 11, not 12)
|
|
assert wf["13"]["class_type"] == "SaveImage" # output node
|
|
|
|
|
|
def test_workflow_registry_contains_both_models():
|
|
"""Verify the registry contains both supported models."""
|
|
assert "flux1-schnell.safetensors" in server._WORKFLOW_REGISTRY
|
|
assert "flux-2-klein-4b.safetensors" in server._WORKFLOW_REGISTRY
|
|
assert len(server._WORKFLOW_REGISTRY) == 2
|
|
|
|
|
|
def test_workflow_registry_fallback():
|
|
"""Unknown model falls back to default (FLUX.1-schnell)."""
|
|
wf = build_flux_workflow(
|
|
prompt="test",
|
|
neg_prompt="",
|
|
width=512,
|
|
height=512,
|
|
steps=4,
|
|
seed=42,
|
|
model="unknown-model.safetensors",
|
|
)
|
|
# Should have used default workflow (DualCLIPLoader)
|
|
assert wf["30"]["class_type"] == "DualCLIPLoader"
|
|
assert wf["32"]["inputs"]["unet_name"] == "unknown-model.safetensors"
|
|
|
|
|
|
def test_build_flux_workflow_params_injected():
|
|
"""Verify all parameters are injected into correct nodes."""
|
|
wf = build_flux_workflow(
|
|
prompt="a blue whale",
|
|
neg_prompt="cartoonish",
|
|
width=512,
|
|
height=768,
|
|
steps=8,
|
|
seed=12345,
|
|
model="sdxl.safetensors",
|
|
)
|
|
assert wf["6"]["inputs"]["text"] == "a blue whale"
|
|
assert wf["33"]["inputs"]["text"] == "cartoonish"
|
|
assert wf["27"]["inputs"]["width"] == 512
|
|
assert wf["27"]["inputs"]["height"] == 768
|
|
assert wf["13"]["inputs"]["steps"] == 8
|
|
assert wf["13"]["inputs"]["seed"] == 12345
|
|
assert wf["32"]["inputs"]["unet_name"] == "sdxl.safetensors"
|
|
|
|
|
|
def test_negative_prompt_included():
|
|
"""Verify negative prompt appears in workflow node 33 when provided."""
|
|
wf = build_flux_workflow(
|
|
prompt="forest",
|
|
neg_prompt="blurry, dark",
|
|
width=1024,
|
|
height=1024,
|
|
steps=4,
|
|
seed=1,
|
|
model="flux1-schnell.safetensors",
|
|
)
|
|
assert wf["33"]["inputs"]["text"] == "blurry, dark"
|
|
|
|
|
|
def test_random_seed_generated():
|
|
"""seed=-1 generates a random seed each call."""
|
|
wf1 = build_flux_workflow("cat", "", 512, 512, 4, -1, "flux1-schnell.safetensors")
|
|
wf2 = build_flux_workflow("cat", "", 512, 512, 4, -1, "flux1-schnell.safetensors")
|
|
seed1 = wf1["_meta"]["actual_seed"]
|
|
seed2 = wf2["_meta"]["actual_seed"]
|
|
# Both are valid integers
|
|
assert isinstance(seed1, int)
|
|
assert 0 <= seed1 < 2**32
|
|
# With overwhelming probability they differ
|
|
# (1/2^32 chance of collision — negligible for a test)
|
|
# We just verify _meta is populated
|
|
assert "_meta" in wf1
|
|
assert "_meta" in wf2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _sanitize_name — pure function
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_sanitize_name_basic():
|
|
"""Simple alphanumeric name passes through unchanged."""
|
|
assert _sanitize_name("lumen_profile") == "lumen_profile"
|
|
|
|
|
|
def test_sanitize_name_spaces_to_underscores():
|
|
"""Spaces are converted to underscores."""
|
|
assert _sanitize_name("my cool image") == "my_cool_image"
|
|
|
|
|
|
def test_sanitize_name_special_chars_stripped():
|
|
"""Special characters (!, @, #, etc.) are stripped."""
|
|
result = _sanitize_name("hello! world@2024#")
|
|
assert "!" not in result
|
|
assert "@" not in result
|
|
assert "#" not in result
|
|
assert "hello" in result
|
|
assert "world" in result
|
|
|
|
|
|
def test_sanitize_name_empty_returns_empty():
|
|
"""Empty string or whitespace-only returns empty string."""
|
|
assert _sanitize_name("") == ""
|
|
assert _sanitize_name(" ") == ""
|
|
|
|
|
|
def test_sanitize_name_collapse_underscores():
|
|
"""Multiple consecutive underscores/hyphens are collapsed to one."""
|
|
result = _sanitize_name("lumen__profile")
|
|
assert "__" not in result
|
|
|
|
|
|
def test_sanitize_name_truncates_at_64():
|
|
"""Names longer than 64 chars are truncated."""
|
|
long_name = "a" * 100
|
|
result = _sanitize_name(long_name)
|
|
assert len(result) <= 64
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _build_filename — pure function
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_build_filename_with_name():
|
|
"""When name is provided, filename includes it as prefix."""
|
|
filename = _build_filename("lumen", "20260406_120000", 12345)
|
|
assert filename == "lumen_20260406_120000_12345.png"
|
|
|
|
|
|
def test_build_filename_without_name():
|
|
"""When name is empty, filename is timestamp_seed.png."""
|
|
filename = _build_filename("", "20260406_120000", 12345)
|
|
assert filename == "20260406_120000_12345.png"
|
|
assert not filename.startswith("_")
|
|
|
|
|
|
def test_build_filename_sanitizes_name():
|
|
"""Name with spaces and special chars is sanitized before use in filename."""
|
|
filename = _build_filename("my image!", "20260406_120000", 99)
|
|
assert "!" not in filename
|
|
assert "my_image" in filename
|
|
assert filename.endswith("_20260406_120000_99.png")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_available_models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_list_available_models():
|
|
"""Mock /object_info, verify model list is returned."""
|
|
mock_response = {
|
|
"CheckpointLoaderSimple": {
|
|
"input": {
|
|
"required": {
|
|
"ckpt_name": [
|
|
["flux1-schnell.safetensors", "sdxl.safetensors"],
|
|
{},
|
|
]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/object_info/CheckpointLoaderSimple").mock(
|
|
return_value=httpx.Response(200, json=mock_response)
|
|
)
|
|
|
|
result = await list_available_models()
|
|
assert "flux1-schnell.safetensors" in result
|
|
assert "sdxl.safetensors" in result
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_list_available_models_comfyui_offline():
|
|
"""When ComfyUI is unreachable, list_available_models falls back to registry models."""
|
|
respx.get(f"{COMFYUI_BASE}/object_info/CheckpointLoaderSimple").mock(
|
|
side_effect=httpx.ConnectError("connection refused")
|
|
)
|
|
|
|
result = await list_available_models()
|
|
# Should return registry models even when ComfyUI is offline
|
|
assert isinstance(result, list)
|
|
assert "flux1-schnell.safetensors" in result
|
|
assert "flux-2-klein-4b.safetensors" in result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_generation_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_get_generation_status_pending(queue_with_pending):
|
|
"""prompt_id in queue_pending → status is 'pending'."""
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_with_pending)
|
|
)
|
|
|
|
result = await get_generation_status("test-uuid-1234")
|
|
assert result["status"] == "pending"
|
|
assert result["prompt_id"] == "test-uuid-1234"
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_get_generation_status_running(queue_with_running):
|
|
"""prompt_id in queue_running → status is 'running'."""
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_with_running)
|
|
)
|
|
|
|
result = await get_generation_status("test-uuid-1234")
|
|
assert result["status"] == "running"
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_get_generation_status_complete(queue_empty, mock_history_response):
|
|
"""prompt_id not in queue + found in history → status is 'completed'."""
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-uuid-1234").mock(
|
|
return_value=httpx.Response(200, json=mock_history_response)
|
|
)
|
|
|
|
result = await get_generation_status("test-uuid-1234")
|
|
assert result["status"] == "completed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_output_directory
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_get_output_directory_default(monkeypatch):
|
|
"""No IMAGE_OUTPUT_DIR env var → returns expanded ~/Pictures/mcp-generated."""
|
|
monkeypatch.delenv("IMAGE_OUTPUT_DIR", raising=False)
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", "~/Pictures/mcp-generated")
|
|
|
|
result = get_output_directory()
|
|
assert result == str(Path("~/Pictures/mcp-generated").expanduser().resolve())
|
|
assert "~" not in result # expanded
|
|
|
|
|
|
def test_get_output_directory_custom(monkeypatch, tmp_path):
|
|
"""IMAGE_OUTPUT_DIR set → returns that path."""
|
|
custom = str(tmp_path / "custom-output")
|
|
monkeypatch.setenv("IMAGE_OUTPUT_DIR", custom)
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", custom)
|
|
|
|
result = get_output_directory()
|
|
assert result == str(Path(custom).expanduser().resolve())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# generate_image — count/name validation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_count_zero_returns_error():
|
|
"""count=0 → returns error TextContent without calling ComfyUI."""
|
|
result = await generate_image(prompt="a cat", count=0)
|
|
assert len(result) == 1
|
|
assert "count must be at least 1" in result[0].text
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_count_exceeds_max_returns_error():
|
|
"""count=11 (> MAX_COUNT=10) → returns error TextContent without calling ComfyUI."""
|
|
result = await generate_image(prompt="a cat", count=11)
|
|
assert len(result) == 1
|
|
assert "at most 10" in result[0].text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# generate_image — name parameter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_with_name(
|
|
tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch
|
|
):
|
|
"""name param → saved file has name as prefix."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "name-test-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_named = {
|
|
"name-test-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00001_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/name-test-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_named)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(
|
|
prompt="lumen portrait",
|
|
name="lumen_profile",
|
|
output_dir=str(tmp_path),
|
|
)
|
|
|
|
assert len(result) == 2
|
|
saved_files = list(tmp_path.glob("lumen_profile_*.png"))
|
|
assert len(saved_files) == 1, f"Expected 1 file with 'lumen_profile_' prefix, got: {list(tmp_path.glob('*.png'))}"
|
|
# Path in TextContent also has the name prefix
|
|
assert "lumen_profile_" in result[0].text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# generate_image — count=2 batch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_count_2(
|
|
tmp_path, sample_image_bytes, queue_empty, monkeypatch
|
|
):
|
|
"""count=2 → returns 4 content items (Text+Image per image), 2 files saved."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
# First image
|
|
mock_history_1 = {
|
|
"uuid-batch-1": {
|
|
"outputs": {"9": {"images": [{"filename": "img1.png", "subfolder": "", "type": "output"}]}},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
# Second image
|
|
mock_history_2 = {
|
|
"uuid-batch-2": {
|
|
"outputs": {"9": {"images": [{"filename": "img2.png", "subfolder": "", "type": "output"}]}},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
side_effect=[
|
|
httpx.Response(200, json={"prompt_id": "uuid-batch-1"}),
|
|
httpx.Response(200, json={"prompt_id": "uuid-batch-2"}),
|
|
]
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/history/uuid-batch-1").mock(
|
|
return_value=httpx.Response(200, json=mock_history_1)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/history/uuid-batch-2").mock(
|
|
return_value=httpx.Response(200, json=mock_history_2)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(
|
|
prompt="a landscape",
|
|
count=2,
|
|
seed=100,
|
|
output_dir=str(tmp_path),
|
|
)
|
|
|
|
# 4 content items: [Text1, Image1, Text2, Image2]
|
|
assert len(result) == 4
|
|
assert result[0].type == "text"
|
|
assert result[1].type == "image"
|
|
assert result[2].type == "text"
|
|
assert result[3].type == "image"
|
|
|
|
# 2 files saved
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 2
|
|
|
|
# Label contains batch index
|
|
assert "1/2" in result[0].text
|
|
assert "2/2" in result[2].text
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_count_2_fixed_seed_increments(
|
|
tmp_path, sample_image_bytes, queue_empty, monkeypatch
|
|
):
|
|
"""count=2 with fixed seed → seeds are incremented (seed, seed+1)."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
submitted_seeds = []
|
|
|
|
def capture_prompt(request):
|
|
body = json.loads(request.content)
|
|
seed_val = body["prompt"]["13"]["inputs"]["seed"]
|
|
submitted_seeds.append(seed_val)
|
|
idx = len(submitted_seeds)
|
|
return httpx.Response(200, json={"prompt_id": f"seed-test-{idx}"})
|
|
|
|
mock_history_1 = {
|
|
"seed-test-1": {
|
|
"outputs": {"9": {"images": [{"filename": "img1.png", "subfolder": "", "type": "output"}]}},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
mock_history_2 = {
|
|
"seed-test-2": {
|
|
"outputs": {"9": {"images": [{"filename": "img2.png", "subfolder": "", "type": "output"}]}},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(side_effect=capture_prompt)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/history/seed-test-1").mock(
|
|
return_value=httpx.Response(200, json=mock_history_1)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/history/seed-test-2").mock(
|
|
return_value=httpx.Response(200, json=mock_history_2)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
await generate_image(
|
|
prompt="a test",
|
|
count=2,
|
|
seed=42,
|
|
output_dir=str(tmp_path),
|
|
)
|
|
|
|
assert submitted_seeds == [42, 43], f"Expected [42, 43], got {submitted_seeds}"
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_count_partial_failure_continues(
|
|
tmp_path, sample_image_bytes, queue_empty, monkeypatch
|
|
):
|
|
"""count=2 where first image fails → error in slot 1, second image succeeds in slot 2."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
mock_history_2 = {
|
|
"uuid-ok": {
|
|
"outputs": {"9": {"images": [{"filename": "img2.png", "subfolder": "", "type": "output"}]}},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
side_effect=[
|
|
httpx.Response(500, json={"error": "GPU OOM"}), # first fails
|
|
httpx.Response(200, json={"prompt_id": "uuid-ok"}), # second succeeds
|
|
]
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/history/uuid-ok").mock(
|
|
return_value=httpx.Response(200, json=mock_history_2)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(
|
|
prompt="a test",
|
|
count=2,
|
|
seed=10,
|
|
output_dir=str(tmp_path),
|
|
)
|
|
|
|
# First: error TextContent only (no ImageContent)
|
|
# Second: [TextContent, ImageContent]
|
|
assert len(result) == 3
|
|
assert result[0].type == "text"
|
|
assert "500" in result[0].text or "error" in result[0].text.lower()
|
|
assert result[1].type == "text"
|
|
assert result[2].type == "image"
|
|
|
|
# Only 1 file saved (the successful one)
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# generate_image — existing tests (kept intact)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_success(
|
|
tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch
|
|
):
|
|
"""Mock full lifecycle: queue → poll done → history → view. Verify outputs."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
# 1. POST /api/prompt → prompt_id
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-uuid-1234"})
|
|
)
|
|
# 2. GET /api/queue → empty (job done immediately)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
# 3. GET /api/history/test-uuid-1234
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-uuid-1234").mock(
|
|
return_value=httpx.Response(200, json=mock_history_response)
|
|
)
|
|
# 4. GET /api/view → image bytes
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(
|
|
prompt="a red cat",
|
|
output_dir=str(tmp_path),
|
|
)
|
|
|
|
# Should return [TextContent, ImageContent]
|
|
assert len(result) == 2
|
|
text_content = result[0]
|
|
image_content = result[1]
|
|
|
|
# TextContent has path info
|
|
assert "Generated:" in text_content.text
|
|
assert str(tmp_path) in text_content.text
|
|
|
|
# ImageContent has valid base64 PNG
|
|
assert image_content.type == "image"
|
|
assert image_content.mimeType == "image/png"
|
|
decoded = base64.b64decode(image_content.data)
|
|
assert decoded[:8] == b"\x89PNG\r\n\x1a\n" # PNG magic bytes
|
|
|
|
# File was actually saved
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 1
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_comfyui_unavailable():
|
|
"""ComfyUI unreachable → returns graceful error message as single TextContent."""
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
side_effect=httpx.ConnectError("connection refused")
|
|
)
|
|
|
|
result = await generate_image(prompt="a cat")
|
|
|
|
assert len(result) == 1
|
|
assert "not reachable" in result[0].text.lower()
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_timeout(monkeypatch, queue_with_pending):
|
|
"""Poll loop never completes within timeout → returns timeout error."""
|
|
monkeypatch.setattr(server, "COMFYUI_TIMEOUT", 0) # instant timeout
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-uuid-1234"})
|
|
)
|
|
# Queue always shows job pending → never finishes
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_with_pending)
|
|
)
|
|
|
|
result = await generate_image(prompt="slow image")
|
|
|
|
assert len(result) == 1
|
|
assert "timed out" in result[0].text.lower()
|
|
assert "test-uuid-1234" in result[0].text
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_empty_prompt(tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch):
|
|
"""Empty prompt → workflow has empty text in positive node, but generation succeeds."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-empty-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_empty = {
|
|
"test-empty-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00001_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-empty-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_empty)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(prompt="", output_dir=str(tmp_path))
|
|
|
|
assert len(result) == 2
|
|
text_content = result[0]
|
|
image_content = result[1]
|
|
assert "Generated:" in text_content.text
|
|
assert str(tmp_path) in text_content.text
|
|
# Verify workflow was built with empty prompt (indirectly via success)
|
|
assert image_content.mimeType == "image/png"
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_long_prompt(tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch):
|
|
"""Very long prompt → passed as-is to workflow without truncation."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
long_prompt = "a " + "very long descriptive prompt " * 50 # ~500 chars
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-long-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_long = {
|
|
"test-long-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00001_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-long-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_long)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(prompt=long_prompt, output_dir=str(tmp_path))
|
|
|
|
assert len(result) == 2
|
|
# Success implies long prompt was accepted (ComfyUI handles it)
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 1
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_invalid_model(tmp_path, monkeypatch):
|
|
"""Invalid model → ComfyUI /prompt returns 500 or 404, tool returns error TextContent."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(404, json={"error": "Model not found"})
|
|
)
|
|
|
|
result = await generate_image(
|
|
prompt="a cat",
|
|
model="nonexistent-model.safetensors",
|
|
output_dir=str(tmp_path)
|
|
)
|
|
|
|
assert len(result) == 1
|
|
assert "404" in result[0].text
|
|
assert "Model not found" in result[0].text
|
|
# No file saved
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 0
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_custom_output_dir(tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch):
|
|
"""Custom output_dir → image saved there, path reflects it."""
|
|
custom_dir = tmp_path / "custom"
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path)) # Base for default
|
|
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "test-custom-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_custom = {
|
|
"test-custom-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00001_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/test-custom-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_custom)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result = await generate_image(
|
|
prompt="a dog",
|
|
output_dir=str(custom_dir),
|
|
)
|
|
|
|
assert len(result) == 2
|
|
text_content = result[0]
|
|
assert str(custom_dir) in text_content.text
|
|
# Directory was created
|
|
assert custom_dir.exists()
|
|
saved_files = list(custom_dir.glob("*.png"))
|
|
assert len(saved_files) == 1
|
|
|
|
|
|
@respx.mock
|
|
@pytest.mark.asyncio
|
|
async def test_generate_image_random_seed_variance(tmp_path, sample_image_bytes, mock_history_response, queue_empty, monkeypatch):
|
|
"""seed=-1 → different actual_seed each call, reflected in filename."""
|
|
monkeypatch.setattr(server, "IMAGE_OUTPUT_DIR", str(tmp_path))
|
|
|
|
# First generation
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "seed1-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_seed1 = {
|
|
"seed1-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00001_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/seed1-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_seed1)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result1 = await generate_image(prompt="cat", seed=-1, output_dir=str(tmp_path))
|
|
seed1 = [line for line in result1[0].text.split("\n") if "Seed:" in line][0].split(": ")[1]
|
|
filename1 = Path(result1[0].text.split("Generated: ")[1].split("\n")[0]).name
|
|
assert "Seed:" in result1[0].text
|
|
assert int(seed1) != 0 # Not default
|
|
|
|
# Reset mocks for second call
|
|
respx.reset()
|
|
respx.post(f"{COMFYUI_BASE}/api/prompt").mock(
|
|
return_value=httpx.Response(200, json={"prompt_id": "seed2-uuid"})
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/queue").mock(
|
|
return_value=httpx.Response(200, json=queue_empty)
|
|
)
|
|
mock_history_seed2 = {
|
|
"seed2-uuid": {
|
|
"outputs": {
|
|
"9": {
|
|
"images": [
|
|
{
|
|
"filename": "mcp-image-gen_00002_.png",
|
|
"subfolder": "",
|
|
"type": "output",
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"status": {"completed": True},
|
|
}
|
|
}
|
|
respx.get(f"{COMFYUI_BASE}/api/history/seed2-uuid").mock(
|
|
return_value=httpx.Response(200, json=mock_history_seed2)
|
|
)
|
|
respx.get(f"{COMFYUI_BASE}/api/view").mock(
|
|
return_value=httpx.Response(200, content=sample_image_bytes)
|
|
)
|
|
|
|
result2 = await generate_image(prompt="cat", seed=-1, output_dir=str(tmp_path))
|
|
seed2 = [line for line in result2[0].text.split("\n") if "Seed:" in line][0].split(": ")[1]
|
|
filename2 = Path(result2[0].text.split("Generated: ")[1].split("\n")[0]).name
|
|
|
|
# Different seeds and filenames
|
|
assert seed1 != seed2
|
|
assert filename1 != filename2
|
|
# Both saved
|
|
saved_files = list(tmp_path.glob("*.png"))
|
|
assert len(saved_files) == 2
|