File size: 5,833 Bytes
b4856f1
 
 
 
752f5cc
b4856f1
16ec2cf
b4856f1
 
 
 
 
 
 
752f5cc
b4856f1
 
 
 
 
 
 
 
 
 
 
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
752f5cc
 
 
b4856f1
752f5cc
b4856f1
 
752f5cc
 
 
b4856f1
 
 
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
752f5cc
b4856f1
 
 
 
 
752f5cc
 
b4856f1
752f5cc
b4856f1
752f5cc
b4856f1
 
 
752f5cc
b4856f1
 
 
 
 
752f5cc
b4856f1
752f5cc
b4856f1
752f5cc
 
 
b4856f1
752f5cc
b4856f1
 
752f5cc
 
b4856f1
 
 
752f5cc
b4856f1
752f5cc
 
 
 
b4856f1
 
 
 
 
752f5cc
b4856f1
 
 
 
 
752f5cc
b4856f1
752f5cc
b4856f1
752f5cc
b4856f1
 
 
752f5cc
b4856f1
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
 
 
 
 
 
 
752f5cc
b4856f1
 
752f5cc
b4856f1
752f5cc
b4856f1
752f5cc
b4856f1
752f5cc
b4856f1
 
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
 
 
 
 
 
 
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
src/storage/chromadb_store.py
Semantic similarity search using ChromaDB with sentence transformers
"""

import logging
from typing import Dict, Any, Optional
from datetime import datetime

logger = logging.getLogger("chromadb_store")

try:
    import chromadb
    from chromadb.config import Settings

    CHROMADB_AVAILABLE = True
except ImportError:
    CHROMADB_AVAILABLE = False
    logger.warning("[ChromaDB] Package not installed. Semantic deduplication disabled.")

from .config import config


class ChromaDBStore:
    """
    Semantic similarity search for advanced deduplication.

    Stores event summaries in a persistent ChromaDB collection and queries it
    for the closest stored summary, so paraphrased/near-duplicate content can
    be detected. If the ``chromadb`` package is missing or initialization
    fails, ``self.client`` stays ``None`` and every public method degrades to
    a harmless result (``None``, no-op, or ``{"status": "unavailable"}``).
    """

    def __init__(self) -> None:
        self.client = None
        self.collection = None

        if not CHROMADB_AVAILABLE:
            logger.warning(
                "[ChromaDB] Not available - using fallback (no semantic dedup)"
            )
            return

        try:
            self._init_client()
            logger.info(
                f"[ChromaDB] Initialized collection: {config.CHROMADB_COLLECTION}"
            )
        except Exception as e:
            logger.error(f"[ChromaDB] Initialization failed: {e}")
            # Reset both handles so the store is consistently "unavailable".
            self.client = None
            self.collection = None

    def _init_client(self) -> None:
        """Create the persistent ChromaDB client and get-or-create the collection.

        Raises whatever ChromaDB raises; callers are responsible for handling it.
        """
        self.client = chromadb.PersistentClient(
            path=config.CHROMADB_PATH,
            settings=Settings(anonymized_telemetry=False, allow_reset=True),
        )

        # NOTE(review): no ``embedding_function`` is passed here, so ChromaDB's
        # default embedding function is used. CHROMADB_EMBEDDING_MODEL is only
        # recorded in the collection metadata. Confirm this is intended.
        # No "hnsw:space" key is set either, so the default "l2" space applies.
        self.collection = self.client.get_or_create_collection(
            name=config.CHROMADB_COLLECTION,
            metadata={
                "description": "Roger intelligence feed semantic deduplication",
                "embedding_model": config.CHROMADB_EMBEDDING_MODEL,
            },
        )

    @staticmethod
    def _distance_to_similarity(distance: float) -> float:
        """Map a ChromaDB distance to a similarity score in [0, 1].

        ChromaDB's default "l2" space reports *squared* Euclidean distance.
        Assuming unit-normalized embeddings (TODO confirm for the embedding
        function in use), d = 2 - 2*cos(theta), so ``1 - d/2`` is the cosine
        similarity. The result is clamped so negative cosine (d > 2) maps to 0,
        and tiny negative distances from float noise cannot exceed 1.
        """
        return max(0.0, min(1.0, 1.0 - distance / 2.0))

    @staticmethod
    def _sanitize_metadata(metadata: Optional[Dict[str, Any]]) -> Dict[str, str]:
        """Return a flat, string-valued copy of ``metadata`` suitable for ChromaDB.

        ``None`` values and nested ``dict``/``list`` values are dropped; every
        other value is converted with ``str()``.
        """
        if not metadata:
            return {}
        return {
            key: str(value)
            for key, value in metadata.items()
            if value is not None and not isinstance(value, (dict, list))
        }

    def find_similar(
        self, summary: str, threshold: Optional[float] = None, n_results: int = 1
    ) -> Optional[Dict[str, Any]]:
        """
        Find the stored entry most semantically similar to ``summary``.

        Args:
            summary: Text to compare against the collection.
            threshold: Minimum similarity in [0, 1] required for a match.
                ``None`` means use ``config.CHROMADB_SIMILARITY_THRESHOLD``.
                An explicit ``0.0`` is honored rather than replaced.
            n_results: Number of neighbours requested from ChromaDB. Only the
                closest one is inspected.

        Returns:
            Dict with keys ``id``, ``summary``, ``similarity``, ``distance`` and
            ``metadata`` when the closest entry meets the threshold, otherwise
            ``None``. Query errors are logged and also reported as ``None``.
        """
        if not self.client or not summary:
            return None

        # `is None` (not `or`) so that an explicit 0.0 threshold is respected.
        if threshold is None:
            threshold = config.CHROMADB_SIMILARITY_THRESHOLD

        try:
            results = self.collection.query(query_texts=[summary], n_results=n_results)

            ids = results["ids"]
            if not ids or not ids[0]:
                return None

            distance = results["distances"][0][0]
            similarity = self._distance_to_similarity(distance)

            if similarity < threshold:
                return None

            match_id = ids[0][0]
            metadatas = results.get("metadatas")
            documents = results.get("documents")
            # Coalesce missing/None entries so callers always get dict/str.
            match_meta = (metadatas[0][0] if metadatas else None) or {}
            match_doc = (documents[0][0] if documents else None) or ""

            logger.info(
                f"[ChromaDB] SEMANTIC MATCH found: "
                f"similarity={similarity:.3f} (threshold={threshold}) "
                f"id={match_id[:8]}..."
            )

            return {
                "id": match_id,
                "summary": match_doc,
                "similarity": similarity,
                "distance": distance,
                "metadata": match_meta,
            }

        except Exception as e:
            logger.error(f"[ChromaDB] Query error: {e}")
            return None

    def add_event(
        self, event_id: str, summary: str, metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        """Index an event summary so later ``find_similar`` calls can match it.

        Args:
            event_id: ID stored with the document.
            summary: Text to embed. Empty summaries are ignored.
            metadata: Optional extra fields. ``None`` and nested dict/list
                values are dropped, and the rest are stringified.
                An ``indexed_at`` key (naive UTC ISO timestamp) is always set
                and overrides any caller-supplied value.

        Errors from ChromaDB are logged, not raised.
        """
        # Local import: the module header only brings in ``datetime``.
        from datetime import timezone

        if not self.client or not summary:
            return

        try:
            safe_metadata = self._sanitize_metadata(metadata)

            # Same naive-UTC ISO format as the deprecated datetime.utcnow().
            safe_metadata["indexed_at"] = (
                datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
            )

            # NOTE(review): handling of an already-existing event_id depends on
            # the ChromaDB version; consider `upsert` if re-indexing is expected.
            self.collection.add(
                ids=[event_id], documents=[summary], metadatas=[safe_metadata]
            )

            logger.debug(f"[ChromaDB] Added event: {event_id[:8]}...")

        except Exception as e:
            logger.error(f"[ChromaDB] Add error: {e}")

    def get_stats(self) -> Dict[str, Any]:
        """Return collection statistics.

        ``{"status": "unavailable"}`` when no client exists. On success the
        dict contains a ``"status"`` of ``"active"``, the document count and
        the configured names/threshold. On failure it contains a ``"status"``
        of ``"error"`` and the error message.
        """
        if not self.client:
            return {"status": "unavailable"}

        try:
            count = self.collection.count()
            return {
                "status": "active",
                "total_documents": count,
                "collection_name": config.CHROMADB_COLLECTION,
                "embedding_model": config.CHROMADB_EMBEDDING_MODEL,
                "similarity_threshold": config.CHROMADB_SIMILARITY_THRESHOLD,
            }
        except Exception as e:
            logger.error(f"[ChromaDB] Stats error: {e}")
            return {"status": "error", "error": str(e)}

    def clear_collection(self) -> None:
        """Delete the collection and recreate it empty (use with caution!).

        Errors are logged, not raised. If recreation fails after the delete,
        ``self.collection`` still refers to the deleted collection, and later
        calls will log errors.
        """
        if not self.client:
            return

        try:
            self.client.delete_collection(config.CHROMADB_COLLECTION)
            self._init_client()  # Recreate empty collection
            logger.warning("[ChromaDB] Collection cleared!")
        except Exception as e:
            logger.error(f"[ChromaDB] Clear error: {e}")