import os

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import urllib.request
import tempfile
import re
import io
from io import StringIO

import requests
import trafilatura
from bs4 import BeautifulSoup
import fitz  # PyMuPDF
from PIL import Image
import gradio as gr
import numpy as np
import torch
from sklearn.neighbors import NearestNeighbors
from sentence_transformers import SentenceTransformer
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    NllbTokenizer,
    M2M100ForConditionalGeneration,
)

# CONFIG
EMBED_MODEL = "intfloat/e5-small-v2"
LLM_MODEL = "Qwen/Qwen2.5-0.5B-Instruct"
TRANS_MODEL_ID = "facebook/nllb-200-distilled-600M"
WORD_CHUNK_SIZE = 150
TOP_K_CHUNKS = 5


# TEXT CLEANING
def preprocess(text: str) -> str:
    """Simple whitespace normalization (keeps content)."""
    text = text.replace("\r", " ").replace("\n", " ")
    text = re.sub(r"\s+", " ", text)
    return text.strip()


# PDF DOWNLOAD (PyMuPDF + urlretrieve)
def safe_download_pdf(url: str) -> str:
    """
    Safely download a PDF from a URL using urllib.
    Works well for research sites (ScienceDirect, Euclid, etc.).
    Returns the path to a temporary PDF file.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
    tmp_path = tmp.name
    tmp.close()
    urllib.request.urlretrieve(url, tmp_path)
    return tmp_path


def pdf_to_text(path):
    """Extract text per page, retrying with alternative extractors when a
    page yields too little text or raw PDF object markers."""
    doc = fitz.open(path)
    pages = []
    for page in doc:
        text = page.get_text("text")
        # "<<" or "/Obj" in the output usually means a garbled extraction.
        if len(text.strip()) < 50 or "<<" in text or "/Obj" in text:
            try:
                text = page.get_textpage().extractText()
            except Exception:
                pass
        if len(text.strip()) < 50:
            blocks = page.get_text("blocks")
            text = " ".join(b[4] for b in blocks if isinstance(b[4], str))
        pages.append(text)
    doc.close()
    return pages


def text_to_chunks(texts, word_length=WORD_CHUNK_SIZE, start_page=1):
    """Convert a list of page texts into page-tagged word chunks.

    A short trailing chunk is merged into the next page's words instead of
    being emitted on its own, so chunks stay close to `word_length` words.
    """
    text_toks = [t.split(" ") for t in texts]
    chunks = []
    for idx, words in enumerate(text_toks):
        for i in range(0, len(words), word_length):
            chunk_words = words[i : i + word_length]
            if (
                (i + word_length) > len(words)
                and (len(chunk_words) < word_length)
                and (len(text_toks) != (idx + 1))
            ):
                text_toks[idx + 1] = chunk_words + text_toks[idx + 1]
                continue
            chunk = " ".join(chunk_words).strip()
            chunk = f'[Page no. {idx + start_page}] "{chunk}"'
            chunks.append(chunk)
    return chunks
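# A quick doctest-style sketch of the tail-merge behavior above (illustrative
# only; the inputs are made up). A 1-word tail on page 1 is prepended to
# page 2 rather than emitted as its own chunk, while the last page keeps its
# short tail:
#
#   >>> text_to_chunks(["a b c d", "e f"], word_length=3)
#   ['[Page no. 1] "a b c"', '[Page no. 2] "d e f"']
#   >>> text_to_chunks(["a b c d e"], word_length=3)
#   ['[Page no. 1] "a b c"', '[Page no. 1] "d e"']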
# HTML ARTICLE INGEST (for non-PDF URLs)
def extract_html_text(url: str) -> str:
    """Extract the main text from an article URL (non-PDF).

    Tries trafilatura first, then falls back to BeautifulSoup.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; RAG-bot/1.0)",
        "Accept": "text/html,application/xhtml+xml",
    }
    try:
        resp = requests.get(url, headers=headers, timeout=20)
        html = resp.text
    except Exception as e:
        return f"Error loading HTML: {e}"

    # Try trafilatura
    extracted = trafilatura.extract(html)
    if extracted and len(extracted) > 200:
        return preprocess(extracted)

    # Fallback to raw text via BeautifulSoup
    soup = BeautifulSoup(html, "html.parser")
    for bad in soup(["script", "style", "noscript"]):
        bad.decompose()
    text = soup.get_text(" ", strip=True)
    return preprocess(text)


# SEMANTIC SEARCH
class SemanticSearch:
    def __init__(self, model_name=EMBED_MODEL):
        self.embedder = SentenceTransformer(model_name)
        self.fitted = False
        self.chunks = []
        self.nn = None

    def fit(self, chunks, batch_size=512, n_neighbors=TOP_K_CHUNKS):
        self.chunks = chunks
        emb = self.embedder.encode(
            chunks,
            batch_size=batch_size,
            convert_to_numpy=True,
            show_progress_bar=False,
        ).astype("float32")
        n_neighbors = min(n_neighbors, len(emb))
        self.nn = NearestNeighbors(n_neighbors=n_neighbors, metric="cosine")
        self.nn.fit(emb)
        self.fitted = True

    def search(self, query, k=TOP_K_CHUNKS):
        if not self.fitted:
            raise ValueError("Vector store not ready. Load a document first.")
        k = min(k, len(self.chunks))
        q_emb = self.embedder.encode([query], convert_to_numpy=True).astype("float32")
        dist, idx = self.nn.kneighbors(q_emb, n_neighbors=k)
        idx = idx[0]
        dist = dist[0]
        results = [(self.chunks[i], float(dist[j])) for j, i in enumerate(idx)]
        return results


vs = SemanticSearch()

# LOAD QWEN LLM
q_tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)
q_model = AutoModelForCausalLM.from_pretrained(LLM_MODEL).to("cpu")
q_model.eval()


@torch.no_grad()
def run_llm(system_prompt: str, user_prompt: str, max_new_tokens: int = 256) -> str:
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    text = q_tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    inputs = q_tokenizer(text, return_tensors="pt").to("cpu")
    # Greedy decoding for reproducible answers; sampling parameters such as
    # temperature/top_p only take effect when do_sample=True, so they are omitted.
    outputs = q_model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        eos_token_id=q_tokenizer.eos_token_id,
    )
    gen = outputs[0][inputs["input_ids"].shape[1] :]
    out = q_tokenizer.decode(gen, skip_special_tokens=True)
    return out.strip()


# TRANSLATION (NLLB-200)
t_tokenizer = NllbTokenizer.from_pretrained(TRANS_MODEL_ID)
t_model = M2M100ForConditionalGeneration.from_pretrained(TRANS_MODEL_ID).to("cpu")

LANG_CODES = {
    "English": "eng_Latn",
    "Hindi": "hin_Deva",
    "Telugu": "tel_Telu",
    "Tamil": "tam_Taml",
    "Kannada": "kan_Knda",
    "Malayalam": "mal_Mlym",
    "Bengali": "ben_Beng",
    "Marathi": "mar_Deva",
    "Gujarati": "guj_Gujr",
    "Odia": "ory_Orya",
    "Punjabi": "pan_Guru",
    "Assamese": "asm_Beng",
}


def translate_answer(text: str, lang: str) -> str:
    if lang in ("auto", "English"):
        return text
    try:
        tgt = LANG_CODES[lang]
        inputs = t_tokenizer(text, return_tensors="pt").to("cpu")
        outputs = t_model.generate(
            **inputs,
            forced_bos_token_id=t_tokenizer.convert_tokens_to_ids(tgt),
            max_length=400,
        )
        return t_tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]
    except Exception as e:
        print("Translation error:", e)
        return text
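# End-to-end usage sketch (illustrative only; the URL is a placeholder, the
# question is made up, and build_prompt is defined just below):
#
#   pages = pdf_to_text(safe_download_pdf("https://example.com/paper.pdf"))
#   vs.fit(text_to_chunks(pages))
#   question = "What method does the paper propose?"
#   retrieved = vs.search(question)
#   answer = run_llm("You are a precise and factual RAG assistant.",
#                    build_prompt(question, retrieved))
#   print(translate_answer(answer, "Hindi"))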
# RAG PROMPT + ANSWERING
def build_prompt(question: str, retrieved):
    ctx = "\n\n".join([c for c, _ in retrieved])
    prompt = f"""
You are a precise and factual RAG assistant.
Use ONLY the context below to answer the question.
If the context does not contain the answer, say:
"I don't know based on this document."

When possible:
- Answer in short, clear points.
- Refer to evidence using the [Page no. X] tags in the text.

CONTEXT:
{ctx}

QUESTION: {question}

Answer in English:
""".strip()
    return prompt


def highlight_sources(retrieved):
    html = "