"""Tests for Feature 7 — Live Session Awareness. Covers: - announce_focus() — atomic conflict detection, focus writes - get_active_sessions() — idle_minutes, field values, exclusion of closed - close_session() — must NULL focus columns - Schema v6 — sessions focus columns + token_saves table - log_token_save() — insert, accumulate, stats - get_token_efficiency_stats() — totals, session filter, best, by_method """ import json import pytest from bigmind import memory_store from bigmind.db import db @pytest.fixture def user(temp_db): return memory_store.get_or_create_user("testuser", "Test User") # ── announce_focus ───────────────────────────────────────────────────────────── class TestAnnounceFocus: def test_sets_focus_and_files(self, temp_db, user): sid = memory_store.create_session(user["id"]) result = memory_store.announce_focus(sid, "Working on db.py", ["bigmind/db.py"]) assert result["updated"] is True assert result["conflicts"] == [] sessions = memory_store.get_active_sessions(user["id"]) assert len(sessions) == 1 assert sessions[0]["focus"] == "Working on db.py" assert "bigmind/db.py" in sessions[0]["files"] def test_sets_ide_hint(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.announce_focus(sid, "task", ["file.py"], ide_hint="PyCharm") sessions = memory_store.get_active_sessions(user["id"]) assert sessions[0]["ide_hint"] == "PyCharm" def test_detects_file_conflict(self, temp_db, user): sid1 = memory_store.create_session(user["id"]) sid2 = memory_store.create_session(user["id"]) memory_store.announce_focus(sid1, "Working on server.py", ["src/server.py"]) result = memory_store.announce_focus(sid2, "Also server.py", ["src/server.py"]) assert len(result["conflicts"]) == 1 assert result["conflicts"][0]["session_id"] == sid1[:8] assert "src/server.py" in result["conflicts"][0]["overlapping_files"] def test_no_conflict_for_different_files(self, temp_db, user): sid1 = memory_store.create_session(user["id"]) sid2 = memory_store.create_session(user["id"]) memory_store.announce_focus(sid1, "Editing db.py", ["bigmind/db.py"]) result = memory_store.announce_focus(sid2, "Editing server.py", ["src/server.py"]) assert result["conflicts"] == [] def test_second_call_overwrites_focus(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.announce_focus(sid, "First task", ["a.py"]) memory_store.announce_focus(sid, "Second task", ["b.py"]) sessions = memory_store.get_active_sessions(user["id"]) assert sessions[0]["focus"] == "Second task" assert "b.py" in sessions[0]["files"] assert "a.py" not in sessions[0]["files"] def test_empty_files_list_no_conflict(self, temp_db, user): sid1 = memory_store.create_session(user["id"]) sid2 = memory_store.create_session(user["id"]) memory_store.announce_focus(sid1, "Task A", []) result = memory_store.announce_focus(sid2, "Task B", []) assert result["conflicts"] == [] def test_own_session_not_a_conflict(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.announce_focus(sid, "First focus", ["server.py"]) result = memory_store.announce_focus(sid, "Updated focus", ["server.py"]) assert result["conflicts"] == [] def test_conflict_lists_overlapping_files_only(self, temp_db, user): sid1 = memory_store.create_session(user["id"]) sid2 = memory_store.create_session(user["id"]) memory_store.announce_focus(sid1, "Multiple files", ["a.py", "b.py", "c.py"]) result = memory_store.announce_focus(sid2, "Partial overlap", ["b.py", "z.py"]) assert len(result["conflicts"]) == 1 assert result["conflicts"][0]["overlapping_files"] == ["b.py"] def test_conflict_not_raised_for_closed_session(self, temp_db, user): sid1 = memory_store.create_session(user["id"]) memory_store.announce_focus(sid1, "Old work", ["x.py"]) memory_store.close_session(sid1, "done") sid2 = memory_store.create_session(user["id"]) result = memory_store.announce_focus(sid2, "New work", ["x.py"]) assert result["conflicts"] == [] # ── get_active_sessions ──────────────────────────────────────────────────────── class TestGetActiveSessions: def test_returns_all_open_sessions(self, temp_db, user): sid1 = memory_store.create_session(user["id"]) sid2 = memory_store.create_session(user["id"]) sessions = memory_store.get_active_sessions(user["id"]) ids = [s["session_id"] for s in sessions] assert sid1 in ids assert sid2 in ids def test_excludes_closed_sessions(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.close_session(sid, "done") sessions = memory_store.get_active_sessions(user["id"]) ids = [s["session_id"] for s in sessions] assert sid not in ids def test_idle_minutes_non_negative(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.announce_focus(sid, "task", []) sessions = memory_store.get_active_sessions(user["id"]) assert sessions[0]["idle_minutes"] is not None assert sessions[0]["idle_minutes"] >= 0 def test_focus_null_when_not_announced(self, temp_db, user): sid = memory_store.create_session(user["id"]) sessions = memory_store.get_active_sessions(user["id"]) s = next(x for x in sessions if x["session_id"] == sid) assert s["focus"] is None assert s["files"] == [] assert s["ide_hint"] is None def test_focus_reflects_announce(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.announce_focus(sid, "Live task", ["db.py"], ide_hint="VS Code") sessions = memory_store.get_active_sessions(user["id"]) s = next(x for x in sessions if x["session_id"] == sid) assert s["focus"] == "Live task" assert "db.py" in s["files"] assert s["ide_hint"] == "VS Code" def test_empty_when_no_open_sessions(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.close_session(sid, "done") sessions = memory_store.get_active_sessions(user["id"]) assert sessions == [] # ── close_session clears focus ──────────────────────────────────────────────── class TestCloseClearsFocus: def test_focus_columns_nulled_on_close(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.announce_focus(sid, "some work", ["x.py"], ide_hint="PyCharm") # Verify focus was set with db() as conn: row = conn.execute( "SELECT current_focus, focus_files, focus_updated_at FROM sessions WHERE id=?", (sid,), ).fetchone() assert row["current_focus"] == "some work" memory_store.close_session(sid, "finished", topics="test") # All focus columns must be NULL after close with db() as conn: row = conn.execute( "SELECT current_focus, focus_files, focus_updated_at FROM sessions WHERE id=?", (sid,), ).fetchone() assert row["current_focus"] is None assert row["focus_files"] is None assert row["focus_updated_at"] is None # ── Schema v6 ───────────────────────────────────────────────────────────────── class TestSchemaV6: def test_sessions_have_focus_columns(self, temp_db, user): sid = memory_store.create_session(user["id"]) with db() as conn: row = conn.execute("SELECT * FROM sessions WHERE id=?", (sid,)).fetchone() col_names = row.keys() assert "current_focus" in col_names assert "focus_files" in col_names assert "focus_updated_at" in col_names assert "ide_hint" in col_names def test_token_saves_table_exists(self, temp_db): with db() as conn: count = conn.execute("SELECT COUNT(*) FROM token_saves").fetchone()[0] assert count == 0 # table exists, just empty def test_schema_version_is_8(self, temp_db): with db() as conn: version = conn.execute( "SELECT version FROM schema_version" ).fetchone()["version"] assert version == 8 # ── Token Efficiency Tracker (Feature 6) ────────────────────────────────────── class TestTokenSaves: def test_log_returns_row_id(self, temp_db, user): sid = memory_store.create_session(user["id"]) row_id = memory_store.log_token_save( sid, user["id"], "grep instead of read", 50_000 ) assert isinstance(row_id, int) and row_id > 0 def test_total_accumulates(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.log_token_save(sid, user["id"], "save 1", 10_000, method_used="grep") memory_store.log_token_save(sid, user["id"], "save 2", 20_000, method_used="memory_hit") stats = memory_store.get_token_efficiency_stats(user["id"]) assert stats["total_tokens_saved"] == 30_000 def test_session_total_correct(self, temp_db, user): sid1 = memory_store.create_session(user["id"]) sid2 = memory_store.create_session(user["id"]) memory_store.log_token_save(sid1, user["id"], "s1 save", 5_000) memory_store.log_token_save(sid2, user["id"], "s2 save", 8_000) stats = memory_store.get_token_efficiency_stats(user["id"], session_id=sid1) assert stats["session_tokens_saved"] == 5_000 def test_best_save_returns_largest(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.log_token_save(sid, user["id"], "small", 1_000) memory_store.log_token_save(sid, user["id"], "big one", 999_000) memory_store.log_token_save(sid, user["id"], "medium", 50_000) stats = memory_store.get_token_efficiency_stats(user["id"]) assert stats["best_save"]["tokens_saved_estimate"] == 999_000 assert stats["best_save"]["description"] == "big one" def test_by_method_aggregation(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.log_token_save(sid, user["id"], "g1", 10_000, method_used="grep") memory_store.log_token_save(sid, user["id"], "g2", 10_000, method_used="grep") memory_store.log_token_save(sid, user["id"], "m1", 5_000, method_used="memory_hit") stats = memory_store.get_token_efficiency_stats(user["id"]) by_method = {r["method_used"]: r["total"] for r in stats["by_method"]} assert by_method["grep"] == 20_000 assert by_method["memory_hit"] == 5_000 def test_empty_stats_when_no_saves(self, temp_db, user): stats = memory_store.get_token_efficiency_stats(user["id"]) assert stats["total_tokens_saved"] == 0 assert stats["best_save"] is None assert stats["recent_saves"] == [] def test_method_stored_correctly(self, temp_db, user): sid = memory_store.create_session(user["id"]) memory_store.log_token_save( sid, user["id"], "tail log file", 200_000, method_used="tail" ) stats = memory_store.get_token_efficiency_stats(user["id"]) assert stats["best_save"]["method_used"] == "tail"