Files
pi_mcps/mcp/bigmind/bigmind/memory_store.py
T
pplate 42ffc85f0b fix(bigmind): apply 4 health-check fixes — BUG-1/2/3 + PERF-1
BUG-1: fix test_server_tools.py assert "ALWAYS" → "Always" (case mismatch)
BUG-2: export_memory() now includes hypotheses, upgrade_requests, token_saves,
        people tables; renamed bigmind_version → bigmind_schema_version (int)
BUG-3: auto_close.py replaced CURRENT_TIMESTAMP (SQLite) with Python
        datetime.now(timezone.utc).isoformat() for consistent UTC timestamps
PERF-1: context_builder.py caps get_facts() at _MAX_CONTEXT_FACTS=50 with
         overflow hint to prevent unbounded context growth

All 297 tests passing. Upgrade requests #6-9 resolved.
Health report: plans/BIGMIND_HEALTH_REPORT_2026-04-04.md
2026-04-04 09:49:13 +02:00

973 lines
36 KiB
Python

"""Memory store: CRUD operations for all BigMind tiers."""
import uuid
import os
import logging
from datetime import datetime, timezone
from typing import Optional
from bigmind.db import db, get_db_path
# Named logger for this module's records (logger name "BigMindStore").
logger = logging.getLogger("BigMindStore")
def get_current_username() -> str:
    """Return the BigMind username for the current process.

    Environment variables are consulted in priority order: ``BIGMIND_USER``,
    then ``USER``, then ``USERNAME``. The first non-empty value wins. When none
    is set, ``"default"`` is returned.
    """
    for var_name in ("BIGMIND_USER", "USER", "USERNAME"):
        candidate = os.environ.get(var_name)
        if candidate:
            return candidate
    return "default"
# ── USERS ──────────────────────────────────────────────────────────────────────
def get_or_create_user(username: str, display_name: str = None) -> dict:
    """Return the user record for *username*, creating it on first sight.

    An existing user has ``last_seen`` refreshed to the current UTC time. The
    returned dict reflects that refreshed value; previously it still carried
    the stale pre-update ``last_seen``. A new user is inserted with
    ``display_name`` defaulting to *username*. The returned dict also includes
    the ``last_seen`` that was written.

    Args:
        username: Unique login name to look up.
        display_name: Display name for a newly created user. Ignored when the
            user already exists.

    Returns:
        For an existing user, the full ``users`` row as a dict. For a new
        user, ``id``, ``username``, ``display_name``, ``role`` ("member") and
        ``last_seen``.
    """
    now = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        row = conn.execute(
            "SELECT * FROM users WHERE username = ?", (username,)
        ).fetchone()
        if row:
            conn.execute(
                "UPDATE users SET last_seen = ? WHERE id = ?",
                (now, row["id"]),
            )
            user = dict(row)
            # Report the value we just stored, not the one read before the UPDATE.
            user["last_seen"] = now
            return user
        uid = str(uuid.uuid4())
        name = display_name or username
        conn.execute(
            "INSERT INTO users (id, username, display_name, last_seen) VALUES (?,?,?,?)",
            (uid, username, name, now),
        )
        return {
            "id": uid,
            "username": username,
            "display_name": name,
            "role": "member",
            "last_seen": now,
        }
# ── TIER 0 ─────────────────────────────────────────────────────────────────────
def get_identity_profile(user_id: str) -> Optional[dict]:
    """Look up the Tier-0 identity profile belonging to *user_id*.

    Returns:
        The matching ``identity_profile`` row as a dict, or ``None`` when the
        user has no profile yet.
    """
    with db() as conn:
        cursor = conn.execute(
            "SELECT * FROM identity_profile WHERE user_id = ?", (user_id,)
        )
        found = cursor.fetchone()
        if not found:
            return None
        return dict(found)
def upsert_identity_profile(
    user_id: str,
    role: str = None,
    preferences: str = None,
    pinned_facts: str = None,
) -> dict:
    """Create or update the Tier-0 identity profile for *user_id*.

    On update, only arguments that are not ``None`` replace stored values;
    ``COALESCE`` keeps the existing column otherwise. On insert, the profile's
    ``id`` is set to *user_id*. ``updated_at`` is always stamped with the
    current UTC time.

    Returns:
        The stored profile row as a dict, read back after the write.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        found = conn.execute(
            "SELECT id FROM identity_profile WHERE user_id = ?", (user_id,)
        ).fetchone()
        if not found:
            conn.execute(
                """INSERT INTO identity_profile
                   (id, user_id, role, preferences, pinned_facts, updated_at)
                   VALUES (?,?,?,?,?,?)""",
                (user_id, user_id, role, preferences, pinned_facts, timestamp),
            )
        else:
            conn.execute(
                """UPDATE identity_profile
                   SET role=COALESCE(?,role),
                       preferences=COALESCE(?,preferences),
                       pinned_facts=COALESCE(?,pinned_facts),
                       updated_at=?
                   WHERE user_id=?""",
                (role, preferences, pinned_facts, timestamp, user_id),
            )
        stored = conn.execute(
            "SELECT * FROM identity_profile WHERE user_id=?", (user_id,)
        ).fetchone()
        return dict(stored)
# ── TIER 1 ─────────────────────────────────────────────────────────────────────
def create_session(user_id: str) -> str:
    """Open a new Tier-1 session for *user_id* and return its UUID string.

    The row is inserted with ``started_at`` set to now (UTC) and the
    placeholder ``one_liner`` ``'[session in progress]'``. ``ended_at`` is left
    unset, so the session counts as open.
    """
    new_id = str(uuid.uuid4())
    started_at = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        conn.execute(
            """INSERT INTO sessions (id, user_id, started_at, one_liner)
               VALUES (?, ?, ?, '[session in progress]')""",
            (new_id, user_id, started_at),
        )
    return new_id
def close_session(
    session_id: str,
    one_liner: str,
    topics: str = None,
    outcome: str = None,
    importance: int = 5,
) -> None:
    """Mark a Tier-1 session as finished and store its wrap-up fields.

    The function sets ``ended_at`` to now (UTC). It stores ``one_liner``
    truncated to its first 120 characters, together with ``topics``,
    ``outcome`` and ``importance``. It also clears the live-focus columns
    (``current_focus``, ``focus_files``, ``focus_updated_at``).
    """
    with db() as conn:
        ended_at = datetime.now(timezone.utc).isoformat()
        short_summary = one_liner[:120]
        conn.execute(
            """UPDATE sessions
               SET ended_at=?, one_liner=?, topics=?, outcome=?, importance=?,
                   current_focus=NULL, focus_files=NULL, focus_updated_at=NULL
               WHERE id=?""",
            (ended_at, short_summary, topics, outcome, importance, session_id),
        )
def save_session_summary(
    session_id: str,
    summary: str,
    key_facts: str = None,
    code_refs: str = None,
) -> None:
    """Write or overwrite the Tier-2 narrative for *session_id*.

    The ``session_summaries`` row shares its ``id`` with the session. An
    existing row has ``summary``, ``key_facts`` and ``code_refs`` replaced,
    including with ``None``. Otherwise a new row is inserted. In both cases the
    parent session is flagged with ``has_tier2=1``.
    """
    with db() as conn:
        already_saved = conn.execute(
            "SELECT id FROM session_summaries WHERE id=?", (session_id,)
        ).fetchone()
        if not already_saved:
            conn.execute(
                """INSERT INTO session_summaries (id, summary, key_facts, code_refs)
                   VALUES (?,?,?,?)""",
                (session_id, summary, key_facts, code_refs),
            )
        else:
            conn.execute(
                """UPDATE session_summaries
                   SET summary=?, key_facts=?, code_refs=? WHERE id=?""",
                (summary, key_facts, code_refs, session_id),
            )
        conn.execute(
            "UPDATE sessions SET has_tier2=1 WHERE id=?", (session_id,)
        )
def get_recent_sessions(user_id: str, limit: int = 10) -> list:
    """Return up to *limit* closed sessions for *user_id*, newest first.

    Only sessions with ``ended_at`` set are included. Each item is a dict with
    ``id``, ``started_at``, ``ended_at``, ``one_liner``, ``topics``,
    ``outcome``, ``importance`` and ``has_tier2``.
    """
    query = """SELECT id, started_at, ended_at, one_liner, topics,
                      outcome, importance, has_tier2
               FROM sessions
               WHERE user_id=? AND ended_at IS NOT NULL
               ORDER BY started_at DESC LIMIT ?"""
    with db() as conn:
        fetched = conn.execute(query, (user_id, limit)).fetchall()
        return list(map(dict, fetched))
def get_session_detail(session_id: str) -> Optional[dict]:
    """Return the Tier-2 summary row for *session_id*, or ``None`` if none was saved."""
    with db() as conn:
        summary_row = conn.execute(
            "SELECT * FROM session_summaries WHERE id=?", (session_id,)
        ).fetchone()
        return None if not summary_row else dict(summary_row)
def get_open_sessions(user_id: str) -> list:
    """Return every session of *user_id* that is still open (``ended_at IS NULL``)."""
    with db() as conn:
        cursor = conn.execute(
            "SELECT * FROM sessions WHERE user_id=? AND ended_at IS NULL",
            (user_id,),
        )
        return [dict(open_row) for open_row in cursor.fetchall()]
def _find_focus_conflicts(conn, session_id: str, files: list) -> list:
    """Return the user's other open sessions whose ``focus_files`` overlap *files*.

    This is meant to run inside the caller's ``BEGIN IMMEDIATE`` transaction,
    so the result cannot go stale before the caller writes its own focus.
    """
    import json

    rows = conn.execute(
        """SELECT id, current_focus, focus_files, ide_hint, focus_updated_at
           FROM sessions
           WHERE user_id = (SELECT user_id FROM sessions WHERE id=?)
             AND ended_at IS NULL
             AND id != ?
             AND focus_files IS NOT NULL""",
        (session_id, session_id),
    ).fetchall()
    conflicts = []
    for row in rows:
        try:
            other_files = json.loads(row["focus_files"] or "[]")
        except (json.JSONDecodeError, TypeError):
            # Unparseable focus data on another session is treated as "no files".
            other_files = []
        overlapping = [f for f in files if f in other_files]
        if overlapping:
            conflicts.append({
                "session_id": row["id"][:8],
                "ide_hint": row["ide_hint"],
                "focus": row["current_focus"],
                "overlapping_files": overlapping,
                "focus_updated_at": row["focus_updated_at"],
            })
    return conflicts


def announce_focus(
    session_id: str,
    description: str,
    files: list = None,
    ide_hint: str = None,
) -> dict:
    """Atomically update this session's focus and check for conflicts with other open sessions.

    ``BEGIN IMMEDIATE`` makes the conflict check and the write atomic. It takes
    the write lock before other sessions are read, so no other writer can
    slip in between the check and the update. This closes the TOCTOU race in
    which two sessions could both pass the check before either wrote.

    Args:
        session_id: Session whose focus is being announced.
        description: Free text stored in ``current_focus``.
        files: Files being worked on, stored as a JSON list in
            ``focus_files``. Conflict detection only runs when this is
            non-empty.
        ide_hint: Optional IDE identifier. When ``None``, the stored
            ``ide_hint`` is left unchanged.

    Returns:
        ``{"updated": bool, "conflicts": list}``. ``updated`` is True only
        when a session row with *session_id* was actually written; it was
        previously always True. Each conflict holds the other session's 8-char
        id prefix, ``ide_hint``, ``focus``, ``overlapping_files`` (in the order
        given in *files*) and ``focus_updated_at``.

    Raises:
        Any database error, re-raised after the transaction is rolled back.
    """
    import json

    files = files or []
    files_json = json.dumps(files)
    now = datetime.now(timezone.utc).isoformat()
    conn = None
    try:
        from bigmind.db import get_connection
        conn = get_connection()
        # Acquire the write lock before reading other sessions' focus.
        conn.execute("BEGIN IMMEDIATE")
        conflicts = _find_focus_conflicts(conn, session_id, files) if files else []
        # Write our focus under the same lock as the check above.
        if ide_hint is not None:
            cursor = conn.execute(
                """UPDATE sessions
                   SET current_focus=?, focus_files=?, focus_updated_at=?, ide_hint=?
                   WHERE id=?""",
                (description, files_json, now, ide_hint, session_id),
            )
        else:
            cursor = conn.execute(
                """UPDATE sessions
                   SET current_focus=?, focus_files=?, focus_updated_at=?
                   WHERE id=?""",
                (description, files_json, now, session_id),
            )
        updated = cursor.rowcount > 0
        conn.commit()
    except Exception:
        if conn is not None:
            conn.rollback()
        raise
    finally:
        if conn is not None:
            conn.close()
    return {"updated": updated, "conflicts": conflicts}
def get_active_sessions(user_id: str) -> list:
    """List the open sessions of *user_id* together with their live focus data.

    Sessions are ordered by latest activity, newest first. Latest activity is
    ``focus_updated_at``, falling back to ``started_at``. Each entry contains:

    - ``session_id``, ``ide_hint``, ``focus`` and ``focus_updated_at``.
    - ``files``: decoded from the JSON in ``focus_files``; ``[]`` when missing
      or unparseable.
    - ``idle_minutes``: whole minutes since the latest activity, or ``None``
      when no parseable timestamp exists.
    """
    import json

    reference_time = datetime.now(timezone.utc)

    def minutes_idle(stamp):
        # ISO-8601 parse: a trailing "Z" is accepted and naive values are taken as UTC.
        if not stamp:
            return None
        try:
            moment = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=timezone.utc)
            return int((reference_time - moment).total_seconds() / 60)
        except (ValueError, TypeError):
            return None

    def decode_files(raw):
        try:
            return json.loads(raw or "[]")
        except (json.JSONDecodeError, TypeError):
            return []

    with db() as conn:
        open_rows = conn.execute(
            """SELECT id, started_at, current_focus, focus_files,
                      focus_updated_at, ide_hint
               FROM sessions
               WHERE user_id=? AND ended_at IS NULL
               ORDER BY COALESCE(focus_updated_at, started_at) DESC""",
            (user_id,),
        ).fetchall()
        sessions = []
        for record in map(dict, open_rows):
            idle = minutes_idle(record.get("focus_updated_at") or record.get("started_at"))
            sessions.append({
                "session_id": record["id"],
                "ide_hint": record.get("ide_hint"),
                "focus": record.get("current_focus"),
                "files": decode_files(record.get("focus_files")),
                "focus_updated_at": record.get("focus_updated_at"),
                "idle_minutes": idle,
            })
    return sessions
# ── TOKEN SAVES ────────────────────────────────────────────────────────────────
def log_token_save(
    session_id: str,
    user_id: str,
    description: str,
    tokens_saved_estimate: int,
    method_used: str = None,
) -> int:
    """Insert one token-efficiency event into ``token_saves``.

    Returns:
        The ``lastrowid`` of the inserted row.
    """
    row_values = (session_id, user_id, description, tokens_saved_estimate, method_used)
    with db() as conn:
        inserted = conn.execute(
            """INSERT INTO token_saves (session_id, user_id, description, tokens_saved_estimate, method_used)
               VALUES (?,?,?,?,?)""",
            row_values,
        )
        return inserted.lastrowid
def get_token_efficiency_stats(user_id: str, session_id: str = None) -> dict:
    """Summarise the token savings recorded for *user_id*.

    Returns:
        A dict with these keys:

        - ``total_tokens_saved``: sum of ``tokens_saved_estimate`` over all of
          the user's rows (0 when none).
        - ``session_tokens_saved``: the same sum restricted to *session_id*;
          0 when *session_id* is not given.
        - ``best_save``: the single largest save (description, estimate,
          method_used, created_at) as a dict, or ``None``.
        - ``by_method``: per-``method_used`` totals, largest first.
        - ``recent_saves``: the five most recently created saves.
    """
    owner = (user_id,)
    with db() as conn:
        def scalar(sql, args):
            return conn.execute(sql, args).fetchone()[0]

        total = scalar(
            "SELECT COALESCE(SUM(tokens_saved_estimate),0) FROM token_saves WHERE user_id=?",
            owner,
        )
        if session_id:
            session_total = scalar(
                "SELECT COALESCE(SUM(tokens_saved_estimate),0) FROM token_saves WHERE user_id=? AND session_id=?",
                (user_id, session_id),
            )
        else:
            session_total = 0
        top = conn.execute(
            """SELECT description, tokens_saved_estimate, method_used, created_at
               FROM token_saves WHERE user_id=?
               ORDER BY tokens_saved_estimate DESC LIMIT 1""",
            owner,
        ).fetchone()
        per_method = conn.execute(
            """SELECT method_used, SUM(tokens_saved_estimate) as total
               FROM token_saves WHERE user_id=?
               GROUP BY method_used ORDER BY total DESC""",
            owner,
        ).fetchall()
        latest = conn.execute(
            """SELECT description, tokens_saved_estimate, method_used, created_at
               FROM token_saves WHERE user_id=?
               ORDER BY created_at DESC LIMIT 5""",
            owner,
        ).fetchall()
        return {
            "total_tokens_saved": total,
            "session_tokens_saved": session_total,
            "best_save": None if not top else dict(top),
            "by_method": list(map(dict, per_method)),
            "recent_saves": list(map(dict, latest)),
        }
# ── TIER 3 ─────────────────────────────────────────────────────────────────────
def append_chunk(
    session_id: str,
    user_id: str,
    role: str,
    content: str,
    flag_reason: str = None,
) -> int:
    """Append one Tier-3 conversation chunk and index it for full-text search.

    The chunk's ``seq`` is one more than the session's current maximum, so the
    first chunk gets 1. The chunk is mirrored into ``conversation_chunks_fts``
    with the chunk id as the FTS rowid. A ``None`` flag_reason is indexed as
    the empty string.

    Returns:
        The new chunk's id.
    """
    with db() as conn:
        current_max = conn.execute(
            "SELECT COALESCE(MAX(seq),0) FROM conversation_chunks WHERE session_id=?",
            (session_id,),
        ).fetchone()[0]
        next_seq = current_max + 1
        chunk_id = conn.execute(
            """INSERT INTO conversation_chunks
               (session_id, user_id, role, content, flag_reason, seq)
               VALUES (?,?,?,?,?,?)""",
            (session_id, user_id, role, content, flag_reason, next_seq),
        ).lastrowid
        # FTS rowid is kept identical to the chunk id so searches can join back.
        conn.execute(
            "INSERT INTO conversation_chunks_fts(rowid, content, flag_reason) VALUES(?,?,?)",
            (chunk_id, content, flag_reason or ""),
        )
        return chunk_id
def _fts_safe_query(query: str) -> str:
"""Wrap each token in double-quotes for safe FTS5 matching.
Prevents FTS5 reserved-word collisions (rank, content, category, etc.)
while correctly AND-matching multi-word queries — NOT phrase matching.
FTS5 semantics:
"word1" "word2" → documents containing BOTH words anywhere (AND match ✅)
"word1 word2" → documents where word1 appears directly before word2 (phrase ❌)
Bug history: the 2026-03-31 fix used f'"{query}"' which wraps the entire string,
accidentally turning every multi-word query into a phrase search that almost never
matches. This helper fixes that by quoting each token independently.
"""
tokens = [t.strip('"\'') for t in query.split() if t.strip()]
return ' '.join(f'"{t}"' for t in tokens if t)
def search_chunks(user_id: str, query: str, limit: int = 10) -> list:
    """Full-text search over *user_id*'s Tier-3 conversation chunks.

    *query* is converted with ``_fts_safe_query``, so every word must appear,
    in any order. Results are ordered by the FTS5 ``bm25`` score (ascending,
    i.e. best match first) and capped at *limit*. Each dict has ``id``,
    ``session_id``, ``role``, ``content``, ``flag_reason``, ``created_at`` and
    ``rank``.
    """
    sql = """SELECT cc.id, cc.session_id, cc.role, cc.content,
                    cc.flag_reason, cc.created_at,
                    bm25(conversation_chunks_fts) AS rank
             FROM conversation_chunks_fts
             JOIN conversation_chunks cc ON cc.id = conversation_chunks_fts.rowid
             WHERE conversation_chunks_fts MATCH ?
               AND cc.user_id = ?
             ORDER BY rank
             LIMIT ?"""
    with db() as conn:
        hits = conn.execute(sql, (_fts_safe_query(query), user_id, limit)).fetchall()
        return [dict(hit) for hit in hits]
def delete_chunks_before(user_id: str, cutoff_iso: str) -> int:
    """Delete Tier-3 chunks older than cutoff. Returns count deleted.

    Rows of *user_id* in ``conversation_chunks`` whose ``created_at`` sorts
    before *cutoff_iso* are removed. The count comes from the DELETE's own
    ``rowcount``, so it always equals the rows actually removed and no
    separate COUNT scan is needed. When anything was deleted, the FTS5 index
    is rebuilt from its content table.

    Args:
        user_id: Owner whose chunks are pruned.
        cutoff_iso: ISO-8601 timestamp; rows strictly older are deleted.

    Returns:
        Number of chunks deleted (0 when none matched).
    """
    with db() as conn:
        cursor = conn.execute(
            "DELETE FROM conversation_chunks WHERE user_id=? AND created_at < ?",
            (user_id, cutoff_iso),
        )
        deleted = cursor.rowcount
        if deleted <= 0:
            return 0
        # Rebuild the FTS5 index from the content table — always correct for content= tables
        conn.execute(
            "INSERT INTO conversation_chunks_fts(conversation_chunks_fts) VALUES('rebuild')"
        )
        return deleted
# ── FACTS ───────────────────────────────────────────────────────────────────────
def store_fact(
    user_id: str,
    category: str,
    fact: str,
    source_session: str = None,
    confidence: float = 1.0,
) -> int:
    """Persist a fact for *user_id* and add it to the ``facts_fts`` index.

    The FTS row uses the fact's id as its rowid.

    Returns:
        The new fact's id.
    """
    with db() as conn:
        new_id = conn.execute(
            """INSERT INTO facts (user_id, category, fact, source_session, confidence)
               VALUES (?,?,?,?,?)""",
            (user_id, category, fact, source_session, confidence),
        ).lastrowid
        conn.execute(
            "INSERT INTO facts_fts(rowid, fact, category) VALUES (?,?,?)",
            (new_id, fact, category),
        )
        return new_id
def get_facts(user_id: str, category: str = None, include_deprecated: bool = False) -> list:
    """Return *user_id*'s facts as dicts, newest first.

    Args:
        user_id: Owner of the facts.
        category: When truthy, only facts in this category are returned.
        include_deprecated: When False (the default), facts flagged
            ``deprecated`` are excluded. A NULL flag counts as not deprecated.
    """
    conditions = ["user_id=?"]
    args: list = [user_id]
    if category:
        conditions.append("category=?")
        args.append(category)
    if not include_deprecated:
        conditions.append("(deprecated IS NULL OR deprecated=0)")
    # Only fixed condition fragments are spliced in; every value is a bound parameter.
    sql = "SELECT * FROM facts WHERE " + " AND ".join(conditions) + " ORDER BY created_at DESC"
    with db() as conn:
        return [dict(fact_row) for fact_row in conn.execute(sql, args).fetchall()]
def deprecate_fact(fact_id: int, user_id: str, reason: str = None) -> bool:
    """Flag a fact as deprecated and drop it from full-text search.

    The fact must belong to *user_id*. The function sets its ``deprecated``
    flag to 1, ``deprecation_reason`` to *reason* and ``updated_at`` to now
    (UTC). It then deletes the matching ``facts_fts`` row so the fact no longer
    appears in search results.

    Returns:
        True if the fact was found and updated, False otherwise.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        owned = conn.execute(
            "SELECT id FROM facts WHERE id=? AND user_id=?", (fact_id, user_id)
        ).fetchone()
        if owned:
            conn.execute(
                """UPDATE facts
                   SET deprecated=1, deprecation_reason=?, updated_at=?
                   WHERE id=?""",
                (reason, stamp, fact_id),
            )
            conn.execute("DELETE FROM facts_fts WHERE rowid=?", (fact_id,))
        return bool(owned)
def search_facts(user_id: str, query: str, limit: int = 10) -> list:
    """Full-text search across *user_id*'s non-deprecated facts.

    *query* is converted with ``_fts_safe_query``, so every word must appear.
    Results are ordered by FTS5 ``bm25`` (best match first) and capped at
    *limit*. Each dict has ``id``, ``category``, ``fact``, ``confidence``,
    ``created_at`` and ``rank``.
    """
    sql = """SELECT f.id, f.category, f.fact, f.confidence, f.created_at,
                    bm25(facts_fts) AS rank
             FROM facts_fts
             JOIN facts f ON f.id = facts_fts.rowid
             WHERE facts_fts MATCH ?
               AND f.user_id = ?
               AND (f.deprecated IS NULL OR f.deprecated = 0)
             ORDER BY rank
             LIMIT ?"""
    with db() as conn:
        matches = conn.execute(sql, (_fts_safe_query(query), user_id, limit)).fetchall()
        return list(map(dict, matches))
# ── THOUGHT JOURNAL ──────────────────────────────────────────────────────────────
def add_hypothesis(
    user_id: str,
    session_id: str,
    hypothesis: str,
    confidence: float = 0.7,
) -> int:
    """Record a new hypothesis in the thought journal.

    Returns:
        The id of the inserted ``hypotheses`` row.
    """
    with db() as conn:
        return conn.execute(
            """INSERT INTO hypotheses (user_id, session_id, hypothesis, confidence)
               VALUES (?, ?, ?, ?)""",
            (user_id, session_id, hypothesis, confidence),
        ).lastrowid
def resolve_hypothesis(
    hypothesis_id: int,
    user_id: str,
    status: str,
    resolution: str = None,
) -> bool:
    """Close out a hypothesis with a final status.

    Args:
        hypothesis_id: Row id of the hypothesis.
        user_id: Must own the hypothesis.
        status: One of ``"confirmed"``, ``"refuted"`` or ``"abandoned"``.
        resolution: Optional free-text explanation.

    Returns:
        True if the hypothesis was updated, False if it does not exist or
        belongs to another user.

    Raises:
        ValueError: If *status* is not one of the allowed values. This is
            checked before touching the database.
    """
    allowed = ("confirmed", "refuted", "abandoned")
    if status not in allowed:
        raise ValueError(f"Invalid status '{status}'. Must be confirmed, refuted, or abandoned.")
    resolved_at = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        owned = conn.execute(
            "SELECT id FROM hypotheses WHERE id=? AND user_id=?",
            (hypothesis_id, user_id),
        ).fetchone()
        if owned is None:
            return False
        conn.execute(
            """UPDATE hypotheses
               SET status=?, resolution=?, resolved_at=?
               WHERE id=?""",
            (status, resolution, resolved_at, hypothesis_id),
        )
    return True
def list_hypotheses(user_id: str, status: str = None) -> list:
    """Return *user_id*'s hypotheses, newest first.

    When *status* is truthy, only hypotheses with that status are returned.
    """
    sql = "SELECT * FROM hypotheses WHERE user_id=?"
    args = [user_id]
    if status:
        sql += " AND status=?"
        args.append(status)
    sql += " ORDER BY created_at DESC"
    with db() as conn:
        return [dict(entry) for entry in conn.execute(sql, args).fetchall()]
# ── UPGRADE REQUESTS ────────────────────────────────────────────────────────────────────────────────────
def add_upgrade_request(
    user_id: str,
    session_id: str,
    description: str,
    reason: str,
    priority: str = "medium",
    certainty: float = 0.7,
) -> int:
    """Record a new upgrade request.

    Returns:
        The id of the inserted ``upgrade_requests`` row.
    """
    payload = (user_id, session_id, description, reason, priority, certainty)
    with db() as conn:
        return conn.execute(
            """INSERT INTO upgrade_requests
               (user_id, session_id, description, reason, priority, certainty)
               VALUES (?, ?, ?, ?, ?, ?)""",
            payload,
        ).lastrowid
def list_upgrade_requests(user_id: str, status: str = None) -> list:
    """Return *user_id*'s upgrade requests, newest first.

    When *status* is truthy, only requests with that status are returned.
    """
    sql = "SELECT * FROM upgrade_requests WHERE user_id=?"
    args = [user_id]
    if status:
        sql += " AND status=?"
        args.append(status)
    sql += " ORDER BY created_at DESC"
    with db() as conn:
        return [dict(req) for req in conn.execute(sql, args).fetchall()]
def resolve_upgrade_request(
    request_id: int,
    user_id: str,
    status: str,
    resolution: str = None,
) -> bool:
    """Close an upgrade request as resolved or rejected.

    Args:
        request_id: Row id of the upgrade request.
        user_id: Must own the request.
        status: ``"resolved"`` or ``"rejected"``.
        resolution: Optional free-text explanation.

    Returns:
        True if the request was updated, False if it does not exist or
        belongs to another user.

    Raises:
        ValueError: If *status* is not an allowed value. This is checked
            before touching the database.
    """
    if status not in ("resolved", "rejected"):
        raise ValueError(f"Invalid status '{status}'. Must be resolved or rejected.")
    resolved_at = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        owned = conn.execute(
            "SELECT id FROM upgrade_requests WHERE id=? AND user_id=?",
            (request_id, user_id),
        ).fetchone()
        if owned is None:
            return False
        conn.execute(
            """UPDATE upgrade_requests
               SET status=?, resolution=?, resolved_at=?
               WHERE id=?""",
            (status, resolution, resolved_at, request_id),
        )
    return True
# ── HEALTH CHECK ────────────────────────────────────────────────────────────────
def health_check(user_id: str, stale_days: int = 30) -> dict:
    """Run diagnostic queries over *user_id*'s BigMind memory.

    Returns:
        A dict with these keys:

        - ``stale_facts``: facts whose ``updated_at`` is older than
          *stale_days* days, oldest first. Rows with a NULL ``updated_at`` do
          not satisfy ``<`` and are not listed.
        - ``sessions_without_summary``: count of closed sessions with
          ``has_tier2=0``.
        - ``open_sessions``: ``id``/``started_at`` of sessions without
          ``ended_at``.
        - ``chunk_count``, ``fts_row_count``, ``fts_in_sync``: row counts of
          ``conversation_chunks`` and ``conversation_chunks_fts`` across ALL
          users (the FTS table has no user_id column), and whether they match.
          NOTE(review): if ``conversation_chunks_fts`` is an external-content
          table, COUNT(*) on it may read the content table, which would make
          this check always True. Confirm against the schema.
        - ``low_confidence_facts``: facts with confidence below 0.8, lowest
          first.
        - ``stale_threshold_days``: echo of *stale_days*.

        Deprecated facts are not filtered out of ``stale_facts`` or
        ``low_confidence_facts``.
    """
    from datetime import timedelta

    threshold = datetime.now(timezone.utc) - timedelta(days=stale_days)
    cutoff = threshold.isoformat()
    owner = (user_id,)
    with db() as conn:
        def rows(sql, args=()):
            return [dict(r) for r in conn.execute(sql, args).fetchall()]

        def count(sql, args=()):
            return conn.execute(sql, args).fetchone()[0]

        stale = rows(
            """SELECT id, category, fact, updated_at, confidence
               FROM facts WHERE user_id=? AND updated_at < ?
               ORDER BY updated_at""",
            (user_id, cutoff),
        )
        unsummarised = count(
            """SELECT COUNT(*) FROM sessions
               WHERE user_id=? AND ended_at IS NOT NULL AND has_tier2=0""",
            owner,
        )
        still_open = rows(
            "SELECT id, started_at FROM sessions WHERE user_id=? AND ended_at IS NULL",
            owner,
        )
        chunk_total = count("SELECT COUNT(*) FROM conversation_chunks")
        fts_total = count("SELECT COUNT(*) FROM conversation_chunks_fts")
        shaky = rows(
            """SELECT id, category, fact, confidence
               FROM facts WHERE user_id=? AND confidence < 0.8
               ORDER BY confidence""",
            owner,
        )
    return {
        "stale_facts": stale,
        "sessions_without_summary": unsummarised,
        "open_sessions": still_open,
        "chunk_count": chunk_total,
        "fts_row_count": fts_total,
        "fts_in_sync": chunk_total == fts_total,
        "low_confidence_facts": shaky,
        "stale_threshold_days": stale_days,
    }
# ── EXPORT ───────────────────────────────────────────────────────────────────────
def _rows_as_dicts(conn, sql: str, params: tuple = ()) -> list:
    """Execute *sql* with *params* on *conn* and return every row as a plain dict."""
    return [dict(r) for r in conn.execute(sql, params).fetchall()]


def export_memory(user_id: str, output_path: str = None) -> dict:
    """Export all memory for a user to a portable JSON file.

    The export covers every per-user table:

    - the user record and identity profile;
    - facts, including deprecated ones;
    - sessions, each with its Tier-2 summary under ``tier2_summary`` (or
      ``None``);
    - conversation chunks, hypotheses, upgrade requests, token saves and
      people/contacts.

    It also records ``export_date``, ``bigmind_schema_version`` and a
    ``stats`` block of row counts.

    Args:
        user_id: Owner whose rows are exported.
        output_path: Destination file. Defaults to
            ``~/bigmind_export_<UTC YYYYmmdd_HHMMSS>.json``. Parent directories
            are created as needed, and an existing file is overwritten.

    Returns:
        A summary dict with ``output_path``, per-table counts and
        ``file_size_kb``. The counts now include ``upgrade_requests_count``
        and ``token_saves_count``, so they match the exported ``stats``.
    """
    import json
    from pathlib import Path
    from bigmind.db import SCHEMA_VERSION

    if not output_path:
        date_str = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        output_path = str(Path.home() / f"bigmind_export_{date_str}.json")
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)

    by_user = (user_id,)
    with db() as conn:
        user_row = conn.execute(
            "SELECT id, username, display_name, role, created_at, last_seen FROM users WHERE id=?",
            by_user,
        ).fetchone()
        user_info = dict(user_row) if user_row else {}
        profile_row = conn.execute(
            "SELECT * FROM identity_profile WHERE user_id=?", by_user
        ).fetchone()
        profile = dict(profile_row) if profile_row else {}
        facts = _rows_as_dicts(
            conn, "SELECT * FROM facts WHERE user_id=? ORDER BY created_at", by_user
        )
        # Load all Tier-2 summaries in one query instead of one query per
        # session (N+1). A summary shares its id with its session.
        summaries = {
            s["id"]: s
            for s in _rows_as_dicts(
                conn,
                "SELECT * FROM session_summaries "
                "WHERE id IN (SELECT id FROM sessions WHERE user_id=?)",
                by_user,
            )
        }
        sessions = _rows_as_dicts(
            conn, "SELECT * FROM sessions WHERE user_id=? ORDER BY started_at", by_user
        )
        for sd in sessions:
            sd["tier2_summary"] = summaries.get(sd["id"])
        chunks = _rows_as_dicts(
            conn,
            "SELECT * FROM conversation_chunks WHERE user_id=? ORDER BY created_at, seq",
            by_user,
        )
        # ── v3+ tables ──
        hypotheses = _rows_as_dicts(
            conn, "SELECT * FROM hypotheses WHERE user_id=? ORDER BY created_at", by_user
        )
        upgrade_requests = _rows_as_dicts(
            conn, "SELECT * FROM upgrade_requests WHERE user_id=? ORDER BY created_at", by_user
        )
        # ── v6+ tables ──
        token_saves = _rows_as_dicts(
            conn, "SELECT * FROM token_saves WHERE user_id=? ORDER BY created_at", by_user
        )
        # ── v7+ tables ──
        people = _rows_as_dicts(
            conn, "SELECT * FROM people WHERE user_id=? ORDER BY created_at", by_user
        )

    export_data = {
        "export_date": datetime.now(timezone.utc).isoformat(),
        "bigmind_schema_version": SCHEMA_VERSION,
        "user": user_info,
        "identity_profile": profile,
        "facts": facts,
        "sessions": sessions,
        "conversation_chunks": chunks,
        "hypotheses": hypotheses,
        "upgrade_requests": upgrade_requests,
        "token_saves": token_saves,
        "people": people,
        "stats": {
            "facts_count": len(facts),
            "sessions_count": len(sessions),
            "chunks_count": len(chunks),
            "hypotheses_count": len(hypotheses),
            "upgrade_requests_count": len(upgrade_requests),
            "token_saves_count": len(token_saves),
            "people_count": len(people),
        },
    }
    # default=str stringifies any value json cannot encode natively.
    with output.open("w", encoding="utf-8") as f:
        json.dump(export_data, f, indent=2, default=str)
    return {
        "output_path": str(output_path),
        "facts_count": len(facts),
        "sessions_count": len(sessions),
        "chunks_count": len(chunks),
        "hypotheses_count": len(hypotheses),
        "upgrade_requests_count": len(upgrade_requests),
        "token_saves_count": len(token_saves),
        "people_count": len(people),
        "file_size_kb": round(output.stat().st_size / 1024, 1),
    }
# ── STATS ───────────────────────────────────────────────────────────────────────
def get_stats(user_id: str) -> dict:
    """Return row counts for *user_id* plus the size and location of the database file.

    ``global_knowledge_entries`` counts approved ``global_knowledge`` rows
    across all users. ``db_size_bytes`` is 0 when the database file does not
    exist.
    """
    db_path = get_db_path()
    owner = (user_id,)
    with db() as conn:
        def count(sql, args=()):
            return conn.execute(sql, args).fetchone()[0]

        session_total = count("SELECT COUNT(*) FROM sessions WHERE user_id=?", owner)
        fact_total = count("SELECT COUNT(*) FROM facts WHERE user_id=?", owner)
        chunk_total = count("SELECT COUNT(*) FROM conversation_chunks WHERE user_id=?", owner)
        approved_global = count(
            "SELECT COUNT(*) FROM global_knowledge WHERE status='approved'"
        )
    size_bytes = 0
    if db_path.exists():
        size_bytes = db_path.stat().st_size
    return {
        "sessions": session_total,
        "facts": fact_total,
        "chunks": chunk_total,
        "global_knowledge_entries": approved_global,
        "db_size_bytes": size_bytes,
        "db_size_kb": round(size_bytes / 1024, 1),
        "db_path": str(db_path),
    }
# ── PEOPLE / CONTACTS ────────────────────────────────────────────────────────
def upsert_person(
    user_id: str,
    username: str,
    display_name: str = None,
    role: str = None,
    team: str = None,
    notes: str = None,
    bigmind_user: str = None,
    bigmind_url: str = None,
) -> int:
    """Insert or update a person in the contacts directory. Returns the row id.

    Contacts are keyed by (*user_id*, *username*).

    - Existing contact: only arguments that are not ``None`` overwrite stored
      values.
    - New contact: inserted with all given values.

    In both cases ``last_mentioned_at`` is set to now (UTC). The contact's
    ``people_fts`` row is rewritten from the stored row (username,
    display_name, role, team, notes), with NULLs indexed as "".
    """
    now = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        existing = conn.execute(
            "SELECT id FROM people WHERE user_id=? AND username=?",
            (user_id, username),
        ).fetchone()
        if existing:
            person_id = existing["id"]
            # Drop the old FTS entry BEFORE changing the content row. For a
            # regular FTS5 table the order makes no difference. For an
            # external-content table (NOTE(review): the people_fts schema is
            # not visible here; confirm) FTS5 derives the tokens to remove
            # from the *current* content row. Deleting after the UPDATE would
            # remove the new values' tokens and leave the old ones indexed.
            conn.execute("DELETE FROM people_fts WHERE rowid=?", (person_id,))
            candidates = (
                ("display_name", display_name),
                ("role", role),
                ("team", team),
                ("notes", notes),
                ("bigmind_user", bigmind_user),
                ("bigmind_url", bigmind_url),
            )
            updates = {"last_mentioned_at": now}
            updates.update((col, val) for col, val in candidates if val is not None)
            # Column names come only from the fixed list above; values are bound.
            set_clause = ", ".join(f"{col}=?" for col in updates)
            conn.execute(
                f"UPDATE people SET {set_clause} WHERE id=?",
                (*updates.values(), person_id),
            )
        else:
            cur = conn.execute(
                """INSERT INTO people
                   (user_id, username, display_name, role, team, notes,
                    bigmind_user, bigmind_url, last_mentioned_at)
                   VALUES (?,?,?,?,?,?,?,?,?)""",
                (user_id, username, display_name, role, team, notes,
                 bigmind_user, bigmind_url, now),
            )
            person_id = cur.lastrowid
        row = conn.execute("SELECT * FROM people WHERE id=?", (person_id,)).fetchone()
        conn.execute(
            "INSERT INTO people_fts(rowid, username, display_name, role, team, notes) "
            "VALUES (?,?,?,?,?,?)",
            (person_id, row["username"], row["display_name"] or "",
             row["role"] or "", row["team"] or "", row["notes"] or ""),
        )
    return person_id
def recall_person(user_id: str, query: str, limit: int = 10) -> list:
    """Full-text search across *user_id*'s people directory.

    *query* is converted with ``_fts_safe_query``, so every word must appear.
    Each result is the full ``people`` row plus its ``rank`` (FTS5 ``bm25``,
    best match first), capped at *limit*.
    """
    sql = """SELECT p.*, bm25(people_fts) AS rank
             FROM people_fts
             JOIN people p ON p.id = people_fts.rowid
             WHERE people_fts MATCH ?
               AND p.user_id = ?
             ORDER BY rank
             LIMIT ?"""
    with db() as conn:
        found = conn.execute(sql, (_fts_safe_query(query), user_id, limit)).fetchall()
        return [dict(person) for person in found]
def list_people(user_id: str) -> list:
    """Return all of *user_id*'s contacts, most recently mentioned first."""
    with db() as conn:
        contacts = conn.execute(
            "SELECT * FROM people WHERE user_id=? ORDER BY last_mentioned_at DESC",
            (user_id,),
        ).fetchall()
        return list(map(dict, contacts))
def link_ai(user_id: str, username: str, bigmind_user: str, bigmind_url: str = None) -> bool:
    """Attach a BigMind AI identity to an existing contact.

    The function sets the contact's ``bigmind_user`` and ``bigmind_url``. The
    URL is written even when it is ``None``, which clears any previous value.
    It also bumps ``last_mentioned_at`` to now (UTC).

    Returns:
        True if a contact named *username* exists for *user_id*, else False.
    """
    mentioned_at = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        contact = conn.execute(
            "SELECT id FROM people WHERE user_id=? AND username=?",
            (user_id, username),
        ).fetchone()
        if contact is None:
            return False
        conn.execute(
            "UPDATE people SET bigmind_user=?, bigmind_url=?, last_mentioned_at=? WHERE id=?",
            (bigmind_user, bigmind_url, mentioned_at, contact["id"]),
        )
        return True