42ffc85f0b
BUG-1: fix test_server_tools.py assert "ALWAYS" → "Always" (case mismatch)
BUG-2: export_memory() now includes hypotheses, upgrade_requests, token_saves,
people tables; renamed bigmind_version → bigmind_schema_version (int)
BUG-3: auto_close.py replaced CURRENT_TIMESTAMP (SQLite) with Python
datetime.now(timezone.utc).isoformat() for consistent UTC timestamps
PERF-1: context_builder.py caps get_facts() at _MAX_CONTEXT_FACTS=50 with
overflow hint to prevent unbounded context growth
All 297 tests passing. Upgrade requests #6-9 resolved.
Health report: plans/BIGMIND_HEALTH_REPORT_2026-04-04.md
758 lines
36 KiB
Python
758 lines
36 KiB
Python
"""Tests for memory_store CRUD operations."""
|
|
import json
|
|
import pytest
|
|
from datetime import datetime, timezone, timedelta
|
|
from pathlib import Path
|
|
|
|
from bigmind import memory_store
|
|
from bigmind.db import db
|
|
|
|
|
|
@pytest.fixture
def user():
    """Return the user record that ``get_or_create_user`` yields for ``testuser``."""
    account = memory_store.get_or_create_user("testuser", "Test User")
    return account
|
|
|
|
|
|
class TestUsers:
    """User lookup/creation and current-username resolution."""

    def test_create_user(self, temp_db):
        """A newly requested username comes back with that name and a truthy id."""
        created = memory_store.get_or_create_user("alice")
        assert created["username"] == "alice"
        assert created["id"]

    def test_get_existing_user(self, temp_db):
        """Asking twice for the same username yields the same id."""
        first = memory_store.get_or_create_user("bob")
        second = memory_store.get_or_create_user("bob")
        assert first["id"] == second["id"]

    def test_get_current_username_from_env(self, monkeypatch):
        """``BIGMIND_USER`` in the environment is reported as the current username."""
        monkeypatch.setenv("BIGMIND_USER", "envuser")
        current = memory_store.get_current_username()
        assert current == "envuser"
|
|
|
|
|
|
class TestIdentityProfile:
    """Upsert and lookup behaviour of identity profiles."""

    def test_upsert_creates_profile(self, temp_db, user):
        """The dict returned by an upsert echoes the fields that were supplied."""
        supplied = {"role": "Engineer", "preferences": "Python first"}
        saved = memory_store.upsert_identity_profile(user["id"], **supplied)
        for field, value in supplied.items():
            assert saved[field] == value

    def test_upsert_partial_update_preserves_other_fields(self, temp_db, user):
        """Updating one field must leave the previously stored fields alone."""
        uid = user["id"]
        memory_store.upsert_identity_profile(
            uid, role="Eng", preferences="Python", pinned_facts="- fact 1"
        )
        # Second upsert touches role only; preferences and pinned_facts must survive.
        memory_store.upsert_identity_profile(uid, role="Senior Eng")
        stored = memory_store.get_identity_profile(uid)
        assert stored["role"] == "Senior Eng"
        assert stored["preferences"] == "Python"
        assert "fact 1" in stored["pinned_facts"]

    def test_get_missing_profile_returns_none(self, temp_db):
        """Looking up a profile for an unknown user id gives ``None``."""
        missing = memory_store.get_identity_profile("nonexistent-id")
        assert missing is None
|
|
|
|
|
|
class TestSessions:
    """Session lifecycle: create, close, summarise, and list."""

    def test_create_session_returns_id(self, temp_db, user):
        """A new session id is truthy and 36 characters long (UUID format)."""
        session_id = memory_store.create_session(user["id"])
        assert session_id
        assert len(session_id) == 36

    def test_close_session(self, temp_db, user):
        """A closed session shows up in the recent list with its headline and topics."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.close_session(
            session_id, "Built a thing", topics="mcp", outcome="Done"
        )
        recent = memory_store.get_recent_sessions(uid)
        assert len(recent) == 1
        only = recent[0]
        assert only["one_liner"] == "Built a thing"
        assert only["topics"] == "mcp"

    def test_one_liner_truncated_at_120_chars(self, temp_db, user):
        """A 200-character headline is stored as 120 characters."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.close_session(session_id, "x" * 200)
        recent = memory_store.get_recent_sessions(uid)
        assert len(recent[0]["one_liner"]) == 120

    def test_open_session_not_in_recent(self, temp_db, user):
        """Sessions that were never closed (no ended_at) must NOT be listed as recent."""
        uid = user["id"]
        memory_store.create_session(uid)
        recent = memory_store.get_recent_sessions(uid)
        assert recent == []

    def test_save_session_summary(self, temp_db, user):
        """Saving a summary makes it retrievable and sets the has_tier2 flag."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.close_session(session_id, "Headline")
        memory_store.save_session_summary(
            session_id, "Detailed summary", key_facts="- fact"
        )

        detail = memory_store.get_session_detail(session_id)
        assert detail["summary"] == "Detailed summary"
        assert detail["key_facts"] == "- fact"

        # The recent-session row must advertise that a tier-2 summary exists.
        recent = memory_store.get_recent_sessions(uid)
        assert recent[0]["has_tier2"] == 1

    def test_get_open_sessions(self, temp_db, user):
        """A freshly created session is reported among the open sessions."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        open_ids = [row["id"] for row in memory_store.get_open_sessions(uid)]
        assert session_id in open_ids
|
|
|
|
|
|
class TestChunks:
    """Appending conversation chunks and searching them."""

    def test_append_and_search_chunks(self, temp_db, user):
        """An appended chunk is found by a keyword it contains."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.append_chunk(
            session_id,
            uid,
            "assistant",
            "We decided to use SQLite for the database",
            "architectural decision",
        )
        hits = memory_store.search_chunks(uid, "SQLite")
        assert len(hits) == 1
        assert "SQLite" in hits[0]["content"]

    def test_chunk_seq_increments(self, temp_db, user):
        """The value returned for a later chunk is greater than for an earlier one."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        earlier = memory_store.append_chunk(session_id, uid, "user", "first")
        later = memory_store.append_chunk(session_id, uid, "assistant", "second")
        assert later > earlier

    def test_search_isolated_by_user(self, temp_db):
        """One user's chunks must never be returned for another user's search."""
        owner = memory_store.get_or_create_user("user_a")
        outsider = memory_store.get_or_create_user("user_b")
        owner_session = memory_store.create_session(owner["id"])
        memory_store.append_chunk(owner_session, owner["id"], "user", "secret data")
        # The outsider searches for the owner's keyword and must get nothing back.
        leaked = memory_store.search_chunks(outsider["id"], "secret")
        assert leaked == []
|
|
|
|
|
|
class TestFacts:
    """Storing facts and reading them back, optionally filtered by category."""

    def test_store_and_retrieve_fact(self, temp_db, user):
        """A stored fact's id appears in the user's fact list."""
        uid = user["id"]
        fact_id = memory_store.store_fact(uid, "preference", "Prefers dark mode")
        listed_ids = [row["id"] for row in memory_store.get_facts(uid)]
        assert fact_id in listed_ids

    def test_filter_by_category(self, temp_db, user):
        """Filtering by category returns rows of that category only."""
        uid = user["id"]
        memory_store.store_fact(uid, "preference", "Python first")
        memory_store.store_fact(uid, "decision", "Use SQLite")
        preference_rows = memory_store.get_facts(uid, category="preference")
        for row in preference_rows:
            assert row["category"] == "preference"
|
|
|
|
|
|
class TestFactDeprecation:
    """Soft-deletion of facts through ``deprecate_fact``."""

    def test_deprecate_returns_true_on_success(self, temp_db, user):
        """Deprecating an existing fact of the same user reports ``True``."""
        uid = user["id"]
        fact_id = memory_store.store_fact(uid, "codebase", "Old stack note")
        outcome = memory_store.deprecate_fact(fact_id, uid, "No longer applicable")
        assert outcome is True

    def test_deprecate_hides_fact_from_get_facts(self, temp_db, user):
        """A deprecated fact disappears from the default fact listing."""
        uid = user["id"]
        fact_id = memory_store.store_fact(uid, "codebase", "Deprecated technology")
        memory_store.deprecate_fact(fact_id, uid)
        visible_ids = [row["id"] for row in memory_store.get_facts(uid)]
        assert fact_id not in visible_ids

    def test_deprecated_fact_visible_with_include_deprecated(self, temp_db, user):
        """``include_deprecated=True`` brings deprecated facts back into the listing."""
        uid = user["id"]
        fact_id = memory_store.store_fact(uid, "codebase", "Soft-deleted fact")
        memory_store.deprecate_fact(fact_id, uid, "outdated")
        every_row = memory_store.get_facts(uid, include_deprecated=True)
        assert fact_id in [row["id"] for row in every_row]

    def test_deprecation_reason_stored(self, temp_db, user):
        """The reason text and the deprecated flag are both persisted on the row."""
        uid = user["id"]
        fact_id = memory_store.store_fact(uid, "decision", "Use Gradle")
        memory_store.deprecate_fact(fact_id, uid, "Switched to Maven")
        every_row = memory_store.get_facts(uid, include_deprecated=True)
        target = next(filter(lambda row: row["id"] == fact_id, every_row))
        assert target["deprecation_reason"] == "Switched to Maven"
        assert target["deprecated"] == 1

    def test_deprecate_no_reason_still_works(self, temp_db, user):
        """Omitting the reason still deprecates and hides the fact."""
        uid = user["id"]
        fact_id = memory_store.store_fact(uid, "preference", "Some preference")
        outcome = memory_store.deprecate_fact(fact_id, uid)
        assert outcome is True
        visible_ids = [row["id"] for row in memory_store.get_facts(uid)]
        assert fact_id not in visible_ids

    def test_deprecate_unknown_id_returns_false(self, temp_db, user):
        """Deprecating an id that was never stored reports ``False``."""
        outcome = memory_store.deprecate_fact(99999, user["id"])
        assert outcome is False

    def test_deprecate_wrong_user_returns_false(self, temp_db):
        """A user cannot deprecate a fact that belongs to someone else."""
        owner = memory_store.get_or_create_user("user_a")
        intruder = memory_store.get_or_create_user("user_b")
        fact_id = memory_store.store_fact(owner["id"], "codebase", "User A's fact")

        outcome = memory_store.deprecate_fact(
            fact_id, intruder["id"], "Should not work"
        )
        assert outcome is False

        # The owner must still see the fact in the default listing.
        owner_ids = [row["id"] for row in memory_store.get_facts(owner["id"])]
        assert fact_id in owner_ids

    def test_non_deprecated_facts_still_returned_by_default(self, temp_db, user):
        """Deprecating one fact leaves the user's other facts listed."""
        uid = user["id"]
        kept = memory_store.store_fact(uid, "preference", "Keep this one")
        dropped = memory_store.store_fact(uid, "preference", "Drop this one")
        memory_store.deprecate_fact(dropped, uid)
        visible_ids = [row["id"] for row in memory_store.get_facts(uid)]
        assert kept in visible_ids
        assert dropped not in visible_ids
|
|
|
|
|
|
class TestStats:
    """Shape of the dictionary returned by ``get_stats``."""

    def test_stats_returns_expected_keys(self, temp_db, user):
        """Every documented statistic key is present in the result."""
        summary = memory_store.get_stats(user["id"])
        required = ("sessions", "facts", "chunks", "db_size_kb", "db_path")
        absent = [name for name in required if name not in summary]
        assert not absent
|
|
|
|
|
|
class TestHypotheses:
    """Adding, listing, and resolving hypotheses, including per-user isolation."""

    def test_add_hypothesis_returns_id(self, temp_db, user):
        """``add_hypothesis`` returns a positive integer id."""
        sid = memory_store.create_session(user["id"])
        hid = memory_store.add_hypothesis(user["id"], sid, "I believe X because Y")
        assert isinstance(hid, int)
        assert hid > 0

    def test_add_hypothesis_default_confidence(self, temp_db, user):
        """Without an explicit confidence the stored value is 0.7."""
        sid = memory_store.create_session(user["id"])
        # The returned id is irrelevant here; only the stored row is inspected.
        memory_store.add_hypothesis(user["id"], sid, "Default confidence belief")
        results = memory_store.list_hypotheses(user["id"])
        assert results[0]["confidence"] == 0.7

    def test_add_hypothesis_custom_confidence(self, temp_db, user):
        """An explicit confidence is stored as given."""
        sid = memory_store.create_session(user["id"])
        memory_store.add_hypothesis(user["id"], sid, "Strong belief", confidence=0.95)
        results = memory_store.list_hypotheses(user["id"])
        assert results[0]["confidence"] == 0.95

    def test_new_hypothesis_status_is_open(self, temp_db, user):
        """A freshly added hypothesis has status ``open``."""
        sid = memory_store.create_session(user["id"])
        memory_store.add_hypothesis(user["id"], sid, "Fresh thought")
        results = memory_store.list_hypotheses(user["id"])
        assert results[0]["status"] == "open"

    def test_list_hypotheses_returns_all(self, temp_db, user):
        """Listing without a status filter returns every hypothesis of the user."""
        sid = memory_store.create_session(user["id"])
        memory_store.add_hypothesis(user["id"], sid, "Thought one")
        memory_store.add_hypothesis(user["id"], sid, "Thought two")
        results = memory_store.list_hypotheses(user["id"])
        assert len(results) == 2

    def test_list_hypotheses_filter_by_status(self, temp_db, user):
        """The status filter separates open hypotheses from confirmed ones."""
        sid = memory_store.create_session(user["id"])
        hid = memory_store.add_hypothesis(user["id"], sid, "Will be confirmed")
        memory_store.add_hypothesis(user["id"], sid, "Stays open")
        memory_store.resolve_hypothesis(hid, user["id"], "confirmed", "It was true")
        open_list = memory_store.list_hypotheses(user["id"], status="open")
        confirmed_list = memory_store.list_hypotheses(user["id"], status="confirmed")
        assert len(open_list) == 1
        assert open_list[0]["hypothesis"] == "Stays open"
        assert len(confirmed_list) == 1
        assert confirmed_list[0]["hypothesis"] == "Will be confirmed"

    def test_resolve_confirmed(self, temp_db, user):
        """Confirming returns ``True`` and stores the resolution text and timestamp."""
        sid = memory_store.create_session(user["id"])
        hid = memory_store.add_hypothesis(user["id"], sid, "Bug is in serializer")
        result = memory_store.resolve_hypothesis(
            hid, user["id"], "confirmed", "Confirmed — the serializer had a null check missing"
        )
        assert result is True
        resolved = memory_store.list_hypotheses(user["id"], status="confirmed")
        assert len(resolved) == 1
        assert resolved[0]["resolution"] == "Confirmed — the serializer had a null check missing"
        assert resolved[0]["resolved_at"] is not None

    def test_resolve_refuted(self, temp_db, user):
        """A refuted hypothesis is listed under status ``refuted``."""
        sid = memory_store.create_session(user["id"])
        hid = memory_store.add_hypothesis(user["id"], sid, "Problem is in the network")
        memory_store.resolve_hypothesis(hid, user["id"], "refuted", "Was actually a race condition")
        results = memory_store.list_hypotheses(user["id"], status="refuted")
        assert results[0]["status"] == "refuted"

    def test_resolve_abandoned(self, temp_db, user):
        """An abandoned hypothesis is listed under status ``abandoned``."""
        sid = memory_store.create_session(user["id"])
        hid = memory_store.add_hypothesis(user["id"], sid, "Might be a cache issue")
        memory_store.resolve_hypothesis(hid, user["id"], "abandoned")
        results = memory_store.list_hypotheses(user["id"], status="abandoned")
        assert len(results) == 1

    def test_resolve_invalid_status_raises(self, temp_db, user):
        """An unrecognised status raises ``ValueError`` mentioning "Invalid status"."""
        sid = memory_store.create_session(user["id"])
        hid = memory_store.add_hypothesis(user["id"], sid, "Some thought")
        with pytest.raises(ValueError, match="Invalid status"):
            memory_store.resolve_hypothesis(hid, user["id"], "wrong_status")

    def test_resolve_unknown_id_returns_false(self, temp_db, user):
        """Resolving an id that was never created reports ``False``."""
        result = memory_store.resolve_hypothesis(99999, user["id"], "confirmed")
        assert result is False

    def test_resolve_wrong_user_returns_false(self, temp_db):
        """A user cannot resolve a hypothesis owned by another user."""
        user_a = memory_store.get_or_create_user("user_a")
        user_b = memory_store.get_or_create_user("user_b")
        sid = memory_store.create_session(user_a["id"])
        hid = memory_store.add_hypothesis(user_a["id"], sid, "User A's private thought")
        result = memory_store.resolve_hypothesis(hid, user_b["id"], "confirmed")
        assert result is False
        # Hypothesis must still be open for user_a
        results = memory_store.list_hypotheses(user_a["id"], status="open")
        assert len(results) == 1

    def test_list_isolated_by_user(self, temp_db):
        """Listing for one user never returns another user's hypotheses."""
        user_a = memory_store.get_or_create_user("user_a")
        user_b = memory_store.get_or_create_user("user_b")
        sid_a = memory_store.create_session(user_a["id"])
        memory_store.add_hypothesis(user_a["id"], sid_a, "User A only sees this")
        assert memory_store.list_hypotheses(user_b["id"]) == []

    def test_resolve_no_resolution_text_is_allowed(self, temp_db, user):
        """Resolving without a resolution text still succeeds."""
        sid = memory_store.create_session(user["id"])
        hid = memory_store.add_hypothesis(user["id"], sid, "Quick thought")
        result = memory_store.resolve_hypothesis(hid, user["id"], "abandoned")
        assert result is True
|
|
|
|
|
|
class TestSearchFacts:
    """Full-text search over stored facts via ``search_facts``."""

    def test_search_returns_matching_fact(self, temp_db, user):
        """A single-keyword query finds the fact that contains it."""
        uid = user["id"]
        memory_store.store_fact(uid, "codebase", "We use SQLite for local storage")
        hits = memory_store.search_facts(uid, "SQLite")
        assert len(hits) == 1
        assert "SQLite" in hits[0]["fact"]

    def test_search_porter_stemming(self, temp_db, user):
        """'serialize' should match 'serialization' via the Porter stemmer."""
        uid = user["id"]
        memory_store.store_fact(uid, "codebase", "FastMCP serialization rules")
        hits = memory_store.search_facts(uid, "serialize")
        assert len(hits) == 1

    def test_search_no_results(self, temp_db, user):
        """A query with no matching fact returns an empty list."""
        uid = user["id"]
        memory_store.store_fact(uid, "preference", "Prefers dark mode")
        hits = memory_store.search_facts(uid, "quantum")
        assert hits == []

    def test_search_excludes_deprecated(self, temp_db, user):
        """Deprecated facts are not returned by search."""
        uid = user["id"]
        fact_id = memory_store.store_fact(uid, "codebase", "Old deprecated SQLite note")
        memory_store.deprecate_fact(fact_id, uid)
        hits = memory_store.search_facts(uid, "SQLite")
        assert hits == []

    def test_search_isolated_by_user(self, temp_db):
        """Searching as one user never surfaces another user's facts."""
        owner = memory_store.get_or_create_user("search_a")
        outsider = memory_store.get_or_create_user("search_b")
        memory_store.store_fact(owner["id"], "codebase", "User A secret fact")
        leaked = memory_store.search_facts(outsider["id"], "secret")
        assert leaked == []

    def test_search_returns_category(self, temp_db, user):
        """Each hit carries the category the fact was stored under."""
        uid = user["id"]
        memory_store.store_fact(uid, "preference", "Prefers uv over pip")
        hits = memory_store.search_facts(uid, "uv")
        assert hits[0]["category"] == "preference"

    def test_search_limit(self, temp_db, user):
        """``limit`` caps the number of hits returned."""
        uid = user["id"]
        for number in range(5):
            memory_store.store_fact(uid, "codebase", f"SQLite fact number {number}")
        hits = memory_store.search_facts(uid, "SQLite", limit=3)
        assert len(hits) == 3

    def test_search_multiword_and_match(self, temp_db, user):
        """Multi-word queries AND-match their words instead of phrase-matching.

        Regression: the 2026-03-31 fix accidentally broke this by wrapping the
        whole query in one double-quoted phrase, which demands consecutive words.
        """
        uid = user["id"]
        memory_store.store_fact(
            uid, "codebase", "BigMind uses SQLite for persistent local storage"
        )
        # Both words occur but are NOT adjacent, so a phrase search would find nothing.
        hits = memory_store.search_facts(uid, "SQLite persistent")
        assert len(hits) == 1, "Multi-word AND search must match even when words are not consecutive"

    def test_search_multiword_partial_match_returns_nothing(self, temp_db, user):
        """When only one of the two query words is present there is no match."""
        uid = user["id"]
        memory_store.store_fact(uid, "codebase", "We use SQLite locally")
        hits = memory_store.search_facts(uid, "SQLite quantum")
        assert hits == []

    def test_search_reserved_word_category(self, temp_db, user):
        """Regression: FTS5 reserved words must neither crash nor mis-match.

        Words such as 'rank', 'content', and 'category' are affected; this
        case queries for 'rank'.
        """
        uid = user["id"]
        memory_store.store_fact(uid, "codebase", "rank and content pipeline")
        hits = memory_store.search_facts(uid, "rank")
        assert len(hits) == 1

    def test_search_reserved_word_content(self, temp_db, user):
        """Querying for the word 'content' finds the fact containing it."""
        uid = user["id"]
        memory_store.store_fact(uid, "codebase", "the content table stores data")
        hits = memory_store.search_facts(uid, "content")
        assert len(hits) == 1

    def test_search_three_word_query(self, temp_db, user):
        """Three query words scattered through one fact still match it."""
        uid = user["id"]
        memory_store.store_fact(
            uid,
            "codebase",
            "parallel sessions across IDEs share the same SQLite database",
        )
        hits = memory_store.search_facts(uid, "parallel sessions SQLite")
        assert len(hits) == 1
|
|
|
|
|
|
class TestSearchChunksMultiword:
    """Multi-word and reserved-word queries against ``search_chunks``."""

    def test_chunk_multiword_and_match(self, temp_db, user):
        """Same AND-versus-phrase regression as for facts, applied to chunks."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.append_chunk(
            session_id,
            uid,
            "assistant",
            "WAL mode allows parallel reads from multiple IDE sessions",
            "architectural decision",
        )
        # The two words are present but not adjacent; a phrase search would miss.
        hits = memory_store.search_chunks(uid, "parallel IDE")
        assert len(hits) == 1, "Multi-word AND search on chunks must not be a phrase search"

    def test_chunk_reserved_word_rank(self, temp_db, user):
        """Querying chunks for the word 'rank' finds the chunk containing it."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.append_chunk(
            session_id,
            uid,
            "assistant",
            "bm25 rank scores the relevance of results",
            None,
        )
        hits = memory_store.search_chunks(uid, "rank")
        assert len(hits) == 1
|
|
|
|
|
|
class TestUpgradeRequests:
    """Creating, listing, and resolving upgrade requests."""

    def test_add_returns_id(self, temp_db, user):
        """``add_upgrade_request`` returns a positive integer id."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        request_id = memory_store.add_upgrade_request(
            uid, session_id, "Add FTS to facts", "Would speed up recall"
        )
        assert isinstance(request_id, int)
        assert request_id > 0

    def test_default_status_is_open(self, temp_db, user):
        """A new request starts with status ``open``."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.add_upgrade_request(uid, session_id, "Feature X", "Reason Y")
        listed = memory_store.list_upgrade_requests(uid)
        assert listed[0]["status"] == "open"

    def test_default_priority_is_medium(self, temp_db, user):
        """A new request defaults to priority ``medium``."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.add_upgrade_request(uid, session_id, "Feature X", "Reason Y")
        listed = memory_store.list_upgrade_requests(uid)
        assert listed[0]["priority"] == "medium"

    def test_default_certainty(self, temp_db, user):
        """A new request defaults to certainty 0.7."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.add_upgrade_request(uid, session_id, "Feature X", "Reason Y")
        listed = memory_store.list_upgrade_requests(uid)
        assert listed[0]["certainty"] == 0.7

    def test_custom_priority_and_certainty(self, temp_db, user):
        """Explicit priority and certainty are stored as given."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.add_upgrade_request(
            uid,
            session_id,
            "Critical feature",
            "Blocks work",
            priority="high",
            certainty=0.95,
        )
        first = memory_store.list_upgrade_requests(uid)[0]
        assert first["priority"] == "high"
        assert first["certainty"] == 0.95

    def test_list_filter_by_status(self, temp_db, user):
        """The status filter separates open requests from resolved ones."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        request_a = memory_store.add_upgrade_request(
            uid, session_id, "Feature A", "Reason A"
        )
        memory_store.add_upgrade_request(uid, session_id, "Feature B", "Reason B")
        memory_store.resolve_upgrade_request(request_a, uid, "resolved", "Done")

        still_open = memory_store.list_upgrade_requests(uid, status="open")
        resolved = memory_store.list_upgrade_requests(uid, status="resolved")

        assert len(still_open) == 1
        assert still_open[0]["description"] == "Feature B"
        assert len(resolved) == 1
        assert resolved[0]["description"] == "Feature A"

    def test_resolve_resolved(self, temp_db, user):
        """Resolving returns ``True`` and stores the resolution text and timestamp."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        request_id = memory_store.add_upgrade_request(
            uid, session_id, "Feature X", "Reason Y"
        )
        outcome = memory_store.resolve_upgrade_request(
            request_id, uid, "resolved", "Shipped in v4"
        )
        assert outcome is True
        first = memory_store.list_upgrade_requests(uid, status="resolved")[0]
        assert first["resolution"] == "Shipped in v4"
        assert first["resolved_at"] is not None

    def test_resolve_rejected(self, temp_db, user):
        """A rejected request is listed under status ``rejected``."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        request_id = memory_store.add_upgrade_request(
            uid, session_id, "Feature X", "Reason Y"
        )
        memory_store.resolve_upgrade_request(
            request_id, uid, "rejected", "Out of scope"
        )
        listed = memory_store.list_upgrade_requests(uid, status="rejected")
        assert listed[0]["status"] == "rejected"

    def test_resolve_invalid_status_raises(self, temp_db, user):
        """An unrecognised status raises ``ValueError`` mentioning "Invalid status"."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        request_id = memory_store.add_upgrade_request(
            uid, session_id, "Feature X", "Reason Y"
        )
        with pytest.raises(ValueError, match="Invalid status"):
            memory_store.resolve_upgrade_request(request_id, uid, "done")

    def test_resolve_unknown_id_returns_false(self, temp_db, user):
        """Resolving an id that was never created reports ``False``."""
        outcome = memory_store.resolve_upgrade_request(99999, user["id"], "resolved")
        assert outcome is False

    def test_resolve_wrong_user_returns_false(self, temp_db):
        """A user cannot resolve a request owned by another user."""
        owner = memory_store.get_or_create_user("user_a")
        intruder = memory_store.get_or_create_user("user_b")
        session_id = memory_store.create_session(owner["id"])
        request_id = memory_store.add_upgrade_request(
            owner["id"], session_id, "Feature X", "Reason Y"
        )

        outcome = memory_store.resolve_upgrade_request(
            request_id, intruder["id"], "resolved"
        )
        assert outcome is False

        # The request must still be the owner's first open one.
        owner_open = memory_store.list_upgrade_requests(owner["id"], status="open")
        assert owner_open[0]["id"] == request_id

    def test_list_isolated_by_user(self, temp_db):
        """Listing for one user never returns another user's requests."""
        owner = memory_store.get_or_create_user("user_a")
        outsider = memory_store.get_or_create_user("user_b")
        session_id = memory_store.create_session(owner["id"])
        memory_store.add_upgrade_request(
            owner["id"], session_id, "Only A sees this", "Reason"
        )
        leaked = memory_store.list_upgrade_requests(outsider["id"])
        assert leaked == []
|
|
|
|
|
|
# ── Health Check ───────────────────────────────────────────────────────────────
|
|
|
|
class TestHealthCheck:
    """Contents of the report produced by ``health_check``."""

    def test_returns_expected_keys(self, temp_db, user):
        """The report exposes every expected top-level key."""
        report = memory_store.health_check(user["id"])
        required = (
            "stale_facts",
            "sessions_without_summary",
            "open_sessions",
            "chunk_count",
            "fts_row_count",
            "fts_in_sync",
            "low_confidence_facts",
            "stale_threshold_days",
        )
        absent = [name for name in required if name not in report]
        assert not absent

    def test_empty_db_no_issues(self, temp_db, user):
        """With no data stored, the report flags nothing."""
        report = memory_store.health_check(user["id"])
        assert report["stale_facts"] == []
        assert report["sessions_without_summary"] == 0
        assert report["open_sessions"] == []
        assert report["fts_in_sync"] is True

    def test_detects_stale_facts(self, temp_db, user):
        """A fact last updated 60 days ago is stale under a 30-day threshold."""
        uid = user["id"]
        fact_id = memory_store.store_fact(uid, "codebase", "Old deployment note")
        # Backdate the row directly so the fact looks two months old.
        sixty_days_ago = datetime.now(timezone.utc) - timedelta(days=60)
        with db() as conn:
            conn.execute(
                "UPDATE facts SET updated_at=? WHERE id=?",
                (sixty_days_ago.isoformat(), fact_id),
            )
        stale = memory_store.health_check(uid, stale_days=30)["stale_facts"]
        assert len(stale) == 1
        assert stale[0]["id"] == fact_id

    def test_fresh_facts_not_flagged(self, temp_db, user):
        """A fact stored just now is not reported as stale."""
        uid = user["id"]
        memory_store.store_fact(uid, "preference", "Fresh fact today")
        report = memory_store.health_check(uid, stale_days=30)
        assert report["stale_facts"] == []

    def test_detects_sessions_without_summary(self, temp_db, user):
        """A closed session with no saved summary is counted."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.close_session(session_id, "Closed without summary")
        report = memory_store.health_check(uid)
        assert report["sessions_without_summary"] == 1

    def test_sessions_with_summary_not_flagged(self, temp_db, user):
        """A closed session that has a saved summary is not counted."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.close_session(session_id, "Closed with summary")
        memory_store.save_session_summary(session_id, "Full narrative here.")
        report = memory_store.health_check(uid)
        assert report["sessions_without_summary"] == 0

    def test_detects_open_sessions(self, temp_db, user):
        """A session that was never closed is listed as open."""
        uid = user["id"]
        memory_store.create_session(uid)
        report = memory_store.health_check(uid)
        assert len(report["open_sessions"]) == 1

    def test_detects_low_confidence_facts(self, temp_db, user):
        """A fact stored with confidence 0.5 is reported as low-confidence."""
        uid = user["id"]
        memory_store.store_fact(uid, "codebase", "Uncertain thing", confidence=0.5)
        flagged = memory_store.health_check(uid)["low_confidence_facts"]
        assert len(flagged) == 1
        assert flagged[0]["confidence"] == 0.5

    def test_high_confidence_facts_not_flagged(self, temp_db, user):
        """A fact stored with confidence 1.0 is not reported as low-confidence."""
        uid = user["id"]
        memory_store.store_fact(uid, "preference", "Certain thing", confidence=1.0)
        report = memory_store.health_check(uid)
        assert report["low_confidence_facts"] == []

    def test_fts_in_sync_after_append(self, temp_db, user):
        """After one append, chunk and FTS row counts are both 1 and in sync."""
        uid = user["id"]
        session_id = memory_store.create_session(uid)
        memory_store.append_chunk(session_id, uid, "user", "FTS test chunk")
        report = memory_store.health_check(uid)
        assert report["fts_in_sync"] is True
        assert report["chunk_count"] == 1
        assert report["fts_row_count"] == 1

    def test_stale_threshold_stored_in_report(self, temp_db, user):
        """The ``stale_days`` argument is echoed back in the report."""
        report = memory_store.health_check(user["id"], stale_days=45)
        assert report["stale_threshold_days"] == 45
|
|
|
|
|
|
class TestExportMemory:
    """JSON export of a user's memory via ``export_memory``."""

    @staticmethod
    def _export_and_load(user_id, tmp_path):
        """Export *user_id*'s memory to ``tmp_path/export.json`` and return the parsed JSON.

        The file is decoded explicitly as UTF-8 so the result does not depend
        on the platform's locale encoding.
        """
        out = tmp_path / "export.json"
        memory_store.export_memory(user_id, str(out))
        return json.loads(out.read_text(encoding="utf-8"))

    def test_creates_file(self, temp_db, user, tmp_path):
        """Exporting to an explicit path creates a file there."""
        out = str(tmp_path / "export.json")
        memory_store.export_memory(user["id"], out)
        assert Path(out).exists()

    def test_json_structure(self, temp_db, user, tmp_path):
        """The exported document contains every expected top-level key."""
        data = self._export_and_load(user["id"], tmp_path)
        for key in (
            "export_date", "bigmind_schema_version", "user", "identity_profile",
            "facts", "sessions", "conversation_chunks", "hypotheses",
            "upgrade_requests", "token_saves", "people", "stats",
        ):
            assert key in data

    def test_facts_included(self, temp_db, user, tmp_path):
        """Stored facts appear in the export's ``facts`` list."""
        memory_store.store_fact(user["id"], "preference", "Exported preference fact")
        data = self._export_and_load(user["id"], tmp_path)
        assert any(f["fact"] == "Exported preference fact" for f in data["facts"])

    def test_sessions_included_with_tier2(self, temp_db, user, tmp_path):
        """A summarised session is exported together with its tier-2 summary."""
        sid = memory_store.create_session(user["id"])
        memory_store.close_session(sid, "Exported session")
        memory_store.save_session_summary(sid, "Full story for export test.")
        data = self._export_and_load(user["id"], tmp_path)
        exported = next(s for s in data["sessions"] if s["id"] == sid)
        assert exported["one_liner"] == "Exported session"
        assert exported["tier2_summary"]["summary"] == "Full story for export test."

    def test_session_without_summary_has_null_tier2(self, temp_db, user, tmp_path):
        """A session with no saved summary exports ``tier2_summary`` as null."""
        sid = memory_store.create_session(user["id"])
        memory_store.close_session(sid, "No narrative session")
        data = self._export_and_load(user["id"], tmp_path)
        exported = next(s for s in data["sessions"] if s["id"] == sid)
        assert exported["tier2_summary"] is None

    def test_chunks_included(self, temp_db, user, tmp_path):
        """Appended chunks appear in the export's ``conversation_chunks`` list."""
        sid = memory_store.create_session(user["id"])
        memory_store.append_chunk(sid, user["id"], "assistant", "Exported chunk content")
        data = self._export_and_load(user["id"], tmp_path)
        assert any("Exported chunk content" in c["content"] for c in data["conversation_chunks"])

    def test_stats_accurate(self, temp_db, user, tmp_path):
        """``stats.facts_count`` matches the number of facts stored."""
        memory_store.store_fact(user["id"], "pref", "Fact A")
        memory_store.store_fact(user["id"], "pref", "Fact B")
        data = self._export_and_load(user["id"], tmp_path)
        assert data["stats"]["facts_count"] == 2

    def test_returns_result_dict(self, temp_db, user, tmp_path):
        """The return value reports the output path and the summary counters."""
        out = str(tmp_path / "export.json")
        result = memory_store.export_memory(user["id"], out)
        assert result["output_path"] == out
        for key in ("facts_count", "sessions_count", "chunks_count", "file_size_kb"):
            assert key in result

    def test_default_path_in_home_dir(self, temp_db, user):
        """Without an explicit path the export lands directly in the home directory."""
        result = memory_store.export_memory(user["id"])
        path = Path(result["output_path"])
        try:
            assert path.exists()
            assert path.parent == Path.home()
        finally:
            # NOTE(review): this writes into the real home directory; the
            # finally-clause removes the file even when an assertion fails.
            path.unlink(missing_ok=True)
|
|
|
|
|
|
# ── Auto-close / Orphaned Sessions ────────────────────────────────────────────
|
|
|
|
class TestCloseOrphanedSessions:
    """Tests for close_orphaned_sessions — the manual session cleanup tool."""

    def test_closes_all_open_except_current(self, temp_db, user):
        """Every open session except the one to keep is reported as closed."""
        from bigmind.auto_close import close_orphaned_sessions

        uid = user["id"]
        orphan_one = memory_store.create_session(uid)
        orphan_two = memory_store.create_session(uid)
        current = memory_store.create_session(uid)  # the "current" session

        closed_ids = close_orphaned_sessions(uid, keep_session_id=current)

        assert set(closed_ids) == {orphan_one, orphan_two}
        assert current not in closed_ids

    def test_closed_sessions_have_ended_at(self, temp_db, user):
        """A closed orphan gets an ended_at value and an 'orphaned' headline."""
        from bigmind.auto_close import close_orphaned_sessions

        uid = user["id"]
        orphan = memory_store.create_session(uid)
        kept = memory_store.create_session(uid)  # session to keep open
        close_orphaned_sessions(uid, keep_session_id=kept)

        query = "SELECT ended_at, one_liner FROM sessions WHERE id=?"
        with db() as conn:
            record = conn.execute(query, (orphan,)).fetchone()

        assert record["ended_at"] is not None
        assert "orphaned" in record["one_liner"]

    def test_keep_session_remains_open(self, temp_db, user):
        """The session passed as ``keep_session_id`` keeps a NULL ended_at."""
        from bigmind.auto_close import close_orphaned_sessions

        uid = user["id"]
        memory_store.create_session(uid)
        current = memory_store.create_session(uid)
        close_orphaned_sessions(uid, keep_session_id=current)

        query = "SELECT ended_at FROM sessions WHERE id=?"
        with db() as conn:
            record = conn.execute(query, (current,)).fetchone()

        assert record["ended_at"] is None

    def test_returns_empty_when_no_orphans(self, temp_db, user):
        """With only the current session open, nothing is closed."""
        from bigmind.auto_close import close_orphaned_sessions

        uid = user["id"]
        current = memory_store.create_session(uid)
        closed_ids = close_orphaned_sessions(uid, keep_session_id=current)
        assert closed_ids == []

    def test_does_not_touch_already_closed_sessions(self, temp_db, user):
        """Sessions that were already closed are not reported again."""
        from bigmind.auto_close import close_orphaned_sessions

        uid = user["id"]
        finished = memory_store.create_session(uid)
        memory_store.close_session(finished, "Already closed")
        current = memory_store.create_session(uid)

        closed_ids = close_orphaned_sessions(uid, keep_session_id=current)

        assert finished not in closed_ids

    def test_isolated_by_user(self, temp_db):
        """Cleaning up one user's sessions leaves another user's session open."""
        from bigmind.auto_close import close_orphaned_sessions

        first_user = memory_store.get_or_create_user("cleanup_user_a")
        second_user = memory_store.get_or_create_user("cleanup_user_b")
        memory_store.create_session(first_user["id"])
        first_current = memory_store.create_session(first_user["id"])
        second_active = memory_store.create_session(second_user["id"])

        close_orphaned_sessions(first_user["id"], keep_session_id=first_current)

        remaining = memory_store.get_open_sessions(second_user["id"])
        assert second_active in [row["id"] for row in remaining]
|
|
|
|
|
|
# ── Restart Server ─────────────────────────────────────────────────────────────
|
|
|
|
class TestRestartServer:
    """Tests for restart_server_in_place — os.execv-based in-place restart."""

    def test_calls_execv_with_correct_args(self, monkeypatch):
        """restart_server_in_place must call os.execv(sys.executable, [sys.executable] + sys.argv)."""
        import os
        import sys
        import time
        from bigmind.auto_close import restart_server_in_place

        recorded = []

        def fake_execv(path, args):
            recorded.append((path, args))

        def no_sleep(_):
            return None

        monkeypatch.setattr(os, "execv", fake_execv)
        monkeypatch.setattr(time, "sleep", no_sleep)  # skip the 500ms delay

        restart_server_in_place()

        assert len(recorded) == 1
        exec_path, exec_args = recorded[0]
        assert exec_path == sys.executable
        assert exec_args[0] == sys.executable
        assert exec_args[1:] == sys.argv

    def test_sleep_called_before_execv(self, monkeypatch):
        """sleep must be called before execv so the MCP response is delivered first."""
        import os
        import time
        from bigmind.auto_close import restart_server_in_place

        sequence = []

        def fake_sleep(_):
            sequence.append("sleep")

        def fake_execv(*_):
            sequence.append("execv")

        monkeypatch.setattr(time, "sleep", fake_sleep)
        monkeypatch.setattr(os, "execv", fake_execv)

        restart_server_in_place()

        assert sequence == ["sleep", "execv"]

    def test_sleep_duration_is_half_second(self, monkeypatch):
        """The delay must be 0.5s — long enough for the MCP response to be sent."""
        import os
        import time
        from bigmind.auto_close import restart_server_in_place

        requested_delays = []

        def fake_sleep(seconds):
            requested_delays.append(seconds)

        def fake_execv(*_):
            return None

        monkeypatch.setattr(time, "sleep", fake_sleep)
        monkeypatch.setattr(os, "execv", fake_execv)

        restart_server_in_place()

        assert requested_delays == [0.5]
|
|
|
|
|
|
|
|
|
|
|