|
|
""" |
|
|
src/storage/sqlite_cache.py |
|
|
Fast hash-based cache for first-tier deduplication |
|
|
""" |
|
|
|
|
|
import hashlib
import logging
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

from .config import config
|
|
|
|
|
# Module-level logger shared by SQLiteCache for debug/info output.
logger = logging.getLogger("sqlite_cache")
|
|
|
|
|
|
|
|
class SQLiteCache:
    """
    Fast hash-based cache for exact match deduplication.

    Uses MD5 hash of first N characters for O(1) lookup.

    Each public method opens a short-lived connection to ``self.db_path``,
    wrapped in ``contextlib.closing`` so the handle is released even when a
    query raises (the previous version leaked connections on exceptions).
    """

    # Column list shared by every query that returns full entries.
    # Order must stay in sync with _row_to_dict below.
    _ENTRY_COLUMNS = "content_hash, first_seen, last_seen, event_id, summary_preview"

    def __init__(self, db_path: Optional[str] = None):
        """
        Args:
            db_path: Path to the SQLite file; defaults to config.SQLITE_DB_PATH.
        """
        self.db_path = db_path or config.SQLITE_DB_PATH
        self._init_db()
        logger.info(f"[SQLiteCache] Initialized at {self.db_path}")

    @staticmethod
    def _utcnow() -> datetime:
        # Naive UTC timestamp. datetime.utcnow() is deprecated (3.12+); this
        # produces the identical naive value, so stored ISO strings keep
        # comparing lexicographically with rows written by older versions.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _row_to_dict(row: tuple) -> dict:
        """Convert a row selected with _ENTRY_COLUMNS into a plain dict."""
        return {
            "content_hash": row[0],
            "first_seen": row[1],
            "last_seen": row[2],
            "event_id": row[3],
            "summary_preview": row[4],
        }

    def _init_db(self) -> None:
        """Initialize database schema (idempotent: CREATE ... IF NOT EXISTS)."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS seen_hashes (
                    content_hash TEXT PRIMARY KEY,
                    first_seen TIMESTAMP NOT NULL,
                    last_seen TIMESTAMP NOT NULL,
                    event_id TEXT,
                    summary_preview TEXT
                )
                """
            )
            # Index supports the retention-window filters and cleanup scans.
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_last_seen ON seen_hashes(last_seen)"
            )
            conn.commit()

    def _get_hash(self, summary: str) -> str:
        """Generate MD5 hash from first N characters (whitespace/case normalized)."""
        # MD5 is used purely as a fast fingerprint here, not for security.
        normalized = summary[: config.EXACT_MATCH_CHARS].strip().lower()
        return hashlib.md5(normalized.encode("utf-8")).hexdigest()

    def has_exact_match(
        self, summary: str, retention_hours: Optional[int] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Check if summary exists in cache (exact match) within the retention window.

        Args:
            summary: Text to check; an empty summary is never a duplicate.
            retention_hours: Lookback window; defaults to
                config.SQLITE_RETENTION_HOURS.

        Returns:
            (is_duplicate, event_id)
        """
        if not summary:
            return False, None

        retention_hours = retention_hours or config.SQLITE_RETENTION_HOURS
        content_hash = self._get_hash(summary)
        cutoff = self._utcnow() - timedelta(hours=retention_hours)

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.execute(
                "SELECT event_id FROM seen_hashes WHERE content_hash = ? AND last_seen > ?",
                (content_hash, cutoff.isoformat()),
            )
            result = cursor.fetchone()

        if result:
            logger.debug(f"[SQLiteCache] EXACT MATCH found: {content_hash[:8]}...")
            return True, result[0]

        return False, None

    def add_entry(self, summary: str, event_id: str) -> None:
        """
        Add new entry to cache or refresh an existing one.

        On a repeat sighting only last_seen is updated; the original
        event_id, first_seen and preview are kept (matching prior behavior).
        """
        if not summary:
            return

        content_hash = self._get_hash(summary)
        now = self._utcnow().isoformat()
        preview = summary[:2000]

        with closing(sqlite3.connect(self.db_path)) as conn:
            # Single atomic upsert (SQLite >= 3.24) instead of the previous
            # racy UPDATE-then-INSERT pair.
            conn.execute(
                """
                INSERT INTO seen_hashes
                    (content_hash, first_seen, last_seen, event_id, summary_preview)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(content_hash) DO UPDATE SET last_seen = excluded.last_seen
                """,
                (content_hash, now, now, event_id, preview),
            )
            conn.commit()
        logger.debug(f"[SQLiteCache] Added: {content_hash[:8]}... ({event_id})")

    def cleanup_old_entries(self, retention_hours: Optional[int] = None) -> int:
        """
        Remove entries older than retention period.

        Returns:
            Number of rows deleted.
        """
        retention_hours = retention_hours or config.SQLITE_RETENTION_HOURS
        cutoff = self._utcnow() - timedelta(hours=retention_hours)

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.execute(
                "DELETE FROM seen_hashes WHERE last_seen < ?", (cutoff.isoformat(),)
            )
            deleted = cursor.rowcount
            conn.commit()

        if deleted > 0:
            logger.info(f"[SQLiteCache] Cleaned up {deleted} old entries")

        return deleted

    def _fetch_entries(self, clause: str, params: tuple) -> list:
        """Run a SELECT over seen_hashes with *clause* appended; return dicts."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            rows = conn.execute(
                f"SELECT {self._ENTRY_COLUMNS} FROM seen_hashes {clause}", params
            ).fetchall()
        return [self._row_to_dict(row) for row in rows]

    def get_all_entries(self, limit: int = 100, offset: int = 0) -> list:
        """
        Paginated retrieval of all cached entries, newest first.

        Returns list of dicts with event metadata.
        """
        return self._fetch_entries(
            "ORDER BY last_seen DESC LIMIT ? OFFSET ?", (limit, offset)
        )

    def search_entries(self, query: str, limit: int = 10) -> list:
        """
        Search for entries containing specific text.

        Args:
            query: Text to search for (case-insensitive LIKE); queries shorter
                than 2 characters return no results.
            limit: Max results.
        """
        if not query or len(query) < 2:
            return []
        return self._fetch_entries(
            "WHERE summary_preview LIKE ? ORDER BY last_seen DESC LIMIT ?",
            (f"%{query}%", limit),
        )

    def get_entries_since(self, timestamp: str) -> list:
        """
        Get entries added/updated after timestamp.

        Args:
            timestamp: ISO format timestamp string

        Returns:
            List of entry dicts, newest first.
        """
        return self._fetch_entries(
            "WHERE last_seen > ? ORDER BY last_seen DESC", (timestamp,)
        )

    def get_stats(self) -> dict:
        """Get cache statistics: total entries, entries seen in last 24h, db path."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            total = conn.execute("SELECT COUNT(*) FROM seen_hashes").fetchone()[0]

            cutoff_24h = self._utcnow() - timedelta(hours=24)
            last_24h = conn.execute(
                "SELECT COUNT(*) FROM seen_hashes WHERE last_seen > ?",
                (cutoff_24h.isoformat(),),
            ).fetchone()[0]

        return {
            "total_entries": total,
            "entries_last_24h": last_24h,
            "db_path": self.db_path,
        }
|
|
|