"""
src/storage/sqlite_cache.py
Fast hash-based cache for first-tier deduplication
"""
import sqlite3
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Optional, Tuple
from .config import config
logger = logging.getLogger("sqlite_cache")
class SQLiteCache:
"""
Fast hash-based cache for exact match deduplication.
Uses MD5 hash of first N characters for O(1) lookup.
"""
    def __init__(self, db_path: Optional[str] = None):
        # Each method opens its own connection, so db_path must be a real
        # file path; ":memory:" would give every call a fresh, empty database.
        self.db_path = db_path or config.SQLITE_DB_PATH
        self._init_db()
        logger.info(f"[SQLiteCache] Initialized at {self.db_path}")
def _init_db(self):
"""Initialize database schema"""
conn = sqlite3.connect(self.db_path)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS seen_hashes (
content_hash TEXT PRIMARY KEY,
first_seen TIMESTAMP NOT NULL,
last_seen TIMESTAMP NOT NULL,
event_id TEXT,
summary_preview TEXT
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_last_seen ON seen_hashes(last_seen)"
)
conn.commit()
conn.close()
    def _get_hash(self, summary: str) -> str:
        """Generate an MD5 hash from the first N characters."""
        # Normalize case and surrounding whitespace so trivially different
        # copies of the same summary hash to the same key.
        normalized = summary[: config.EXACT_MATCH_CHARS].strip().lower()
        return hashlib.md5(normalized.encode("utf-8")).hexdigest()
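    # Shared row mapper for the read methods below; keeps the column
    # mapping in one place. Column order must match the SELECT statements.
    @staticmethod
    def _row_to_dict(row: tuple) -> dict:
        """Map a seen_hashes row to an entry dict."""
        return {
            "content_hash": row[0],
            "first_seen": row[1],
            "last_seen": row[2],
            "event_id": row[3],
            "summary_preview": row[4],
        }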
def has_exact_match(
self, summary: str, retention_hours: Optional[int] = None
) -> Tuple[bool, Optional[str]]:
"""
Check if summary exists in cache (exact match).
Returns:
(is_duplicate, event_id)
"""
if not summary:
return False, None
        retention_hours = retention_hours or config.SQLITE_RETENTION_HOURS
        content_hash = self._get_hash(summary)
        # Timestamps are stored as naive-UTC ISO-8601 strings, which sort
        # lexicographically, so the SQL string comparison is chronological.
        cutoff = datetime.utcnow() - timedelta(hours=retention_hours)
conn = sqlite3.connect(self.db_path)
cursor = conn.execute(
"SELECT event_id FROM seen_hashes WHERE content_hash = ? AND last_seen > ?",
(content_hash, cutoff.isoformat()),
)
result = cursor.fetchone()
conn.close()
if result:
logger.debug(f"[SQLiteCache] EXACT MATCH found: {content_hash[:8]}...")
return True, result[0]
return False, None
def add_entry(self, summary: str, event_id: str):
"""Add new entry to cache or update existing"""
if not summary:
return
content_hash = self._get_hash(summary)
now = datetime.utcnow().isoformat()
        preview = summary[:2000]  # Store up to the first 2000 chars as a preview (was 200)
conn = sqlite3.connect(self.db_path)
# Try update first
cursor = conn.execute(
"UPDATE seen_hashes SET last_seen = ? WHERE content_hash = ?",
(now, content_hash),
)
# If no rows updated, insert new
if cursor.rowcount == 0:
            conn.execute(
                "INSERT INTO seen_hashes "
                "(content_hash, first_seen, last_seen, event_id, summary_preview) "
                "VALUES (?, ?, ?, ?, ?)",
                (content_hash, now, now, event_id, preview),
            )
conn.commit()
conn.close()
logger.debug(f"[SQLiteCache] Added: {content_hash[:8]}... ({event_id})")
    def cleanup_old_entries(self, retention_hours: Optional[int] = None) -> int:
        """Remove entries older than the retention period; returns the number deleted."""
retention_hours = retention_hours or config.SQLITE_RETENTION_HOURS
cutoff = datetime.utcnow() - timedelta(hours=retention_hours)
conn = sqlite3.connect(self.db_path)
cursor = conn.execute(
"DELETE FROM seen_hashes WHERE last_seen < ?", (cutoff.isoformat(),)
)
deleted = cursor.rowcount
conn.commit()
conn.close()
if deleted > 0:
logger.info(f"[SQLiteCache] Cleaned up {deleted} old entries")
return deleted
def get_all_entries(self, limit: int = 100, offset: int = 0) -> list:
"""
Paginated retrieval of all cached entries.
Returns list of dicts with event metadata.
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.execute(
"SELECT content_hash, first_seen, last_seen, event_id, summary_preview FROM seen_hashes ORDER BY last_seen DESC LIMIT ? OFFSET ?",
(limit, offset),
)
        results = [self._row_to_dict(row) for row in cursor.fetchall()]
conn.close()
return results
def search_entries(self, query: str, limit: int = 10) -> list:
"""
Search for entries containing specific text.
Args:
query: Text to search for (case-insensitive LIKE)
limit: Max results
"""
if not query or len(query) < 2:
return []
conn = sqlite3.connect(self.db_path)
cursor = conn.execute(
"SELECT content_hash, first_seen, last_seen, event_id, summary_preview FROM seen_hashes WHERE summary_preview LIKE ? ORDER BY last_seen DESC LIMIT ?",
(f"%{query}%", limit),
)
        results = [self._row_to_dict(row) for row in cursor.fetchall()]
conn.close()
return results
def get_entries_since(self, timestamp: str) -> list:
"""
Get entries added/updated after timestamp.
Args:
timestamp: ISO format timestamp string
Returns:
List of entry dicts
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.execute(
"SELECT content_hash, first_seen, last_seen, event_id, summary_preview FROM seen_hashes WHERE last_seen > ? ORDER BY last_seen DESC",
(timestamp,),
)
        results = [self._row_to_dict(row) for row in cursor.fetchall()]
conn.close()
return results
def get_stats(self) -> dict:
"""Get cache statistics"""
conn = sqlite3.connect(self.db_path)
cursor = conn.execute("SELECT COUNT(*) FROM seen_hashes")
total = cursor.fetchone()[0]
cutoff_24h = datetime.utcnow() - timedelta(hours=24)
cursor = conn.execute(
"SELECT COUNT(*) FROM seen_hashes WHERE last_seen > ?",
(cutoff_24h.isoformat(),),
)
last_24h = cursor.fetchone()[0]
conn.close()
return {
"total_entries": total,
"entries_last_24h": last_24h,
"db_path": self.db_path,
}
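

if __name__ == "__main__":
    # Ad-hoc smoke test, a minimal sketch: assumes the module is run as
    # `python -m src.storage.sqlite_cache` (so the relative `.config`
    # import resolves) and that `config` supplies SQLITE_DB_PATH,
    # EXACT_MATCH_CHARS, and SQLITE_RETENTION_HOURS.
    import os
    import tempfile

    logging.basicConfig(level=logging.DEBUG)
    # Use a temp file, not ":memory:", since each method opens its own connection.
    db_file = os.path.join(tempfile.mkdtemp(), "dedup_cache.db")
    cache = SQLiteCache(db_path=db_file)
    cache.add_entry("Breaking: example summary for the smoke test", "evt-123")
    is_dup, evt = cache.has_exact_match("Breaking: example summary for the smoke test")
    assert is_dup and evt == "evt-123"
    print(cache.get_stats())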