Files
pi_mcps/mcp/bigmind/tests/test_memory_store.py
pplate 42ffc85f0b fix(bigmind): apply 4 health-check fixes — BUG-1/2/3 + PERF-1
BUG-1: fix test_server_tools.py assert "ALWAYS" → "Always" (case mismatch)
BUG-2: export_memory() now includes hypotheses, upgrade_requests, token_saves,
        people tables; renamed bigmind_version → bigmind_schema_version (int)
BUG-3: auto_close.py replaced CURRENT_TIMESTAMP (SQLite) with Python
        datetime.now(timezone.utc).isoformat() for consistent UTC timestamps
PERF-1: context_builder.py caps get_facts() at _MAX_CONTEXT_FACTS=50 with
         overflow hint to prevent unbounded context growth

All 297 tests passing. Upgrade requests #6-9 resolved.
Health report: plans/BIGMIND_HEALTH_REPORT_2026-04-04.md
2026-04-04 09:49:13 +02:00

758 lines
36 KiB
Python

"""Tests for memory_store CRUD operations."""
import json
import pytest
from datetime import datetime, timezone, timedelta
from pathlib import Path
from bigmind import memory_store
from bigmind.db import db
@pytest.fixture
def user():
return memory_store.get_or_create_user("testuser", "Test User")
class TestUsers:
def test_create_user(self, temp_db):
u = memory_store.get_or_create_user("alice")
assert u["username"] == "alice"
assert u["id"]
def test_get_existing_user(self, temp_db):
u1 = memory_store.get_or_create_user("bob")
u2 = memory_store.get_or_create_user("bob")
assert u1["id"] == u2["id"]
def test_get_current_username_from_env(self, monkeypatch):
monkeypatch.setenv("BIGMIND_USER", "envuser")
assert memory_store.get_current_username() == "envuser"
class TestIdentityProfile:
def test_upsert_creates_profile(self, temp_db, user):
profile = memory_store.upsert_identity_profile(
user["id"], role="Engineer", preferences="Python first"
)
assert profile["role"] == "Engineer"
assert profile["preferences"] == "Python first"
def test_upsert_partial_update_preserves_other_fields(self, temp_db, user):
memory_store.upsert_identity_profile(
user["id"], role="Eng", preferences="Python", pinned_facts="- fact 1"
)
# Only update role — preferences and pinned_facts should be preserved
memory_store.upsert_identity_profile(user["id"], role="Senior Eng")
profile = memory_store.get_identity_profile(user["id"])
assert profile["role"] == "Senior Eng"
assert profile["preferences"] == "Python"
assert "fact 1" in profile["pinned_facts"]
def test_get_missing_profile_returns_none(self, temp_db):
assert memory_store.get_identity_profile("nonexistent-id") is None
class TestSessions:
def test_create_session_returns_id(self, temp_db, user):
sid = memory_store.create_session(user["id"])
assert sid and len(sid) == 36 # UUID format
def test_close_session(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Built a thing", topics="mcp", outcome="Done")
sessions = memory_store.get_recent_sessions(user["id"])
assert len(sessions) == 1
assert sessions[0]["one_liner"] == "Built a thing"
assert sessions[0]["topics"] == "mcp"
def test_one_liner_truncated_at_120_chars(self, temp_db, user):
sid = memory_store.create_session(user["id"])
long_title = "x" * 200
memory_store.close_session(sid, long_title)
sessions = memory_store.get_recent_sessions(user["id"])
assert len(sessions[0]["one_liner"]) == 120
def test_open_session_not_in_recent(self, temp_db, user):
memory_store.create_session(user["id"])
# Open sessions (no ended_at) must NOT appear in the recent list
assert memory_store.get_recent_sessions(user["id"]) == []
def test_save_session_summary(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Headline")
memory_store.save_session_summary(sid, "Detailed summary", key_facts="- fact")
detail = memory_store.get_session_detail(sid)
assert detail["summary"] == "Detailed summary"
assert detail["key_facts"] == "- fact"
# has_tier2 flag must be set
sessions = memory_store.get_recent_sessions(user["id"])
assert sessions[0]["has_tier2"] == 1
def test_get_open_sessions(self, temp_db, user):
sid = memory_store.create_session(user["id"])
open_s = memory_store.get_open_sessions(user["id"])
assert any(s["id"] == sid for s in open_s)
class TestChunks:
def test_append_and_search_chunks(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(
sid, user["id"], "assistant",
"We decided to use SQLite for the database",
"architectural decision"
)
results = memory_store.search_chunks(user["id"], "SQLite")
assert len(results) == 1
assert "SQLite" in results[0]["content"]
def test_chunk_seq_increments(self, temp_db, user):
sid = memory_store.create_session(user["id"])
id1 = memory_store.append_chunk(sid, user["id"], "user", "first")
id2 = memory_store.append_chunk(sid, user["id"], "assistant", "second")
assert id2 > id1
def test_search_isolated_by_user(self, temp_db):
u1 = memory_store.get_or_create_user("user_a")
u2 = memory_store.get_or_create_user("user_b")
sid1 = memory_store.create_session(u1["id"])
memory_store.append_chunk(sid1, u1["id"], "user", "secret data")
# u2 should not see u1's chunks
assert memory_store.search_chunks(u2["id"], "secret") == []
class TestFacts:
def test_store_and_retrieve_fact(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "preference", "Prefers dark mode")
facts = memory_store.get_facts(user["id"])
assert any(f["id"] == fid for f in facts)
def test_filter_by_category(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Python first")
memory_store.store_fact(user["id"], "decision", "Use SQLite")
prefs = memory_store.get_facts(user["id"], category="preference")
assert all(f["category"] == "preference" for f in prefs)
class TestFactDeprecation:
def test_deprecate_returns_true_on_success(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Old stack note")
result = memory_store.deprecate_fact(fid, user["id"], "No longer applicable")
assert result is True
def test_deprecate_hides_fact_from_get_facts(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Deprecated technology")
memory_store.deprecate_fact(fid, user["id"])
facts = memory_store.get_facts(user["id"])
assert not any(f["id"] == fid for f in facts)
def test_deprecated_fact_visible_with_include_deprecated(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Soft-deleted fact")
memory_store.deprecate_fact(fid, user["id"], "outdated")
all_facts = memory_store.get_facts(user["id"], include_deprecated=True)
assert any(f["id"] == fid for f in all_facts)
def test_deprecation_reason_stored(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "decision", "Use Gradle")
memory_store.deprecate_fact(fid, user["id"], "Switched to Maven")
all_facts = memory_store.get_facts(user["id"], include_deprecated=True)
deprecated = next(f for f in all_facts if f["id"] == fid)
assert deprecated["deprecation_reason"] == "Switched to Maven"
assert deprecated["deprecated"] == 1
def test_deprecate_no_reason_still_works(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "preference", "Some preference")
result = memory_store.deprecate_fact(fid, user["id"])
assert result is True
facts = memory_store.get_facts(user["id"])
assert not any(f["id"] == fid for f in facts)
def test_deprecate_unknown_id_returns_false(self, temp_db, user):
result = memory_store.deprecate_fact(99999, user["id"])
assert result is False
def test_deprecate_wrong_user_returns_false(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
fid = memory_store.store_fact(user_a["id"], "codebase", "User A's fact")
result = memory_store.deprecate_fact(fid, user_b["id"], "Should not work")
assert result is False
# Fact must still be visible for user_a
facts = memory_store.get_facts(user_a["id"])
assert any(f["id"] == fid for f in facts)
def test_non_deprecated_facts_still_returned_by_default(self, temp_db, user):
fid_keep = memory_store.store_fact(user["id"], "preference", "Keep this one")
fid_drop = memory_store.store_fact(user["id"], "preference", "Drop this one")
memory_store.deprecate_fact(fid_drop, user["id"])
facts = memory_store.get_facts(user["id"])
ids = [f["id"] for f in facts]
assert fid_keep in ids
assert fid_drop not in ids
class TestStats:
def test_stats_returns_expected_keys(self, temp_db, user):
stats = memory_store.get_stats(user["id"])
for key in ("sessions", "facts", "chunks", "db_size_kb", "db_path"):
assert key in stats
class TestHypotheses:
def test_add_hypothesis_returns_id(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "I believe X because Y")
assert isinstance(hid, int)
assert hid > 0
def test_add_hypothesis_default_confidence(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Default confidence belief")
results = memory_store.list_hypotheses(user["id"])
assert results[0]["confidence"] == 0.7
def test_add_hypothesis_custom_confidence(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_hypothesis(user["id"], sid, "Strong belief", confidence=0.95)
results = memory_store.list_hypotheses(user["id"])
assert results[0]["confidence"] == 0.95
def test_new_hypothesis_status_is_open(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_hypothesis(user["id"], sid, "Fresh thought")
results = memory_store.list_hypotheses(user["id"])
assert results[0]["status"] == "open"
def test_list_hypotheses_returns_all(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_hypothesis(user["id"], sid, "Thought one")
memory_store.add_hypothesis(user["id"], sid, "Thought two")
results = memory_store.list_hypotheses(user["id"])
assert len(results) == 2
def test_list_hypotheses_filter_by_status(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Will be confirmed")
memory_store.add_hypothesis(user["id"], sid, "Stays open")
memory_store.resolve_hypothesis(hid, user["id"], "confirmed", "It was true")
open_list = memory_store.list_hypotheses(user["id"], status="open")
confirmed_list = memory_store.list_hypotheses(user["id"], status="confirmed")
assert len(open_list) == 1
assert open_list[0]["hypothesis"] == "Stays open"
assert len(confirmed_list) == 1
assert confirmed_list[0]["hypothesis"] == "Will be confirmed"
def test_resolve_confirmed(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Bug is in serializer")
result = memory_store.resolve_hypothesis(
hid, user["id"], "confirmed", "Confirmed — the serializer had a null check missing"
)
assert result is True
resolved = memory_store.list_hypotheses(user["id"], status="confirmed")
assert len(resolved) == 1
assert resolved[0]["resolution"] == "Confirmed — the serializer had a null check missing"
assert resolved[0]["resolved_at"] is not None
def test_resolve_refuted(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Problem is in the network")
memory_store.resolve_hypothesis(hid, user["id"], "refuted", "Was actually a race condition")
results = memory_store.list_hypotheses(user["id"], status="refuted")
assert results[0]["status"] == "refuted"
def test_resolve_abandoned(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Might be a cache issue")
memory_store.resolve_hypothesis(hid, user["id"], "abandoned")
results = memory_store.list_hypotheses(user["id"], status="abandoned")
assert len(results) == 1
def test_resolve_invalid_status_raises(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Some thought")
with pytest.raises(ValueError, match="Invalid status"):
memory_store.resolve_hypothesis(hid, user["id"], "wrong_status")
def test_resolve_unknown_id_returns_false(self, temp_db, user):
result = memory_store.resolve_hypothesis(99999, user["id"], "confirmed")
assert result is False
def test_resolve_wrong_user_returns_false(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
sid = memory_store.create_session(user_a["id"])
hid = memory_store.add_hypothesis(user_a["id"], sid, "User A's private thought")
result = memory_store.resolve_hypothesis(hid, user_b["id"], "confirmed")
assert result is False
# Hypothesis must still be open for user_a
results = memory_store.list_hypotheses(user_a["id"], status="open")
assert len(results) == 1
def test_list_isolated_by_user(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
sid_a = memory_store.create_session(user_a["id"])
memory_store.add_hypothesis(user_a["id"], sid_a, "User A only sees this")
assert memory_store.list_hypotheses(user_b["id"]) == []
def test_resolve_no_resolution_text_is_allowed(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Quick thought")
result = memory_store.resolve_hypothesis(hid, user["id"], "abandoned")
assert result is True
class TestSearchFacts:
def test_search_returns_matching_fact(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase", "We use SQLite for local storage")
results = memory_store.search_facts(user["id"], "SQLite")
assert len(results) == 1
assert "SQLite" in results[0]["fact"]
def test_search_porter_stemming(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase", "FastMCP serialization rules")
# 'serialize' should match 'serialization' via Porter stemmer
results = memory_store.search_facts(user["id"], "serialize")
assert len(results) == 1
def test_search_no_results(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Prefers dark mode")
results = memory_store.search_facts(user["id"], "quantum")
assert results == []
def test_search_excludes_deprecated(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Old deprecated SQLite note")
memory_store.deprecate_fact(fid, user["id"])
results = memory_store.search_facts(user["id"], "SQLite")
assert results == []
def test_search_isolated_by_user(self, temp_db):
user_a = memory_store.get_or_create_user("search_a")
user_b = memory_store.get_or_create_user("search_b")
memory_store.store_fact(user_a["id"], "codebase", "User A secret fact")
assert memory_store.search_facts(user_b["id"], "secret") == []
def test_search_returns_category(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Prefers uv over pip")
results = memory_store.search_facts(user["id"], "uv")
assert results[0]["category"] == "preference"
def test_search_limit(self, temp_db, user):
for i in range(5):
memory_store.store_fact(user["id"], "codebase", f"SQLite fact number {i}")
results = memory_store.search_facts(user["id"], "SQLite", limit=3)
assert len(results) == 3
def test_search_multiword_and_match(self, temp_db, user):
# Regression: multi-word query must AND-match (both words anywhere),
# NOT phrase-match (words consecutive). The 2026-03-31 fix accidentally
# broke this by wrapping the whole query in one double-quoted phrase.
memory_store.store_fact(user["id"], "codebase",
"BigMind uses SQLite for persistent local storage")
# Both words present but NOT consecutive — phrase search would return nothing
results = memory_store.search_facts(user["id"], "SQLite persistent")
assert len(results) == 1, "Multi-word AND search must match even when words are not consecutive"
def test_search_multiword_partial_match_returns_nothing(self, temp_db, user):
# If only one of two required words is present, no match
memory_store.store_fact(user["id"], "codebase", "We use SQLite locally")
results = memory_store.search_facts(user["id"], "SQLite quantum")
assert results == []
def test_search_reserved_word_category(self, temp_db, user):
# Regression: FTS5 reserved words like 'rank', 'content', 'category'
# must not crash or return wrong results
memory_store.store_fact(user["id"], "codebase", "rank and content pipeline")
results = memory_store.search_facts(user["id"], "rank")
assert len(results) == 1
def test_search_reserved_word_content(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase", "the content table stores data")
results = memory_store.search_facts(user["id"], "content")
assert len(results) == 1
def test_search_three_word_query(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase",
"parallel sessions across IDEs share the same SQLite database")
results = memory_store.search_facts(user["id"], "parallel sessions SQLite")
assert len(results) == 1
class TestSearchChunksMultiword:
def test_chunk_multiword_and_match(self, temp_db, user):
# Same regression test for search_chunks
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(sid, user["id"], "assistant",
"WAL mode allows parallel reads from multiple IDE sessions",
"architectural decision")
# Words present but not consecutive — phrase search would fail
results = memory_store.search_chunks(user["id"], "parallel IDE")
assert len(results) == 1, "Multi-word AND search on chunks must not be a phrase search"
def test_chunk_reserved_word_rank(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(sid, user["id"], "assistant",
"bm25 rank scores the relevance of results", None)
results = memory_store.search_chunks(user["id"], "rank")
assert len(results) == 1
class TestUpgradeRequests:
def test_add_returns_id(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(
user["id"], sid, "Add FTS to facts", "Would speed up recall"
)
assert isinstance(rid, int) and rid > 0
def test_default_status_is_open(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
results = memory_store.list_upgrade_requests(user["id"])
assert results[0]["status"] == "open"
def test_default_priority_is_medium(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
results = memory_store.list_upgrade_requests(user["id"])
assert results[0]["priority"] == "medium"
def test_default_certainty(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
results = memory_store.list_upgrade_requests(user["id"])
assert results[0]["certainty"] == 0.7
def test_custom_priority_and_certainty(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_upgrade_request(
user["id"], sid, "Critical feature", "Blocks work",
priority="high", certainty=0.95
)
results = memory_store.list_upgrade_requests(user["id"])
assert results[0]["priority"] == "high"
assert results[0]["certainty"] == 0.95
def test_list_filter_by_status(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(user["id"], sid, "Feature A", "Reason A")
memory_store.add_upgrade_request(user["id"], sid, "Feature B", "Reason B")
memory_store.resolve_upgrade_request(rid, user["id"], "resolved", "Done")
open_list = memory_store.list_upgrade_requests(user["id"], status="open")
resolved_list = memory_store.list_upgrade_requests(user["id"], status="resolved")
assert len(open_list) == 1 and open_list[0]["description"] == "Feature B"
assert len(resolved_list) == 1 and resolved_list[0]["description"] == "Feature A"
def test_resolve_resolved(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
result = memory_store.resolve_upgrade_request(rid, user["id"], "resolved", "Shipped in v4")
assert result is True
results = memory_store.list_upgrade_requests(user["id"], status="resolved")
assert results[0]["resolution"] == "Shipped in v4"
assert results[0]["resolved_at"] is not None
def test_resolve_rejected(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
memory_store.resolve_upgrade_request(rid, user["id"], "rejected", "Out of scope")
results = memory_store.list_upgrade_requests(user["id"], status="rejected")
assert results[0]["status"] == "rejected"
def test_resolve_invalid_status_raises(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
with pytest.raises(ValueError, match="Invalid status"):
memory_store.resolve_upgrade_request(rid, user["id"], "done")
def test_resolve_unknown_id_returns_false(self, temp_db, user):
assert memory_store.resolve_upgrade_request(99999, user["id"], "resolved") is False
def test_resolve_wrong_user_returns_false(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
sid = memory_store.create_session(user_a["id"])
rid = memory_store.add_upgrade_request(user_a["id"], sid, "Feature X", "Reason Y")
assert memory_store.resolve_upgrade_request(rid, user_b["id"], "resolved") is False
assert memory_store.list_upgrade_requests(user_a["id"], status="open")[0]["id"] == rid
def test_list_isolated_by_user(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
sid = memory_store.create_session(user_a["id"])
memory_store.add_upgrade_request(user_a["id"], sid, "Only A sees this", "Reason")
assert memory_store.list_upgrade_requests(user_b["id"]) == []
# ── Health Check ───────────────────────────────────────────────────────────────
class TestHealthCheck:
def test_returns_expected_keys(self, temp_db, user):
report = memory_store.health_check(user["id"])
for key in (
"stale_facts", "sessions_without_summary", "open_sessions",
"chunk_count", "fts_row_count", "fts_in_sync",
"low_confidence_facts", "stale_threshold_days",
):
assert key in report
def test_empty_db_no_issues(self, temp_db, user):
report = memory_store.health_check(user["id"])
assert report["stale_facts"] == []
assert report["sessions_without_summary"] == 0
assert report["open_sessions"] == []
assert report["fts_in_sync"] is True
def test_detects_stale_facts(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Old deployment note")
old_date = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
with db() as conn:
conn.execute("UPDATE facts SET updated_at=? WHERE id=?", (old_date, fid))
report = memory_store.health_check(user["id"], stale_days=30)
assert len(report["stale_facts"]) == 1
assert report["stale_facts"][0]["id"] == fid
def test_fresh_facts_not_flagged(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Fresh fact today")
report = memory_store.health_check(user["id"], stale_days=30)
assert report["stale_facts"] == []
def test_detects_sessions_without_summary(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Closed without summary")
report = memory_store.health_check(user["id"])
assert report["sessions_without_summary"] == 1
def test_sessions_with_summary_not_flagged(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Closed with summary")
memory_store.save_session_summary(sid, "Full narrative here.")
report = memory_store.health_check(user["id"])
assert report["sessions_without_summary"] == 0
def test_detects_open_sessions(self, temp_db, user):
memory_store.create_session(user["id"])
report = memory_store.health_check(user["id"])
assert len(report["open_sessions"]) == 1
def test_detects_low_confidence_facts(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase", "Uncertain thing", confidence=0.5)
report = memory_store.health_check(user["id"])
assert len(report["low_confidence_facts"]) == 1
assert report["low_confidence_facts"][0]["confidence"] == 0.5
def test_high_confidence_facts_not_flagged(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Certain thing", confidence=1.0)
report = memory_store.health_check(user["id"])
assert report["low_confidence_facts"] == []
def test_fts_in_sync_after_append(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(sid, user["id"], "user", "FTS test chunk")
report = memory_store.health_check(user["id"])
assert report["fts_in_sync"] is True
assert report["chunk_count"] == 1
assert report["fts_row_count"] == 1
def test_stale_threshold_stored_in_report(self, temp_db, user):
report = memory_store.health_check(user["id"], stale_days=45)
assert report["stale_threshold_days"] == 45
class TestExportMemory:
def test_creates_file(self, temp_db, user, tmp_path):
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
assert Path(out).exists()
def test_json_structure(self, temp_db, user, tmp_path):
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
for key in (
"export_date", "bigmind_schema_version", "user", "identity_profile",
"facts", "sessions", "conversation_chunks", "hypotheses",
"upgrade_requests", "token_saves", "people", "stats",
):
assert key in data
def test_facts_included(self, temp_db, user, tmp_path):
memory_store.store_fact(user["id"], "preference", "Exported preference fact")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
assert any(f["fact"] == "Exported preference fact" for f in data["facts"])
def test_sessions_included_with_tier2(self, temp_db, user, tmp_path):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Exported session")
memory_store.save_session_summary(sid, "Full story for export test.")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
exported = next(s for s in data["sessions"] if s["id"] == sid)
assert exported["one_liner"] == "Exported session"
assert exported["tier2_summary"]["summary"] == "Full story for export test."
def test_session_without_summary_has_null_tier2(self, temp_db, user, tmp_path):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "No narrative session")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
exported = next(s for s in data["sessions"] if s["id"] == sid)
assert exported["tier2_summary"] is None
def test_chunks_included(self, temp_db, user, tmp_path):
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(sid, user["id"], "assistant", "Exported chunk content")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
assert any("Exported chunk content" in c["content"] for c in data["conversation_chunks"])
def test_stats_accurate(self, temp_db, user, tmp_path):
memory_store.store_fact(user["id"], "pref", "Fact A")
memory_store.store_fact(user["id"], "pref", "Fact B")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
assert data["stats"]["facts_count"] == 2
def test_returns_result_dict(self, temp_db, user, tmp_path):
out = str(tmp_path / "export.json")
result = memory_store.export_memory(user["id"], out)
assert result["output_path"] == out
for key in ("facts_count", "sessions_count", "chunks_count", "file_size_kb"):
assert key in result
def test_default_path_in_home_dir(self, temp_db, user):
result = memory_store.export_memory(user["id"])
path = Path(result["output_path"])
try:
assert path.exists()
assert path.parent == Path.home()
finally:
path.unlink(missing_ok=True) # cleanup
# ── Auto-close / Orphaned Sessions ────────────────────────────────────────────
class TestCloseOrphanedSessions:
"""Tests for close_orphaned_sessions — the manual session cleanup tool."""
def test_closes_all_open_except_current(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
s1 = memory_store.create_session(user["id"])
s2 = memory_store.create_session(user["id"])
s3 = memory_store.create_session(user["id"]) # "current"
closed = close_orphaned_sessions(user["id"], keep_session_id=s3)
assert set(closed) == {s1, s2}
assert s3 not in closed
def test_closed_sessions_have_ended_at(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
s1 = memory_store.create_session(user["id"])
s2 = memory_store.create_session(user["id"]) # keep
close_orphaned_sessions(user["id"], keep_session_id=s2)
with db() as conn:
row = conn.execute(
"SELECT ended_at, one_liner FROM sessions WHERE id=?", (s1,)
).fetchone()
assert row["ended_at"] is not None
assert "orphaned" in row["one_liner"]
def test_keep_session_remains_open(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
memory_store.create_session(user["id"])
s_current = memory_store.create_session(user["id"])
close_orphaned_sessions(user["id"], keep_session_id=s_current)
with db() as conn:
row = conn.execute(
"SELECT ended_at FROM sessions WHERE id=?", (s_current,)
).fetchone()
assert row["ended_at"] is None
def test_returns_empty_when_no_orphans(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
s_current = memory_store.create_session(user["id"])
assert close_orphaned_sessions(user["id"], keep_session_id=s_current) == []
def test_does_not_touch_already_closed_sessions(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
s_old = memory_store.create_session(user["id"])
memory_store.close_session(s_old, "Already closed")
s_current = memory_store.create_session(user["id"])
closed = close_orphaned_sessions(user["id"], keep_session_id=s_current)
assert s_old not in closed
def test_isolated_by_user(self, temp_db):
from bigmind.auto_close import close_orphaned_sessions
u1 = memory_store.get_or_create_user("cleanup_user_a")
u2 = memory_store.get_or_create_user("cleanup_user_b")
memory_store.create_session(u1["id"])
s_u1_current = memory_store.create_session(u1["id"])
s_u2_active = memory_store.create_session(u2["id"])
close_orphaned_sessions(u1["id"], keep_session_id=s_u1_current)
still_open = memory_store.get_open_sessions(u2["id"])
assert any(s["id"] == s_u2_active for s in still_open)
# ── Restart Server ─────────────────────────────────────────────────────────────
class TestRestartServer:
"""Tests for restart_server_in_place — os.execv-based in-place restart."""
def test_calls_execv_with_correct_args(self, monkeypatch):
"""restart_server_in_place must call os.execv(sys.executable, [sys.executable] + sys.argv)."""
import os
import sys
import time
from bigmind.auto_close import restart_server_in_place
execv_calls = []
monkeypatch.setattr(os, "execv", lambda path, args: execv_calls.append((path, args)))
monkeypatch.setattr(time, "sleep", lambda _: None) # skip the 500ms delay
restart_server_in_place()
assert len(execv_calls) == 1
path, args = execv_calls[0]
assert path == sys.executable
assert args[0] == sys.executable
assert args[1:] == sys.argv
def test_sleep_called_before_execv(self, monkeypatch):
"""sleep must be called before execv so the MCP response is delivered first."""
import os
import time
from bigmind.auto_close import restart_server_in_place
call_order = []
monkeypatch.setattr(time, "sleep", lambda _: call_order.append("sleep"))
monkeypatch.setattr(os, "execv", lambda *_: call_order.append("execv"))
restart_server_in_place()
assert call_order == ["sleep", "execv"]
def test_sleep_duration_is_half_second(self, monkeypatch):
"""The delay must be 0.5s — long enough for the MCP response to be sent."""
import os
import time
from bigmind.auto_close import restart_server_in_place
sleep_durations = []
monkeypatch.setattr(time, "sleep", lambda d: sleep_durations.append(d))
monkeypatch.setattr(os, "execv", lambda *_: None)
restart_server_in_place()
assert sleep_durations == [0.5]