67b8b44408
- web.py: add /profile-image route (serves most-recent gallery PNG)
add /gallery/image/<filename> route (per-image serving)
add /gallery route (renders gallery page from DB)
add _get_profile_image_path() helper
- web_render.py: replace emoji avatar with <img src=/profile-image>
onerror fallback to 🧠 emoji
add .nav bar with Profile/Gallery links to both pages
add _render_gallery_html() full gallery page renderer
add gallery CSS: .gal-grid, .gal-card, .gal-img, .gal-info, etc.
- db.py: bump SCHEMA_VERSION 7→8
add gallery_images table (id, filename, prompt, tags, model,
created_at, width, height, file_size_bytes)
add _migrate_v7_to_v8() migration function
add init_db() hook for v<8 migration
- tests: update test_schema_version_is_7→8 in test_db.py and
test_feature7_live_sessions.py; add gallery_images to expected tables
Storage strategy: Option B (filesystem + DB metadata)
Images in ~/.mcp/bigmind/gallery/, metadata in SQLite
Pre-populated with 5 lumen_profiles images (seeds 2409122067,
764633840, 1367851518, 3135233944, 568659042)
Tests: 297/297 passing
229 lines
8.2 KiB
Python
229 lines
8.2 KiB
Python
"""Tests for database initialisation."""
|
|
import sqlite3
|
|
import pytest
|
|
from bigmind.db import get_db_path, get_connection, init_db, _migrate_v1_to_v2, _migrate_v2_to_v3, _migrate_v3_to_v4, _migrate_v4_to_v5
|
|
|
|
|
|
class TestDbInit:
    """Sanity checks on the database provided by the ``temp_db`` fixture."""

    def test_db_file_created(self, temp_db):
        """The path handed out by the fixture exists on disk."""
        assert temp_db.exists()

    def test_schema_version_is_8(self, temp_db):
        """``schema_version`` contains a row and its version is 8."""
        db = get_connection()
        version_row = db.execute("SELECT version FROM schema_version").fetchone()
        db.close()
        assert version_row is not None
        assert version_row["version"] == 8

    def test_all_tables_exist(self, temp_db):
        """Every expected table name is listed in ``sqlite_master``."""
        expected = {
            "users", "identity_profile", "sessions",
            "session_summaries", "conversation_chunks", "facts",
            "global_knowledge", "hypotheses", "upgrade_requests",
            "gallery_images",
        }
        db = get_connection()
        listing = db.execute(
            "SELECT name FROM sqlite_master WHERE type='table'"
        ).fetchall()
        db.close()
        present = {entry["name"] for entry in listing}
        # Extra tables are fine; only the expected ones must be there.
        assert expected <= present

    def test_facts_fts_table_exists(self, temp_db):
        """Exactly one ``facts_fts`` table is registered."""
        db = get_connection()
        matches = db.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='facts_fts'"
        ).fetchall()
        db.close()
        assert len(matches) == 1

    def test_facts_has_deprecated_columns(self, temp_db):
        """``facts`` exposes both deprecation columns."""
        db = get_connection()
        info = db.execute("PRAGMA table_info(facts)").fetchall()
        db.close()
        columns = {entry["name"] for entry in info}
        assert "deprecated" in columns
        assert "deprecation_reason" in columns

    def test_hypotheses_has_correct_columns(self, temp_db):
        """``hypotheses`` has every column listed below."""
        db = get_connection()
        info = db.execute("PRAGMA table_info(hypotheses)").fetchall()
        db.close()
        columns = {entry["name"] for entry in info}
        required = (
            "id", "session_id", "user_id", "hypothesis",
            "confidence", "status", "resolution",
            "created_at", "resolved_at",
        )
        # Checked one at a time so the failure message names the column.
        for col in required:
            assert col in columns, f"Missing column: {col}"

    def test_init_is_idempotent(self, temp_db):
        """Calling init_db() twice must not raise."""
        for _ in range(2):
            init_db()

    def test_db_path_override_via_env(self, temp_db):
        """``get_db_path()`` resolves to the fixture's database path."""
        resolved = str(get_db_path())
        assert resolved == str(temp_db)
|
|
|
|
|
|
class TestMigrationV1ToV2:
    """Tests for ``_migrate_v1_to_v2``.

    The tests below expect the migration to leave ``facts`` with the
    ``deprecated`` and ``deprecation_reason`` columns.
    """

    def test_migration_adds_columns_to_existing_table(self, temp_db):
        """Simulate a v1 DB: facts table without deprecated columns."""
        conn = get_connection()
        try:
            # Rebuild ``facts`` in its pre-migration shape.
            conn.execute("DROP TABLE IF EXISTS facts")
            conn.execute("""CREATE TABLE facts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                category TEXT NOT NULL,
                fact TEXT NOT NULL,
                source_session TEXT,
                confidence REAL DEFAULT 1.0,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )""")
            conn.commit()

            # Precondition: the column really is absent before migrating.
            rows = conn.execute("PRAGMA table_info(facts)").fetchall()
            assert "deprecated" not in {r["name"] for r in rows}

            _migrate_v1_to_v2(conn)
            conn.commit()
            rows = conn.execute("PRAGMA table_info(facts)").fetchall()
        finally:
            # Close even when the precondition assert fails, so a failing
            # test does not leave the connection open.
            conn.close()

        col_names = {r["name"] for r in rows}
        assert "deprecated" in col_names
        assert "deprecation_reason" in col_names

    def test_migration_is_idempotent(self, temp_db):
        """Running v1→v2 twice must not raise."""
        conn = get_connection()
        try:
            # Two explicit runs, so the test does not depend on whether the
            # fixture already applied this migration.
            for _ in range(2):
                _migrate_v1_to_v2(conn)
                conn.commit()
        finally:
            conn.close()
|
|
|
|
|
|
class TestMigrationV4ToV5:
    """Tests for ``_migrate_v4_to_v5`` and the ``facts_fts`` table."""

    def test_migration_creates_facts_fts(self, temp_db):
        """After dropping ``facts_fts``, the migration brings it back."""
        db = get_connection()
        db.execute("DROP TABLE IF EXISTS facts_fts")
        db.commit()
        _migrate_v4_to_v5(db)
        db.commit()
        lookup = (
            "SELECT name FROM sqlite_master WHERE type='table' AND name='facts_fts'"
        )
        matches = db.execute(lookup).fetchall()
        db.close()
        assert len(matches) == 1

    def test_migration_backfills_existing_facts(self, temp_db):
        """A fact inserted before the migration is searchable afterwards."""
        from bigmind import memory_store

        owner = memory_store.get_or_create_user("backfill_user")
        # Insert a fact directly (bypassing FTS sync) to simulate pre-v5 data
        db = get_connection()
        fact_values = (owner["id"], "codebase", "pre-migration fact about SQLite")
        db.execute(
            "INSERT INTO facts (user_id, category, fact) VALUES (?,?,?)",
            fact_values,
        )
        db.execute("DROP TABLE IF EXISTS facts_fts")
        db.commit()
        _migrate_v4_to_v5(db)
        db.commit()
        db.close()
        hits = memory_store.search_facts(owner["id"], "SQLite")
        assert len(hits) == 1

    def test_migration_is_idempotent(self, temp_db):
        """Running the migration against the fixture database must not raise."""
        db = get_connection()
        _migrate_v4_to_v5(db)
        db.commit()
        db.close()
|
|
|
|
|
|
class TestMigrationV3ToV4:
    """Tests for ``_migrate_v3_to_v4`` (``upgrade_requests`` table and index)."""

    def test_migration_creates_upgrade_requests_table(self, temp_db):
        """Drop upgrade_requests and re-run migration — table must reappear."""
        conn = get_connection()
        try:
            conn.execute("DROP TABLE IF EXISTS upgrade_requests")
            conn.commit()
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='upgrade_requests'"
            ).fetchall()
            # Precondition: the table is really gone before migrating.
            assert len(rows) == 0

            _migrate_v3_to_v4(conn)
            conn.commit()
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='upgrade_requests'"
            ).fetchall()
        finally:
            conn.close()
        assert len(rows) == 1

    def test_migration_creates_index(self, temp_db):
        """The migration recreates ``idx_upgrade_requests_user_status``."""
        conn = get_connection()
        try:
            conn.execute("DROP INDEX IF EXISTS idx_upgrade_requests_user_status")
            conn.execute("DROP TABLE IF EXISTS upgrade_requests")
            conn.commit()
            _migrate_v3_to_v4(conn)
            conn.commit()
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='index' "
                "AND name='idx_upgrade_requests_user_status'"
            ).fetchall()
        finally:
            conn.close()
        assert len(rows) == 1

    def test_migration_is_idempotent(self, temp_db):
        """Running v3→v4 against the fixture database must not raise."""
        conn = get_connection()
        try:
            _migrate_v3_to_v4(conn)
            conn.commit()
        finally:
            conn.close()


class TestMigrationV2ToV3:
    """Tests for ``_migrate_v2_to_v3`` (``hypotheses`` table and index).

    Kept in a class of their own: two method names here
    (``test_migration_creates_index`` and ``test_migration_is_idempotent``)
    also exist for v3→v4, and sharing one class body would let the later
    definitions replace the earlier ones so those tests would never run.
    """

    def test_migration_creates_hypotheses_table(self, temp_db):
        """Drop hypotheses and re-run migration — table must reappear."""
        conn = get_connection()
        try:
            conn.execute("DROP TABLE IF EXISTS hypotheses")
            conn.commit()
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='hypotheses'"
            ).fetchall()
            # Precondition: the table is really gone before migrating.
            assert len(rows) == 0

            _migrate_v2_to_v3(conn)
            conn.commit()
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='hypotheses'"
            ).fetchall()
        finally:
            conn.close()
        assert len(rows) == 1

    def test_migration_creates_index(self, temp_db):
        """The migration recreates ``idx_hypotheses_user_status``."""
        conn = get_connection()
        try:
            conn.execute("DROP INDEX IF EXISTS idx_hypotheses_user_status")
            conn.execute("DROP TABLE IF EXISTS hypotheses")
            conn.commit()
            _migrate_v2_to_v3(conn)
            conn.commit()
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='index' AND name='idx_hypotheses_user_status'"
            ).fetchall()
        finally:
            conn.close()
        assert len(rows) == 1

    def test_migration_is_idempotent(self, temp_db):
        """Running v2→v3 twice must not raise."""
        conn = get_connection()
        try:
            # Two explicit runs, so the test does not depend on whether the
            # fixture already applied this migration.
            for _ in range(2):
                _migrate_v2_to_v3(conn)
                conn.commit()
        finally:
            conn.close()

    def test_status_check_constraint_enforced(self, temp_db):
        """Inserting a bad status value must raise an integrity error."""
        conn = get_connection()
        try:
            user_row = conn.execute("SELECT id FROM users LIMIT 1").fetchone()
            if user_row:
                user_id = user_row["id"]
            else:
                user_id = "u1"
                conn.execute("INSERT INTO users (id, username) VALUES ('u1', 'testuser')")
                conn.commit()

            # Reference a user that really exists. With a hard-coded 'u1'
            # and a different pre-existing user, a foreign-key failure could
            # raise the same IntegrityError and hide a missing CHECK
            # constraint on ``status``.
            with pytest.raises(sqlite3.IntegrityError):
                conn.execute(
                    """INSERT INTO hypotheses (user_id, hypothesis, status)
                    VALUES (?, 'bad status test', 'invalid_status')""",
                    (user_id,),
                )
        finally:
            conn.close()
|
|
|