"""Tests for memory_store CRUD operations.""" import json import pytest from datetime import datetime, timezone, timedelta from pathlib import Path from bigmind import memory_store from bigmind.db import db @pytest.fixture def user(): return memory_store.get_or_create_user("testuser", "Test User") class TestUsers: def test_create_user(self, temp_db): u = memory_store.get_or_create_user("alice") assert u["username"] == "alice" assert u["id"] def test_get_existing_user(self, temp_db): u1 = memory_store.get_or_create_user("bob") u2 = memory_store.get_or_create_user("bob") assert u1["id"] == u2["id"] def test_get_current_username_from_env(self, monkeypatch): monkeypatch.setenv("BIGMIND_USER", "envuser") assert memory_store.get_current_username() == "envuser" class TestIdentityProfile: def test_upsert_creates_profile(self, temp_db, user): profile = memory_store.upsert_identity_profile( user["id"], role="Engineer", preferences="Python first" ) assert profile["role"] == "Engineer" assert profile["preferences"] == "Python first" def test_upsert_partial_update_preserves_other_fields(self, temp_db, user): memory_store.upsert_identity_profile( user["id"], role="Eng", preferences="Python", pinned_facts="- fact 1" ) # Only update role — preferences and pinned_facts should be preserved memory_store.upsert_identity_profile(user["id"], role="Senior Eng") profile = memory_store.get_identity_profile(user["id"]) assert profile["role"] == "Senior Eng" assert profile["preferences"] == "Python" assert "fact 1" in profile["pinned_facts"] def test_get_missing_profile_returns_none(self, temp_db): assert memory_store.get_identity_profile("nonexistent-id") is None class TestSessions: def test_create_session_returns_id(self, temp_db, user): sid = memory_store.create_session(user["id"]) assert sid and len(sid) == 36 # UUID format def test_close_session(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.close_session(sid, "Built a thing", topics="mcp", outcome="Done") sessions = memory_store.get_recent_sessions(user["id"]) assert len(sessions) == 1 assert sessions[0]["one_liner"] == "Built a thing" assert sessions[0]["topics"] == "mcp" def test_one_liner_truncated_at_120_chars(self, temp_db, user): sid = memory_store.create_session(user["id"]) long_title = "x" * 200 memory_store.close_session(sid, long_title) sessions = memory_store.get_recent_sessions(user["id"]) assert len(sessions[0]["one_liner"]) == 120 def test_open_session_not_in_recent(self, temp_db, user): memory_store.create_session(user["id"]) # Open sessions (no ended_at) must NOT appear in the recent list assert memory_store.get_recent_sessions(user["id"]) == [] def test_save_session_summary(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.close_session(sid, "Headline") memory_store.save_session_summary(sid, "Detailed summary", key_facts="- fact") detail = memory_store.get_session_detail(sid) assert detail["summary"] == "Detailed summary" assert detail["key_facts"] == "- fact" # has_tier2 flag must be set sessions = memory_store.get_recent_sessions(user["id"]) assert sessions[0]["has_tier2"] == 1 def test_get_open_sessions(self, temp_db, user): sid = memory_store.create_session(user["id"]) open_s = memory_store.get_open_sessions(user["id"]) assert any(s["id"] == sid for s in open_s) class TestChunks: def test_append_and_search_chunks(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.append_chunk( sid, user["id"], "assistant", "We decided to use SQLite for the database", "architectural decision" ) results = memory_store.search_chunks(user["id"], "SQLite") assert len(results) == 1 assert "SQLite" in results[0]["content"] def test_chunk_seq_increments(self, temp_db, user): sid = memory_store.create_session(user["id"]) id1 = memory_store.append_chunk(sid, user["id"], "user", "first") id2 = memory_store.append_chunk(sid, user["id"], "assistant", "second") assert id2 > id1 def test_search_isolated_by_user(self, temp_db): u1 = memory_store.get_or_create_user("user_a") u2 = memory_store.get_or_create_user("user_b") sid1 = memory_store.create_session(u1["id"]) memory_store.append_chunk(sid1, u1["id"], "user", "secret data") # u2 should not see u1's chunks assert memory_store.search_chunks(u2["id"], "secret") == [] class TestFacts: def test_store_and_retrieve_fact(self, temp_db, user): fid = memory_store.store_fact(user["id"], "preference", "Prefers dark mode") facts = memory_store.get_facts(user["id"]) assert any(f["id"] == fid for f in facts) def test_filter_by_category(self, temp_db, user): memory_store.store_fact(user["id"], "preference", "Python first") memory_store.store_fact(user["id"], "decision", "Use SQLite") prefs = memory_store.get_facts(user["id"], category="preference") assert all(f["category"] == "preference" for f in prefs) class TestFactDeprecation: def test_deprecate_returns_true_on_success(self, temp_db, user): fid = memory_store.store_fact(user["id"], "codebase", "Old stack note") result = memory_store.deprecate_fact(fid, user["id"], "No longer applicable") assert result is True def test_deprecate_hides_fact_from_get_facts(self, temp_db, user): fid = memory_store.store_fact(user["id"], "codebase", "Deprecated technology") memory_store.deprecate_fact(fid, user["id"]) facts = memory_store.get_facts(user["id"]) assert not any(f["id"] == fid for f in facts) def test_deprecated_fact_visible_with_include_deprecated(self, temp_db, user): fid = memory_store.store_fact(user["id"], "codebase", "Soft-deleted fact") memory_store.deprecate_fact(fid, user["id"], "outdated") all_facts = memory_store.get_facts(user["id"], include_deprecated=True) assert any(f["id"] == fid for f in all_facts) def test_deprecation_reason_stored(self, temp_db, user): fid = memory_store.store_fact(user["id"], "decision", "Use Gradle") memory_store.deprecate_fact(fid, user["id"], "Switched to Maven") all_facts = memory_store.get_facts(user["id"], include_deprecated=True) deprecated = next(f for f in all_facts if f["id"] == fid) assert deprecated["deprecation_reason"] == "Switched to Maven" assert deprecated["deprecated"] == 1 def test_deprecate_no_reason_still_works(self, temp_db, user): fid = memory_store.store_fact(user["id"], "preference", "Some preference") result = memory_store.deprecate_fact(fid, user["id"]) assert result is True facts = memory_store.get_facts(user["id"]) assert not any(f["id"] == fid for f in facts) def test_deprecate_unknown_id_returns_false(self, temp_db, user): result = memory_store.deprecate_fact(99999, user["id"]) assert result is False def test_deprecate_wrong_user_returns_false(self, temp_db): user_a = memory_store.get_or_create_user("user_a") user_b = memory_store.get_or_create_user("user_b") fid = memory_store.store_fact(user_a["id"], "codebase", "User A's fact") result = memory_store.deprecate_fact(fid, user_b["id"], "Should not work") assert result is False # Fact must still be visible for user_a facts = memory_store.get_facts(user_a["id"]) assert any(f["id"] == fid for f in facts) def test_non_deprecated_facts_still_returned_by_default(self, temp_db, user): fid_keep = memory_store.store_fact(user["id"], "preference", "Keep this one") fid_drop = memory_store.store_fact(user["id"], "preference", "Drop this one") memory_store.deprecate_fact(fid_drop, user["id"]) facts = memory_store.get_facts(user["id"]) ids = [f["id"] for f in facts] assert fid_keep in ids assert fid_drop not in ids class TestStats: def test_stats_returns_expected_keys(self, temp_db, user): stats = memory_store.get_stats(user["id"]) for key in ("sessions", "facts", "chunks", "db_size_kb", "db_path"): assert key in stats class TestHypotheses: def test_add_hypothesis_returns_id(self, temp_db, user): sid = memory_store.create_session(user["id"]) hid = memory_store.add_hypothesis(user["id"], sid, "I believe X because Y") assert isinstance(hid, int) assert hid > 0 def test_add_hypothesis_default_confidence(self, temp_db, user): sid = memory_store.create_session(user["id"]) hid = memory_store.add_hypothesis(user["id"], sid, "Default confidence belief") results = memory_store.list_hypotheses(user["id"]) assert results[0]["confidence"] == 0.7 def test_add_hypothesis_custom_confidence(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.add_hypothesis(user["id"], sid, "Strong belief", confidence=0.95) results = memory_store.list_hypotheses(user["id"]) assert results[0]["confidence"] == 0.95 def test_new_hypothesis_status_is_open(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.add_hypothesis(user["id"], sid, "Fresh thought") results = memory_store.list_hypotheses(user["id"]) assert results[0]["status"] == "open" def test_list_hypotheses_returns_all(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.add_hypothesis(user["id"], sid, "Thought one") memory_store.add_hypothesis(user["id"], sid, "Thought two") results = memory_store.list_hypotheses(user["id"]) assert len(results) == 2 def test_list_hypotheses_filter_by_status(self, temp_db, user): sid = memory_store.create_session(user["id"]) hid = memory_store.add_hypothesis(user["id"], sid, "Will be confirmed") memory_store.add_hypothesis(user["id"], sid, "Stays open") memory_store.resolve_hypothesis(hid, user["id"], "confirmed", "It was true") open_list = memory_store.list_hypotheses(user["id"], status="open") confirmed_list = memory_store.list_hypotheses(user["id"], status="confirmed") assert len(open_list) == 1 assert open_list[0]["hypothesis"] == "Stays open" assert len(confirmed_list) == 1 assert confirmed_list[0]["hypothesis"] == "Will be confirmed" def test_resolve_confirmed(self, temp_db, user): sid = memory_store.create_session(user["id"]) hid = memory_store.add_hypothesis(user["id"], sid, "Bug is in serializer") result = memory_store.resolve_hypothesis( hid, user["id"], "confirmed", "Confirmed — the serializer had a null check missing" ) assert result is True resolved = memory_store.list_hypotheses(user["id"], status="confirmed") assert len(resolved) == 1 assert resolved[0]["resolution"] == "Confirmed — the serializer had a null check missing" assert resolved[0]["resolved_at"] is not None def test_resolve_refuted(self, temp_db, user): sid = memory_store.create_session(user["id"]) hid = memory_store.add_hypothesis(user["id"], sid, "Problem is in the network") memory_store.resolve_hypothesis(hid, user["id"], "refuted", "Was actually a race condition") results = memory_store.list_hypotheses(user["id"], status="refuted") assert results[0]["status"] == "refuted" def test_resolve_abandoned(self, temp_db, user): sid = memory_store.create_session(user["id"]) hid = memory_store.add_hypothesis(user["id"], sid, "Might be a cache issue") memory_store.resolve_hypothesis(hid, user["id"], "abandoned") results = memory_store.list_hypotheses(user["id"], status="abandoned") assert len(results) == 1 def test_resolve_invalid_status_raises(self, temp_db, user): sid = memory_store.create_session(user["id"]) hid = memory_store.add_hypothesis(user["id"], sid, "Some thought") with pytest.raises(ValueError, match="Invalid status"): memory_store.resolve_hypothesis(hid, user["id"], "wrong_status") def test_resolve_unknown_id_returns_false(self, temp_db, user): result = memory_store.resolve_hypothesis(99999, user["id"], "confirmed") assert result is False def test_resolve_wrong_user_returns_false(self, temp_db): user_a = memory_store.get_or_create_user("user_a") user_b = memory_store.get_or_create_user("user_b") sid = memory_store.create_session(user_a["id"]) hid = memory_store.add_hypothesis(user_a["id"], sid, "User A's private thought") result = memory_store.resolve_hypothesis(hid, user_b["id"], "confirmed") assert result is False # Hypothesis must still be open for user_a results = memory_store.list_hypotheses(user_a["id"], status="open") assert len(results) == 1 def test_list_isolated_by_user(self, temp_db): user_a = memory_store.get_or_create_user("user_a") user_b = memory_store.get_or_create_user("user_b") sid_a = memory_store.create_session(user_a["id"]) memory_store.add_hypothesis(user_a["id"], sid_a, "User A only sees this") assert memory_store.list_hypotheses(user_b["id"]) == [] def test_resolve_no_resolution_text_is_allowed(self, temp_db, user): sid = memory_store.create_session(user["id"]) hid = memory_store.add_hypothesis(user["id"], sid, "Quick thought") result = memory_store.resolve_hypothesis(hid, user["id"], "abandoned") assert result is True class TestSearchFacts: def test_search_returns_matching_fact(self, temp_db, user): memory_store.store_fact(user["id"], "codebase", "We use SQLite for local storage") results = memory_store.search_facts(user["id"], "SQLite") assert len(results) == 1 assert "SQLite" in results[0]["fact"] def test_search_porter_stemming(self, temp_db, user): memory_store.store_fact(user["id"], "codebase", "FastMCP serialization rules") # 'serialize' should match 'serialization' via Porter stemmer results = memory_store.search_facts(user["id"], "serialize") assert len(results) == 1 def test_search_no_results(self, temp_db, user): memory_store.store_fact(user["id"], "preference", "Prefers dark mode") results = memory_store.search_facts(user["id"], "quantum") assert results == [] def test_search_excludes_deprecated(self, temp_db, user): fid = memory_store.store_fact(user["id"], "codebase", "Old deprecated SQLite note") memory_store.deprecate_fact(fid, user["id"]) results = memory_store.search_facts(user["id"], "SQLite") assert results == [] def test_search_isolated_by_user(self, temp_db): user_a = memory_store.get_or_create_user("search_a") user_b = memory_store.get_or_create_user("search_b") memory_store.store_fact(user_a["id"], "codebase", "User A secret fact") assert memory_store.search_facts(user_b["id"], "secret") == [] def test_search_returns_category(self, temp_db, user): memory_store.store_fact(user["id"], "preference", "Prefers uv over pip") results = memory_store.search_facts(user["id"], "uv") assert results[0]["category"] == "preference" def test_search_limit(self, temp_db, user): for i in range(5): memory_store.store_fact(user["id"], "codebase", f"SQLite fact number {i}") results = memory_store.search_facts(user["id"], "SQLite", limit=3) assert len(results) == 3 def test_search_multiword_and_match(self, temp_db, user): # Regression: multi-word query must AND-match (both words anywhere), # NOT phrase-match (words consecutive). The 2026-03-31 fix accidentally # broke this by wrapping the whole query in one double-quoted phrase. memory_store.store_fact(user["id"], "codebase", "BigMind uses SQLite for persistent local storage") # Both words present but NOT consecutive — phrase search would return nothing results = memory_store.search_facts(user["id"], "SQLite persistent") assert len(results) == 1, "Multi-word AND search must match even when words are not consecutive" def test_search_multiword_partial_match_returns_nothing(self, temp_db, user): # If only one of two required words is present, no match memory_store.store_fact(user["id"], "codebase", "We use SQLite locally") results = memory_store.search_facts(user["id"], "SQLite quantum") assert results == [] def test_search_reserved_word_category(self, temp_db, user): # Regression: FTS5 reserved words like 'rank', 'content', 'category' # must not crash or return wrong results memory_store.store_fact(user["id"], "codebase", "rank and content pipeline") results = memory_store.search_facts(user["id"], "rank") assert len(results) == 1 def test_search_reserved_word_content(self, temp_db, user): memory_store.store_fact(user["id"], "codebase", "the content table stores data") results = memory_store.search_facts(user["id"], "content") assert len(results) == 1 def test_search_three_word_query(self, temp_db, user): memory_store.store_fact(user["id"], "codebase", "parallel sessions across IDEs share the same SQLite database") results = memory_store.search_facts(user["id"], "parallel sessions SQLite") assert len(results) == 1 class TestSearchChunksMultiword: def test_chunk_multiword_and_match(self, temp_db, user): # Same regression test for search_chunks sid = memory_store.create_session(user["id"]) memory_store.append_chunk(sid, user["id"], "assistant", "WAL mode allows parallel reads from multiple IDE sessions", "architectural decision") # Words present but not consecutive — phrase search would fail results = memory_store.search_chunks(user["id"], "parallel IDE") assert len(results) == 1, "Multi-word AND search on chunks must not be a phrase search" def test_chunk_reserved_word_rank(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.append_chunk(sid, user["id"], "assistant", "bm25 rank scores the relevance of results", None) results = memory_store.search_chunks(user["id"], "rank") assert len(results) == 1 class TestUpgradeRequests: def test_add_returns_id(self, temp_db, user): sid = memory_store.create_session(user["id"]) rid = memory_store.add_upgrade_request( user["id"], sid, "Add FTS to facts", "Would speed up recall" ) assert isinstance(rid, int) and rid > 0 def test_default_status_is_open(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y") results = memory_store.list_upgrade_requests(user["id"]) assert results[0]["status"] == "open" def test_default_priority_is_medium(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y") results = memory_store.list_upgrade_requests(user["id"]) assert results[0]["priority"] == "medium" def test_default_certainty(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y") results = memory_store.list_upgrade_requests(user["id"]) assert results[0]["certainty"] == 0.7 def test_custom_priority_and_certainty(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.add_upgrade_request( user["id"], sid, "Critical feature", "Blocks work", priority="high", certainty=0.95 ) results = memory_store.list_upgrade_requests(user["id"]) assert results[0]["priority"] == "high" assert results[0]["certainty"] == 0.95 def test_list_filter_by_status(self, temp_db, user): sid = memory_store.create_session(user["id"]) rid = memory_store.add_upgrade_request(user["id"], sid, "Feature A", "Reason A") memory_store.add_upgrade_request(user["id"], sid, "Feature B", "Reason B") memory_store.resolve_upgrade_request(rid, user["id"], "resolved", "Done") open_list = memory_store.list_upgrade_requests(user["id"], status="open") resolved_list = memory_store.list_upgrade_requests(user["id"], status="resolved") assert len(open_list) == 1 and open_list[0]["description"] == "Feature B" assert len(resolved_list) == 1 and resolved_list[0]["description"] == "Feature A" def test_resolve_resolved(self, temp_db, user): sid = memory_store.create_session(user["id"]) rid = memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y") result = memory_store.resolve_upgrade_request(rid, user["id"], "resolved", "Shipped in v4") assert result is True results = memory_store.list_upgrade_requests(user["id"], status="resolved") assert results[0]["resolution"] == "Shipped in v4" assert results[0]["resolved_at"] is not None def test_resolve_rejected(self, temp_db, user): sid = memory_store.create_session(user["id"]) rid = memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y") memory_store.resolve_upgrade_request(rid, user["id"], "rejected", "Out of scope") results = memory_store.list_upgrade_requests(user["id"], status="rejected") assert results[0]["status"] == "rejected" def test_resolve_invalid_status_raises(self, temp_db, user): sid = memory_store.create_session(user["id"]) rid = memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y") with pytest.raises(ValueError, match="Invalid status"): memory_store.resolve_upgrade_request(rid, user["id"], "done") def test_resolve_unknown_id_returns_false(self, temp_db, user): assert memory_store.resolve_upgrade_request(99999, user["id"], "resolved") is False def test_resolve_wrong_user_returns_false(self, temp_db): user_a = memory_store.get_or_create_user("user_a") user_b = memory_store.get_or_create_user("user_b") sid = memory_store.create_session(user_a["id"]) rid = memory_store.add_upgrade_request(user_a["id"], sid, "Feature X", "Reason Y") assert memory_store.resolve_upgrade_request(rid, user_b["id"], "resolved") is False assert memory_store.list_upgrade_requests(user_a["id"], status="open")[0]["id"] == rid def test_list_isolated_by_user(self, temp_db): user_a = memory_store.get_or_create_user("user_a") user_b = memory_store.get_or_create_user("user_b") sid = memory_store.create_session(user_a["id"]) memory_store.add_upgrade_request(user_a["id"], sid, "Only A sees this", "Reason") assert memory_store.list_upgrade_requests(user_b["id"]) == [] # ── Health Check ─────────────────────────────────────────────────────────────── class TestHealthCheck: def test_returns_expected_keys(self, temp_db, user): report = memory_store.health_check(user["id"]) for key in ( "stale_facts", "sessions_without_summary", "open_sessions", "chunk_count", "fts_row_count", "fts_in_sync", "low_confidence_facts", "stale_threshold_days", ): assert key in report def test_empty_db_no_issues(self, temp_db, user): report = memory_store.health_check(user["id"]) assert report["stale_facts"] == [] assert report["sessions_without_summary"] == 0 assert report["open_sessions"] == [] assert report["fts_in_sync"] is True def test_detects_stale_facts(self, temp_db, user): fid = memory_store.store_fact(user["id"], "codebase", "Old deployment note") old_date = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat() with db() as conn: conn.execute("UPDATE facts SET updated_at=? WHERE id=?", (old_date, fid)) report = memory_store.health_check(user["id"], stale_days=30) assert len(report["stale_facts"]) == 1 assert report["stale_facts"][0]["id"] == fid def test_fresh_facts_not_flagged(self, temp_db, user): memory_store.store_fact(user["id"], "preference", "Fresh fact today") report = memory_store.health_check(user["id"], stale_days=30) assert report["stale_facts"] == [] def test_detects_sessions_without_summary(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.close_session(sid, "Closed without summary") report = memory_store.health_check(user["id"]) assert report["sessions_without_summary"] == 1 def test_sessions_with_summary_not_flagged(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.close_session(sid, "Closed with summary") memory_store.save_session_summary(sid, "Full narrative here.") report = memory_store.health_check(user["id"]) assert report["sessions_without_summary"] == 0 def test_detects_open_sessions(self, temp_db, user): memory_store.create_session(user["id"]) report = memory_store.health_check(user["id"]) assert len(report["open_sessions"]) == 1 def test_detects_low_confidence_facts(self, temp_db, user): memory_store.store_fact(user["id"], "codebase", "Uncertain thing", confidence=0.5) report = memory_store.health_check(user["id"]) assert len(report["low_confidence_facts"]) == 1 assert report["low_confidence_facts"][0]["confidence"] == 0.5 def test_high_confidence_facts_not_flagged(self, temp_db, user): memory_store.store_fact(user["id"], "preference", "Certain thing", confidence=1.0) report = memory_store.health_check(user["id"]) assert report["low_confidence_facts"] == [] def test_fts_in_sync_after_append(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.append_chunk(sid, user["id"], "user", "FTS test chunk") report = memory_store.health_check(user["id"]) assert report["fts_in_sync"] is True assert report["chunk_count"] == 1 assert report["fts_row_count"] == 1 def test_stale_threshold_stored_in_report(self, temp_db, user): report = memory_store.health_check(user["id"], stale_days=45) assert report["stale_threshold_days"] == 45 class TestExportMemory: def test_creates_file(self, temp_db, user, tmp_path): out = str(tmp_path / "export.json") memory_store.export_memory(user["id"], out) assert Path(out).exists() def test_json_structure(self, temp_db, user, tmp_path): out = str(tmp_path / "export.json") memory_store.export_memory(user["id"], out) data = json.loads(Path(out).read_text()) for key in ( "export_date", "bigmind_schema_version", "user", "identity_profile", "facts", "sessions", "conversation_chunks", "hypotheses", "upgrade_requests", "token_saves", "people", "stats", ): assert key in data def test_facts_included(self, temp_db, user, tmp_path): memory_store.store_fact(user["id"], "preference", "Exported preference fact") out = str(tmp_path / "export.json") memory_store.export_memory(user["id"], out) data = json.loads(Path(out).read_text()) assert any(f["fact"] == "Exported preference fact" for f in data["facts"]) def test_sessions_included_with_tier2(self, temp_db, user, tmp_path): sid = memory_store.create_session(user["id"]) memory_store.close_session(sid, "Exported session") memory_store.save_session_summary(sid, "Full story for export test.") out = str(tmp_path / "export.json") memory_store.export_memory(user["id"], out) data = json.loads(Path(out).read_text()) exported = next(s for s in data["sessions"] if s["id"] == sid) assert exported["one_liner"] == "Exported session" assert exported["tier2_summary"]["summary"] == "Full story for export test." def test_session_without_summary_has_null_tier2(self, temp_db, user, tmp_path): sid = memory_store.create_session(user["id"]) memory_store.close_session(sid, "No narrative session") out = str(tmp_path / "export.json") memory_store.export_memory(user["id"], out) data = json.loads(Path(out).read_text()) exported = next(s for s in data["sessions"] if s["id"] == sid) assert exported["tier2_summary"] is None def test_chunks_included(self, temp_db, user, tmp_path): sid = memory_store.create_session(user["id"]) memory_store.append_chunk(sid, user["id"], "assistant", "Exported chunk content") out = str(tmp_path / "export.json") memory_store.export_memory(user["id"], out) data = json.loads(Path(out).read_text()) assert any("Exported chunk content" in c["content"] for c in data["conversation_chunks"]) def test_stats_accurate(self, temp_db, user, tmp_path): memory_store.store_fact(user["id"], "pref", "Fact A") memory_store.store_fact(user["id"], "pref", "Fact B") out = str(tmp_path / "export.json") memory_store.export_memory(user["id"], out) data = json.loads(Path(out).read_text()) assert data["stats"]["facts_count"] == 2 def test_returns_result_dict(self, temp_db, user, tmp_path): out = str(tmp_path / "export.json") result = memory_store.export_memory(user["id"], out) assert result["output_path"] == out for key in ("facts_count", "sessions_count", "chunks_count", "file_size_kb"): assert key in result def test_default_path_in_home_dir(self, temp_db, user): result = memory_store.export_memory(user["id"]) path = Path(result["output_path"]) try: assert path.exists() assert path.parent == Path.home() finally: path.unlink(missing_ok=True) # cleanup # ── Auto-close / Orphaned Sessions ──────────────────────────────────────────── class TestCloseOrphanedSessions: """Tests for close_orphaned_sessions — the manual session cleanup tool.""" def test_closes_all_open_except_current(self, temp_db, user): from bigmind.auto_close import close_orphaned_sessions s1 = memory_store.create_session(user["id"]) s2 = memory_store.create_session(user["id"]) s3 = memory_store.create_session(user["id"]) # "current" closed = close_orphaned_sessions(user["id"], keep_session_id=s3) assert set(closed) == {s1, s2} assert s3 not in closed def test_closed_sessions_have_ended_at(self, temp_db, user): from bigmind.auto_close import close_orphaned_sessions s1 = memory_store.create_session(user["id"]) s2 = memory_store.create_session(user["id"]) # keep close_orphaned_sessions(user["id"], keep_session_id=s2) with db() as conn: row = conn.execute( "SELECT ended_at, one_liner FROM sessions WHERE id=?", (s1,) ).fetchone() assert row["ended_at"] is not None assert "orphaned" in row["one_liner"] def test_keep_session_remains_open(self, temp_db, user): from bigmind.auto_close import close_orphaned_sessions memory_store.create_session(user["id"]) s_current = memory_store.create_session(user["id"]) close_orphaned_sessions(user["id"], keep_session_id=s_current) with db() as conn: row = conn.execute( "SELECT ended_at FROM sessions WHERE id=?", (s_current,) ).fetchone() assert row["ended_at"] is None def test_returns_empty_when_no_orphans(self, temp_db, user): from bigmind.auto_close import close_orphaned_sessions s_current = memory_store.create_session(user["id"]) assert close_orphaned_sessions(user["id"], keep_session_id=s_current) == [] def test_does_not_touch_already_closed_sessions(self, temp_db, user): from bigmind.auto_close import close_orphaned_sessions s_old = memory_store.create_session(user["id"]) memory_store.close_session(s_old, "Already closed") s_current = memory_store.create_session(user["id"]) closed = close_orphaned_sessions(user["id"], keep_session_id=s_current) assert s_old not in closed def test_isolated_by_user(self, temp_db): from bigmind.auto_close import close_orphaned_sessions u1 = memory_store.get_or_create_user("cleanup_user_a") u2 = memory_store.get_or_create_user("cleanup_user_b") memory_store.create_session(u1["id"]) s_u1_current = memory_store.create_session(u1["id"]) s_u2_active = memory_store.create_session(u2["id"]) close_orphaned_sessions(u1["id"], keep_session_id=s_u1_current) still_open = memory_store.get_open_sessions(u2["id"]) assert any(s["id"] == s_u2_active for s in still_open) # ── Restart Server ───────────────────────────────────────────────────────────── class TestRestartServer: """Tests for restart_server_in_place — os.execv-based in-place restart.""" def test_calls_execv_with_correct_args(self, monkeypatch): """restart_server_in_place must call os.execv(sys.executable, [sys.executable] + sys.argv).""" import os import sys import time from bigmind.auto_close import restart_server_in_place execv_calls = [] monkeypatch.setattr(os, "execv", lambda path, args: execv_calls.append((path, args))) monkeypatch.setattr(time, "sleep", lambda _: None) # skip the 500ms delay restart_server_in_place() assert len(execv_calls) == 1 path, args = execv_calls[0] assert path == sys.executable assert args[0] == sys.executable assert args[1:] == sys.argv def test_sleep_called_before_execv(self, monkeypatch): """sleep must be called before execv so the MCP response is delivered first.""" import os import time from bigmind.auto_close import restart_server_in_place call_order = [] monkeypatch.setattr(time, "sleep", lambda _: call_order.append("sleep")) monkeypatch.setattr(os, "execv", lambda *_: call_order.append("execv")) restart_server_in_place() assert call_order == ["sleep", "execv"] def test_sleep_duration_is_half_second(self, monkeypatch): """The delay must be 0.5s — long enough for the MCP response to be sent.""" import os import time from bigmind.auto_close import restart_server_in_place sleep_durations = [] monkeypatch.setattr(time, "sleep", lambda d: sleep_durations.append(d)) monkeypatch.setattr(os, "execv", lambda *_: None) restart_server_in_place() assert sleep_durations == [0.5]