# streamlit_app.py
import os
import glob
import json
import time
import math
import re
from typing import List, Dict, Any, Tuple

import numpy as np
import streamlit as st
import PyPDF2
from dotenv import load_dotenv
from huggingface_hub import InferenceClient, login
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from streamlit_chat import message as st_message  # third-party: pip install streamlit-chat
from heapq import nlargest

# FAISS (optional)
try:
    import faiss
except Exception:
    faiss = None
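# Pipeline overview (as implemented below): extract the handbook PDF page by page,
# split pages into overlapping chunks, embed them (HF API -> local SentenceTransformer
# -> TF-IDF fallback), index with FAISS when available, then answer questions by
# retrieving the top chunks and either generating with a hosted model or falling back
# to an extractive, citation-backed answer.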
# =========================
# Page + env
# =========================
st.set_page_config(page_title="📘 Handbook Assistant", page_icon="📘", layout="wide")
st.title("📘 USTP Student Handbook Assistant (2023 Edition)")
st.caption("This assistant answers only from the handbook. Place 'USTP Student Handbook 2023 Edition.pdf' in the same folder.")

load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")

hf_client = None
if HF_TOKEN:
    try:
        login(HF_TOKEN)
    except Exception:
        # login might be unnecessary depending on environment
        pass
    try:
        hf_client = InferenceClient(token=HF_TOKEN)
    except Exception as e:
        st.warning(f"Could not init InferenceClient: {e}")
# =========================
# Sidebar configuration
# =========================
with st.sidebar:
    st.header("⚙️ Settings")
    model_options = {
        "Qwen 2.5 14B Instruct (default)": "Qwen/Qwen2.5-14B-Instruct",
        "Mistral 7B Instruct": "mistralai/Mistral-7B-Instruct-v0.3",
        "Llama 3 8B Instruct": "meta-llama/Meta-Llama-3-8B-Instruct",
        "Falcon 7B Instruct": "tiiuae/falcon-7b-instruct",
        "Mixtral 8x7B Instruct": "mistralai/Mixtral-8x7B-Instruct-v0.1",
    }
    model_label = st.selectbox("Choose model", list(model_options.keys()), index=0)
    DEFAULT_MODEL = model_options[model_label]
    st.markdown("---")
    similarity_threshold = st.slider("Similarity threshold", 0.30, 0.95, 0.62, 0.01)
    top_k = st.slider("Top K retrieved chunks", 1, 10, 4)
    chunk_size_chars = st.number_input("Chunk size (chars)", min_value=400, max_value=3000, value=1200, step=100)
    chunk_overlap = st.number_input("Chunk overlap (chars)", min_value=20, max_value=800, value=150, step=10)
    regenerate_index = st.button("🔄 Rebuild handbook index (re-extract & re-embed)")
# =========================
# Filenames for index/meta
# =========================
HAND_INDEX_FN = "handbook_faiss.index"
HAND_META_FN = "handbook_metadata.json"
HAND_EMB_DIM_FN = "handbook_emb_dim.json"

# =========================
# Utilities: find/load PDF
# =========================
def find_handbook(preferred_name: str = "USTP Student Handbook 2023 Edition.pdf") -> List[str]:
    """Return a list containing the handbook path (preferred) or the first PDF found."""
    current_dir = os.path.dirname(os.path.abspath(__file__))
    preferred_path = os.path.join(current_dir, preferred_name)
    if os.path.exists(preferred_path):
        st.info(f"📘 Found handbook: {preferred_name}")
        return [preferred_path]
    # fallback: any PDF in the app folder
    pdfs = glob.glob(os.path.join(current_dir, "*.pdf"))
    if pdfs:
        st.warning(f"⚠️ Preferred handbook not found. Using {os.path.basename(pdfs[0])}")
        return [pdfs[0]]
    st.error("❌ No PDF found in the app folder. Please add the handbook PDF.")
    return []
def load_pdf_texts_with_page_info(pdf_paths: List[str]) -> List[Dict[str, Any]]:
    """Extract text per page and return a list of dicts with filename, page, text."""
    pages = []
    for p in pdf_paths:
        try:
            with open(p, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                for i, page in enumerate(reader.pages):
                    try:
                        raw = page.extract_text() or ""
                    except Exception:
                        raw = ""
                    pages.append({"filename": os.path.basename(p), "page": i + 1, "text": raw})
        except Exception as e:
            st.warning(f"Failed to read {p}: {e}")
    return pages
def chunk_pages_into_segments(pages: List[Dict[str, Any]], chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
    """Split pages into overlapping character chunks while preserving filename/page metadata."""
    # Guard against overlap >= chunk_size, which would stop the window from advancing.
    overlap = max(0, min(overlap, chunk_size - 1))
    chunks = []
    for pg in pages:
        text = (pg.get("text") or "").strip()
        if not text:
            continue
        filename = pg.get("filename", "handbook")
        page_no = pg.get("page", 0)
        start = 0
        chunk_id = 0
        L = len(text)
        while start < L:
            end = min(start + chunk_size, L)
            seg = text[start:end].strip()
            if len(seg) >= 30:
                chunks.append({
                    "filename": filename,
                    "page": page_no,
                    "chunk_id": f"{filename}_p{page_no}_c{chunk_id}",
                    "content": seg,
                })
                chunk_id += 1
            if end >= L:
                break  # end of page reached; avoid re-emitting the trailing overlap forever
            start = end - overlap
    return chunks
# =========================
# Embeddings: robust pipeline
# =========================
TFIDF_MAX_FEATURES = 50000

def get_tfidf_vectorizer():
    return TfidfVectorizer(stop_words="english", max_features=TFIDF_MAX_FEATURES)

def load_local_embedder():
    """Load a SentenceTransformer model. Raises if the model cannot be loaded."""
    # compact, fast model recommended
    MODEL_NAME = "all-MiniLM-L6-v2"
    return SentenceTransformer(MODEL_NAME)
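# Note: "all-MiniLM-L6-v2" resolves to sentence-transformers/all-MiniLM-L6-v2 on the Hub
# and is downloaded on first use, so the first index build can take noticeably longer.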
def hf_embeddings_call_if_possible(texts: List[str], model_name: str = "sentence-transformers/all-mpnet-base-v2") -> Tuple[bool, Any]:
    """
    Try calling the HF InferenceClient embeddings API in a few ways depending on the client version.
    Returns (success_bool, embeddings_or_error).
    """
    if not hf_client:
        return False, "No HF client"
    try:
        # Preferred modern method (if this client version exposes it)
        if hasattr(hf_client, "embeddings"):
            out = hf_client.embeddings(model=model_name, inputs=texts)
            # handle common output shapes
            if isinstance(out, dict) and "embedding" in out:
                # single-input case
                return True, np.array(out["embedding"], dtype=np.float32)
            # sometimes a list of dicts is returned
            if isinstance(out, list) and out and isinstance(out[0], dict) and "embedding" in out[0]:
                arr = [d["embedding"] for d in out]
                return True, np.array(arr, dtype=np.float32)
            # sometimes a list of lists is returned
            if isinstance(out, list) and len(out) and isinstance(out[0], (list, tuple)):
                return True, np.array(out, dtype=np.float32)
            return False, f"Unexpected hf_client.embeddings output shape: {type(out)}"
        # other client versions expose 'feature_extraction'
        if hasattr(hf_client, "feature_extraction"):
            out = hf_client.feature_extraction(texts, model=model_name)
            return True, np.array(out, dtype=np.float32)
        # As a last resort, try the low-level post() helper (keyword-only in the versions that have it)
        if hasattr(hf_client, "post"):
            raw = hf_client.post(json={"inputs": texts}, model=model_name, task="feature-extraction")
            out = json.loads(raw) if isinstance(raw, (bytes, str)) else raw
            return True, np.array(out, dtype=np.float32)
    except Exception as e:
        return False, e
    return False, "No known embeddings method on hf_client"
def fallback_vectorize(texts: List[str]) -> np.ndarray:
    """TF-IDF fallback embeddings (L2-normalized)."""
    if not texts:
        return np.zeros((0, 0), dtype=np.float32)
    vect = get_tfidf_vectorizer()
    X = vect.fit_transform(texts)  # sparse matrix
    arr = X.toarray().astype(np.float32)
    norms = np.linalg.norm(arr, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    arr = arr / norms
    return arr
def embed_texts(texts: List[str]) -> np.ndarray:
    """
    Unified embedding function:
      1) Try the HF embedding call (if a client is available)
      2) Try a local SentenceTransformer embedder
      3) Fall back to TF-IDF
    Returns a normalized float32 numpy array.
    """
    if not texts:
        return np.zeros((0, 0), dtype=np.float32)
    # 1) HF first (cheap if credits are available)
    success, out = hf_embeddings_call_if_possible(texts)
    if success:
        try:
            arr = np.array(out, dtype=np.float32)
            # if a single vector is returned for a single input, reshape to 2D
            if arr.ndim == 1:
                arr = arr.reshape(1, -1)
            norms = np.linalg.norm(arr, axis=1, keepdims=True)
            norms[norms == 0] = 1.0
            return arr / norms
        except Exception:
            pass
    # 2) Local model
    try:
        model = load_local_embedder()
        arr = model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        arr = np.array(arr, dtype=np.float32)
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)
        norms = np.linalg.norm(arr, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        return arr / norms
    except Exception as e:
        st.warning(f"⚠️ Local SentenceTransformer failed or unavailable: {e}")
    # 3) TF-IDF fallback
    try:
        st.info("Using TF-IDF fallback embeddings (offline).")
        return fallback_vectorize(texts)
    except Exception as e:
        st.error(f"Embedding fallback failed completely: {e}")
        return np.zeros((len(texts), 128), dtype=np.float32)
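# Caveat: TF-IDF vectors are only comparable when produced by the same fitted vectorizer,
# and fallback_vectorize() refits on every call. Dense embeddings (HF or local) are the
# intended back-ends for the FAISS index; TF-IDF is a last-resort, offline fallback whose
# scores are not directly comparable to the sidebar similarity threshold.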
# =========================
# Build / load index
# =========================
def build_faiss_index(chunks: List[Dict[str, Any]]) -> Tuple[Any, List[Dict[str, Any]]]:
    """Build a FAISS index (if faiss is available) and return (index, metadata)."""
    texts = [c["content"] for c in chunks]
    emb = embed_texts(texts)
    if emb.size == 0:
        raise RuntimeError("No embeddings produced.")
    if faiss is not None:
        d = emb.shape[1]
        # Inner product on normalized vectors is equivalent to cosine similarity
        index = faiss.IndexFlatIP(d)
        # ensure normalized
        norms = np.linalg.norm(emb, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        emb_norm = emb / norms
        index.add(emb_norm.astype("float32"))
        # Save index & metadata
        faiss.write_index(index, HAND_INDEX_FN)
        with open(HAND_META_FN, "w", encoding="utf-8") as f:
            json.dump(chunks, f, indent=2)
        with open(HAND_EMB_DIM_FN, "w", encoding="utf-8") as f:
            json.dump({"dim": d}, f)
        return index, chunks
    else:
        # No FAISS: keep embeddings inside the metadata itself for brute-force search
        for i, c in enumerate(chunks):
            c["_embedding"] = emb[i].tolist()
        with open(HAND_META_FN, "w", encoding="utf-8") as f:
            json.dump(chunks, f, indent=2)
        return None, chunks
def load_index_and_metadata() -> Tuple[Any, List[Dict[str, Any]]]:
    if os.path.exists(HAND_META_FN) and os.path.exists(HAND_EMB_DIM_FN) and os.path.exists(HAND_INDEX_FN) and faiss is not None:
        try:
            index = faiss.read_index(HAND_INDEX_FN)
            with open(HAND_META_FN, "r", encoding="utf-8") as f:
                meta = json.load(f)
            return index, meta
        except Exception as e:
            st.warning(f"Failed to load saved FAISS index: {e}")
            return None, None
    # fallback to metadata only
    if os.path.exists(HAND_META_FN):
        with open(HAND_META_FN, "r", encoding="utf-8") as f:
            meta = json.load(f)
        return None, meta
    return None, None
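# When FAISS is unavailable, the metadata saved by build_faiss_index() carries an
# "_embedding" field per chunk; retrieve_top_chunks() below uses those vectors for
# brute-force cosine search instead of a FAISS index.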
# =========================
# Retrieval
# =========================
def retrieve_top_chunks(query: str, k: int = 4, metadata: List[Dict[str, Any]] = None, index=None) -> Tuple[List[Dict[str, Any]], List[float]]:
    """
    Return the top-k chunks and their similarity scores (cosine-like).
    Uses FAISS if available, otherwise brute-force over stored embeddings or TF-IDF.
    """
    if not metadata:
        metadata = []
    # If a FAISS index is available
    if index is not None:
        q_emb = embed_texts([query])
        if q_emb.ndim == 1:
            q_emb = q_emb.reshape(1, -1)
        # normalize and search
        norms = np.linalg.norm(q_emb, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        q_emb_norm = q_emb / norms
        D, I = index.search(q_emb_norm.astype("float32"), k)
        results, scores = [], []
        for idx, score in zip(I[0].tolist(), D[0].tolist()):
            # FAISS pads missing results with -1; keep results and scores aligned
            if 0 <= idx < len(metadata):
                results.append(metadata[idx])
                scores.append(float(score))
        return results, scores
    # Brute-force: metadata items may carry stored embeddings from build_faiss_index()
    if metadata and "_embedding" in metadata[0]:
        emb_mat = np.array([np.array(m["_embedding"], dtype=np.float32) for m in metadata])
        q_emb = embed_texts([query]).astype(np.float32)
        if q_emb.ndim == 1:
            q_emb = q_emb.reshape(1, -1)
        # cosine similarity
        emb_norms = np.linalg.norm(emb_mat, axis=1, keepdims=True)
        emb_norms[emb_norms == 0] = 1.0
        emb_mat_n = emb_mat / emb_norms
        q_norms = np.linalg.norm(q_emb, axis=1, keepdims=True)
        q_norms[q_norms == 0] = 1.0
        qn = q_emb / q_norms
        sims = (emb_mat_n @ qn.T).squeeze()  # cosine values
        idxs = np.argsort(-sims)[:k]
        results = [metadata[int(i)] for i in idxs]
        scores = [float(sims[int(i)]) for i in idxs]
        return results, scores
    # Final fallback: direct TF-IDF scoring between the query and chunk contents (cheap)
    texts = [m["content"] for m in metadata]
    if texts:
        vect = get_tfidf_vectorizer()
        X = vect.fit_transform(texts)
        qv = vect.transform([query])
        sims = (X @ qv.T).toarray().squeeze()
        idxs = np.argsort(-sims)[:k]
        results = [metadata[int(i)] for i in idxs]
        scores = [float(sims[int(i)]) for i in idxs]
        return results, scores
    return [], []
# =========================
# Extractive answer fallback
# =========================
def extractive_answer_from_chunks(retrieved_chunks: List[Dict[str, Any]], query: str) -> str:
    if not retrieved_chunks:
        return "The handbook does not specify that."
    q_tokens = set(t.lower() for t in re.findall(r"\w+", query) if len(t) > 2)
    scored = []
    for rc in retrieved_chunks:
        text = rc.get("content") or rc.get("text") or ""
        sents = re.split(r"(?<=[.!?])\s+", text)
        for s in sents:
            tokens = set(t.lower() for t in re.findall(r"\w+", s) if len(t) > 2)
            if not tokens:
                continue
            overlap = len(q_tokens & tokens) / (1 + len(tokens))
            scored.append((overlap, s.strip(), rc))
    if not scored:
        return "The handbook does not specify that."
    topk = nlargest(2, scored, key=lambda x: x[0])
    parts = []
    for score, sent, rc in topk:
        cite = f"(Source: {rc.get('filename', 'handbook')}, page {rc.get('page', 0)})"
        short_sent = sent if len(sent) <= 400 else sent[:397] + "..."
        parts.append(f'"{short_sent}" {cite}')
    final = "\n\n".join(parts)
    final += "\n\nTakeaway: Refer to the cited section(s) above for the official handbook wording."
    return final
# =========================
# Generation with HF fallback
# =========================
def try_hf_generate(prompt: str) -> Tuple[bool, str]:
    """
    Try several HF generation entry points. Returns (success, text_or_error).
    Handles different InferenceClient versions gracefully.
    """
    if not hf_client:
        return False, "No HF client"
    # 1) text_generation method (the prompt is the first positional argument)
    try:
        if hasattr(hf_client, "text_generation"):
            out = hf_client.text_generation(prompt, model=DEFAULT_MODEL, max_new_tokens=400, temperature=0.25)
            # out is usually a plain string, but may be a dict or list depending on the client
            if isinstance(out, dict) and "generated_text" in out:
                return True, out["generated_text"]
            if isinstance(out, list) and out and isinstance(out[0], dict) and "generated_text" in out[0]:
                return True, out[0]["generated_text"]
            return True, str(out)
    except Exception:
        # ignore and fall back to chat-style calls
        pass
    # 2) chat style: try common patterns
    try:
        chat_attr = getattr(hf_client, "chat", None)
        # Some clients expose an OpenAI-style chat.completions.create(); check it first
        # because that attribute is a proxy object, not a callable.
        if chat_attr is not None and hasattr(chat_attr, "completions") and hasattr(chat_attr.completions, "create"):
            resp = hf_client.chat.completions.create(model=DEFAULT_MODEL, messages=[{"role": "user", "content": prompt}], max_tokens=400, temperature=0.25)
            try:
                return True, resp.choices[0].message.content
            except Exception:
                return True, str(resp)
        # Other clients expose a callable hf_client.chat()
        if callable(chat_attr):
            resp = hf_client.chat(model=DEFAULT_MODEL, messages=[{"role": "user", "content": prompt}], max_tokens=400, temperature=0.25)
            # try to extract common response shapes
            if isinstance(resp, dict) and "choices" in resp:
                try:
                    return True, resp["choices"][0]["message"]["content"]
                except Exception:
                    return True, str(resp)
            if isinstance(resp, list) and resp and isinstance(resp[0], dict) and "generated_text" in resp[0]:
                return True, resp[0]["generated_text"]
            return True, str(resp)
        # Last resort: some clients expose a top-level create()
        if hasattr(hf_client, "create"):
            resp = hf_client.create(model=DEFAULT_MODEL, inputs=prompt, max_new_tokens=400, temperature=0.25)
            if isinstance(resp, dict) and "generated_text" in resp:
                return True, resp["generated_text"]
            return True, str(resp)
    except Exception as e:
        return False, e
    return False, "No known generation method"
def generate_answer(context: str, query: str, retrieved_chunks: List[Dict[str, Any]] = None) -> str:
    """
    Attempt HF generation; if it fails, fall back to an extractive, citation-backed answer.
    Pass retrieved_chunks (list) so the extractive fallback can cite pages.
    """
    prompt = f"""
You are a precise academic assistant specialized in university policies.
Use only the provided USTP Student Handbook content below. If the answer is not in the provided text, respond exactly:
"The handbook does not specify that."

Context:
{context}

Question: {query}

Provide a concise answer including source citations (filename + page).
"""
    success, out = try_hf_generate(prompt)
    if success:
        # ensure the result is a string
        return out if isinstance(out, str) else str(out)
    # HF failed (e.g., 402 or no credits) -> extractive fallback
    st.warning("HF generation unavailable -> using extractive handbook-backed answer (no hallucination).")
    return extractive_answer_from_chunks(retrieved_chunks or [], query)
# =========================
# Index management (persist/load)
# =========================
def ensure_handbook_index(rebuild: bool = False):
    """
    Create or load the index and metadata.
    Results are stored in st.session_state for quick reuse.
    """
    # If already built and not rebuilding, return early
    if st.session_state.get("handbook_ready") and not rebuild:
        return
    pdfs = find_handbook()
    if not pdfs:
        st.session_state.handbook_ready = False
        st.session_state.handbook_chunks = []
        return
    # if a saved index exists and we're not rebuilding, load it
    if not rebuild and os.path.exists(HAND_META_FN) and (faiss is not None and os.path.exists(HAND_INDEX_FN) and os.path.exists(HAND_EMB_DIM_FN)):
        try:
            idx, meta = load_index_and_metadata()
            if meta:
                st.session_state.faiss_index = idx
                st.session_state.metadata = meta
                st.session_state.handbook_ready = True
                st.success(f"Loaded saved index ({len(meta)} chunks).")
                return
        except Exception:
            pass
    # extract pages -> chunks
    pages = load_pdf_texts_with_page_info(pdfs)
    chunks = chunk_pages_into_segments(pages, chunk_size=int(chunk_size_chars), overlap=int(chunk_overlap))
    if not chunks:
        st.error("No text found in PDFs.")
        st.session_state.handbook_ready = False
        return
    # build index (tries HF embeddings -> local model -> TF-IDF)
    try:
        idx, meta = build_faiss_index(chunks)
        st.session_state.faiss_index = idx
        st.session_state.metadata = meta
        st.session_state.handbook_ready = True
        st.success(f"Indexed {len(meta)} chunks.")
    except Exception as e:
        st.error(f"Failed to build index: {e}")
        # as a fallback, store raw chunks in the session
        st.session_state.metadata = chunks
        st.session_state.faiss_index = None
        st.session_state.handbook_ready = True
# build / load index
ensure_handbook_index(rebuild=regenerate_index)

# =========================
# Chat UI
# =========================
st.divider()
st.subheader("💬 Ask the handbook (only handbook-based answers)")

if "chat_history" not in st.session_state:
    st.session_state.chat_history = []

# Input and handling
user_query = st.chat_input("Ask a question about the handbook...")
if user_query:
    ts = int(time.time() * 1000)
    st.session_state.chat_history.append({"role": "user", "content": user_query, "ts": ts})
    # Retrieve top chunks
    index = st.session_state.get("faiss_index")
    metadata = st.session_state.get("metadata", [])
    with st.spinner("🔍 Retrieving relevant handbook excerpts..."):
        retrieved, scores = retrieve_top_chunks(user_query, k=int(top_k), metadata=metadata, index=index)
    # Reject if there is no sufficiently similar match
    if not retrieved or (scores and max(scores) < float(similarity_threshold)):
        reply = "Sorry, I can only answer questions based on the school's handbook. I couldn't find relevant information in the handbook for your question."
        st.session_state.chat_history.append({"role": "assistant", "content": reply, "ts": int(time.time() * 1000)})
    else:
        # Build a concise context snippet for the model
        context_text = "\n\n".join(
            f"--- {r['chunk_id']} | {r['filename']} | page {r['page']} ---\n{r['content']}"
            if "chunk_id" in r
            else f"(Page {r.get('page')})\n{r.get('content')}"
            for r in retrieved
        )
        # Query the model, or fall back to the extractive answer
        with st.spinner("🤖 Generating answer..."):
            ans = generate_answer(context_text, user_query, retrieved_chunks=retrieved)
        # Append a citation block
        citations = "\n".join(
            f"{r.get('chunk_id', 'n/a')} - {r.get('filename')} p{r.get('page')} (score {float(s):.3f})"
            for r, s in zip(retrieved, scores or [])
        )
        final = f"{ans}\n\n**Retrieved sources (top results):**\n{citations}"
        st.session_state.chat_history.append({"role": "assistant", "content": final, "ts": int(time.time() * 1000)})

# Display chat history with unique keys
st.divider()
st.subheader("Conversation")
for i, entry in enumerate(st.session_state.chat_history):
    is_user = entry.get("role") == "user"
    # combine the index and timestamp so keys stay unique across identical messages
    key = f"msg_{i}_{entry.get('ts', 0)}"
    st_message(entry["content"], is_user=is_user, key=key)
# Toolbar
st.divider()
col1, col2 = st.columns([1, 1])
with col1:
    if st.button("🔄 Reset chat"):
        st.session_state.chat_history = []
        st.success("Chat reset.")
with col2:
    transcript = "\n\n".join(f"{m['role'].upper()}: {m['content']}" for m in st.session_state.chat_history)
    st.download_button("📥 Download transcript", data=transcript, file_name="handbook_transcript.txt")

st.caption("⚡ FAISS + local embeddings + Hugging Face generation (when available). Default model: Qwen 2.5 14B")
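# To run locally (a sketch of the assumed setup; exact package versions not pinned here):
#   pip install streamlit streamlit-chat PyPDF2 python-dotenv huggingface_hub sentence-transformers scikit-learn numpy
#   pip install faiss-cpu   # optional; without it the app falls back to brute-force search
#   streamlit run streamlit_app.py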