155d56e8e8
- Move bigmind/ -> mcp/bigmind/ - Move webscraper/ -> mcp/webscraper/ - Move mss-failsafe/ -> java/mss-failsafe/ - Move Wellmann-Shop/ -> java/wellmann-shop/ (normalize to kebab-case) - Add .roo/ IDE config files to tracking - Add plans/REPO_STRATEGY.md (monorepo strategy document) - Expand .gitignore: Java/Maven, Node/TS, coverage, uv.lock - Rewrite README.md as navigation index - Update .roo/mcp.json webscraper path to mcp/webscraper/
228 lines
8.2 KiB
Python
228 lines
8.2 KiB
Python
"""Tests for database initialisation."""
|
|
import sqlite3
|
|
import pytest
|
|
from bigmind.db import get_db_path, get_connection, init_db, _migrate_v1_to_v2, _migrate_v2_to_v3, _migrate_v3_to_v4, _migrate_v4_to_v5
|
|
|
|
|
|
class TestDbInit:
|
|
def test_db_file_created(self, temp_db):
|
|
assert temp_db.exists()
|
|
|
|
def test_schema_version_is_7(self, temp_db):
|
|
conn = get_connection()
|
|
row = conn.execute("SELECT version FROM schema_version").fetchone()
|
|
conn.close()
|
|
assert row is not None
|
|
assert row["version"] == 7
|
|
|
|
def test_all_tables_exist(self, temp_db):
|
|
expected = {
|
|
"users", "identity_profile", "sessions",
|
|
"session_summaries", "conversation_chunks", "facts",
|
|
"global_knowledge", "hypotheses", "upgrade_requests",
|
|
}
|
|
conn = get_connection()
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table'"
|
|
).fetchall()
|
|
conn.close()
|
|
found = {r["name"] for r in rows}
|
|
assert expected.issubset(found)
|
|
|
|
def test_facts_fts_table_exists(self, temp_db):
|
|
conn = get_connection()
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='facts_fts'"
|
|
).fetchall()
|
|
conn.close()
|
|
assert len(rows) == 1
|
|
|
|
def test_facts_has_deprecated_columns(self, temp_db):
|
|
conn = get_connection()
|
|
rows = conn.execute("PRAGMA table_info(facts)").fetchall()
|
|
conn.close()
|
|
col_names = {r["name"] for r in rows}
|
|
assert "deprecated" in col_names
|
|
assert "deprecation_reason" in col_names
|
|
|
|
def test_hypotheses_has_correct_columns(self, temp_db):
|
|
conn = get_connection()
|
|
rows = conn.execute("PRAGMA table_info(hypotheses)").fetchall()
|
|
conn.close()
|
|
col_names = {r["name"] for r in rows}
|
|
for col in ("id", "session_id", "user_id", "hypothesis",
|
|
"confidence", "status", "resolution",
|
|
"created_at", "resolved_at"):
|
|
assert col in col_names, f"Missing column: {col}"
|
|
|
|
def test_init_is_idempotent(self, temp_db):
|
|
"""Calling init_db() twice must not raise."""
|
|
init_db()
|
|
init_db()
|
|
|
|
def test_db_path_override_via_env(self, temp_db):
|
|
assert str(get_db_path()) == str(temp_db)
|
|
|
|
|
|
class TestMigrationV1ToV2:
|
|
def test_migration_adds_columns_to_existing_table(self, temp_db):
|
|
"""Simulate a v1 DB: facts table without deprecated columns."""
|
|
conn = get_connection()
|
|
conn.execute("DROP TABLE IF EXISTS facts")
|
|
conn.execute("""CREATE TABLE facts (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id TEXT NOT NULL,
|
|
category TEXT NOT NULL,
|
|
fact TEXT NOT NULL,
|
|
source_session TEXT,
|
|
confidence REAL DEFAULT 1.0,
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)""")
|
|
conn.commit()
|
|
rows = conn.execute("PRAGMA table_info(facts)").fetchall()
|
|
assert "deprecated" not in {r["name"] for r in rows}
|
|
_migrate_v1_to_v2(conn)
|
|
conn.commit()
|
|
rows = conn.execute("PRAGMA table_info(facts)").fetchall()
|
|
conn.close()
|
|
col_names = {r["name"] for r in rows}
|
|
assert "deprecated" in col_names
|
|
assert "deprecation_reason" in col_names
|
|
|
|
def test_migration_is_idempotent(self, temp_db):
|
|
"""Running v1→v2 twice must not raise."""
|
|
conn = get_connection()
|
|
_migrate_v1_to_v2(conn)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
class TestMigrationV4ToV5:
|
|
def test_migration_creates_facts_fts(self, temp_db):
|
|
conn = get_connection()
|
|
conn.execute("DROP TABLE IF EXISTS facts_fts")
|
|
conn.commit()
|
|
_migrate_v4_to_v5(conn)
|
|
conn.commit()
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='facts_fts'"
|
|
).fetchall()
|
|
conn.close()
|
|
assert len(rows) == 1
|
|
|
|
def test_migration_backfills_existing_facts(self, temp_db):
|
|
from bigmind import memory_store
|
|
user = memory_store.get_or_create_user("backfill_user")
|
|
# Insert a fact directly (bypassing FTS sync) to simulate pre-v5 data
|
|
conn = get_connection()
|
|
conn.execute(
|
|
"INSERT INTO facts (user_id, category, fact) VALUES (?,?,?)",
|
|
(user["id"], "codebase", "pre-migration fact about SQLite")
|
|
)
|
|
conn.execute("DROP TABLE IF EXISTS facts_fts")
|
|
conn.commit()
|
|
_migrate_v4_to_v5(conn)
|
|
conn.commit()
|
|
conn.close()
|
|
results = memory_store.search_facts(user["id"], "SQLite")
|
|
assert len(results) == 1
|
|
|
|
def test_migration_is_idempotent(self, temp_db):
|
|
conn = get_connection()
|
|
_migrate_v4_to_v5(conn)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
class TestMigrationV3ToV4:
|
|
def test_migration_creates_upgrade_requests_table(self, temp_db):
|
|
conn = get_connection()
|
|
conn.execute("DROP TABLE IF EXISTS upgrade_requests")
|
|
conn.commit()
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='upgrade_requests'"
|
|
).fetchall()
|
|
assert len(rows) == 0
|
|
_migrate_v3_to_v4(conn)
|
|
conn.commit()
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='upgrade_requests'"
|
|
).fetchall()
|
|
conn.close()
|
|
assert len(rows) == 1
|
|
|
|
def test_migration_creates_index(self, temp_db):
|
|
conn = get_connection()
|
|
conn.execute("DROP INDEX IF EXISTS idx_upgrade_requests_user_status")
|
|
conn.execute("DROP TABLE IF EXISTS upgrade_requests")
|
|
conn.commit()
|
|
_migrate_v3_to_v4(conn)
|
|
conn.commit()
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='index' "
|
|
"AND name='idx_upgrade_requests_user_status'"
|
|
).fetchall()
|
|
conn.close()
|
|
assert len(rows) == 1
|
|
|
|
def test_migration_is_idempotent(self, temp_db):
|
|
conn = get_connection()
|
|
_migrate_v3_to_v4(conn)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def test_migration_creates_hypotheses_table(self, temp_db):
|
|
"""Drop hypotheses and re-run migration — table must reappear."""
|
|
conn = get_connection()
|
|
conn.execute("DROP TABLE IF EXISTS hypotheses")
|
|
conn.commit()
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='hypotheses'"
|
|
).fetchall()
|
|
assert len(rows) == 0
|
|
_migrate_v2_to_v3(conn)
|
|
conn.commit()
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='hypotheses'"
|
|
).fetchall()
|
|
conn.close()
|
|
assert len(rows) == 1
|
|
|
|
def test_migration_creates_index(self, temp_db):
|
|
conn = get_connection()
|
|
conn.execute("DROP INDEX IF EXISTS idx_hypotheses_user_status")
|
|
conn.execute("DROP TABLE IF EXISTS hypotheses")
|
|
conn.commit()
|
|
_migrate_v2_to_v3(conn)
|
|
conn.commit()
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='index' AND name='idx_hypotheses_user_status'"
|
|
).fetchall()
|
|
conn.close()
|
|
assert len(rows) == 1
|
|
|
|
def test_migration_is_idempotent(self, temp_db):
|
|
"""Running v2→v3 twice must not raise."""
|
|
conn = get_connection()
|
|
_migrate_v2_to_v3(conn)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def test_status_check_constraint_enforced(self, temp_db):
|
|
"""Inserting a bad status value must raise an integrity error."""
|
|
conn = get_connection()
|
|
user_row = conn.execute("SELECT id FROM users LIMIT 1").fetchone()
|
|
if not user_row:
|
|
conn.execute("INSERT INTO users (id, username) VALUES ('u1', 'testuser')")
|
|
conn.commit()
|
|
with pytest.raises(sqlite3.IntegrityError):
|
|
conn.execute(
|
|
"""INSERT INTO hypotheses (user_id, hypothesis, status)
|
|
VALUES ('u1', 'bad status test', 'invalid_status')"""
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|