from fastapi import FastAPI, UploadFile, File, Form from fastapi.responses import HTMLResponse from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from pymongo import MongoClient from langchain_community.embeddings import HuggingFaceEmbeddings from langchain_community.vectorstores import MongoDBAtlasVectorSearch from pypdf import PdfReader from collections import defaultdict import google.generativeai as genai import io, traceback, numpy as np import os from fastapi import Request GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") MONGO_URI = "mongodb+srv://trang_topcv:ai.rxhfZ7h4z.TK@trangtopcv.74csrl4.mongodb.net/?retryWrites=true&w=majority" DB_NAME = "topcv_jobs" COLLECTION_NAME = "job" INDEX_NAME = "job_index" genai.configure(api_key=GEMINI_API_KEY) # INIT client = MongoClient(MONGO_URI) collection = client[DB_NAME][COLLECTION_NAME] embeddings = HuggingFaceEmbeddings( model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" ) vectorstore = MongoDBAtlasVectorSearch( collection=collection, embedding=embeddings, index_name=INDEX_NAME, text_key="chunk", embedding_key="embedding" ) # UTILS def extract_text_from_pdf(file_bytes: bytes) -> str: """Đọc nội dung PDF""" reader = PdfReader(io.BytesIO(file_bytes)) text = "" for page in reader.pages: page_text = page.extract_text() or "" text += page_text + "\n" if not text.strip(): raise ValueError("Không trích xuất được nội dung từ PDF.") return text.strip() def chunk_text(text, chunk_size=500, overlap_size=50): """Chia CV thành các đoạn nhỏ để embedding""" chunks, start = [], 0 while start < len(text): end = min(start + chunk_size, len(text)) chunks.append(text[start:end]) start += chunk_size - overlap_size return chunks def aggregate_chunks(docs): """Gom các đoạn chunk theo Job_ids""" job_scores = defaultdict(list) job_metadata = dict() for doc in docs: meta = doc.metadata.get("metadata", doc.metadata) job_id = meta.get("Job_ids") if not job_id: continue score = getattr(doc, "score", 1) job_scores[job_id].append(score) if job_id not in job_metadata: job_metadata[job_id] = meta job_final = [] for job_id, scores in job_scores.items(): avg_score = sum(scores) / len(scores) job_final.append({ "job_id": job_id, "score": avg_score, "metadata": job_metadata[job_id] }) job_final_sorted = sorted(job_final, key=lambda x: x["score"], reverse=True)[:5] return job_final_sorted def format_job_markdown(md: dict) -> str: """Hiển thị job dạng Markdown có link click được""" link = md.get("Link công việc", "") link_md = f"[🔗 Xem chi tiết tại đây]({link})" if link else "Không có link" return ( f"**💼 Vị trí:** {md.get('Vị trí','')}\n" f"**🏢 Công ty:** {md.get('Công ty','')}\n" f"**💰 Mức lương:** {md.get('Mức lương','')}\n" f"**📍 Địa điểm:** {md.get('Địa điểm','')}\n" f"**📝 Mô tả:** {md.get('Mô tả công việc','')[:400]}...\n" f"**🎯 Yêu cầu:** {md.get('Yêu cầu ứng viên','')}\n" f"{link_md}\n\n" ) # FASTAPI APP app = FastAPI(title="Chatbot Tư vấn Việc làm IT - TopCV") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) user_context = {} # Upload CV + gợi ý job @app.post("/upload_cv") async def upload_cv(request: Request, file: UploadFile = File(...)): diagnostics = {} try: # 0. đọc file file_bytes = await file.read() cv_text = extract_text_from_pdf(file_bytes) diagnostics["cv_length"] = len(cv_text) diagnostics["cv_sample"] = cv_text[:300].replace("\n", " ") if not cv_text.strip(): return JSONResponse({"status": "error", "message": "Không trích xuất được nội dung từ CV.", "diagnostics": diagnostics}, status_code=400) # 1. chunk CV chunks = chunk_text(cv_text) diagnostics["num_chunks"] = len(chunks) if len(chunks) == 0: return JSONResponse({"status": "error", "message": "CV quá ngắn sau khi chunk.", "diagnostics": diagnostics}, status_code=400) # 2. kiểm tra DB có embedding sample không sample_doc = collection.find_one({"embedding": {"$exists": True}}, {"embedding": 1}) if not sample_doc or "embedding" not in sample_doc: return JSONResponse({ "status": "error", "message": "Không tìm thấy trường 'embedding' trong MongoDB. Hãy đảm bảo bạn đã lưu embedding cho job.", "diagnostics": diagnostics }, status_code=400) db_emb = sample_doc["embedding"] diagnostics["db_embedding_dim"] = len(db_emb) all_docs = [] # 3. xử lý từng chunk: build embedding + tìm kiếm for idx, ch in enumerate(chunks): try: emb = embeddings.embed_query(ch) # convert nếu trả về numpy array if hasattr(emb, "tolist"): emb_list = emb.tolist() else: # đảm bảo là list of float emb_list = list(map(float, emb)) # lưu diagnostics cho chunk đầu if idx == 0: diagnostics["first_chunk_sample"] = ch[:200].replace("\n", " ") diagnostics["first_chunk_emb_dim"] = len(emb_list) diagnostics["first_chunk_emb_head"] = emb_list[:5] # nếu dimension khác, cố fallback sang text-search docs = [] if len(emb_list) == diagnostics["db_embedding_dim"]: try: docs = vectorstore.similarity_search_by_vector(emb_list, k=8) diagnostics.setdefault("vector_search_counts", []).append(len(docs)) except Exception as e: diagnostics.setdefault("vector_search_errors", []).append(str(e)) docs = [] else: diagnostics.setdefault("dim_mismatch_chunks", []).append({ "chunk_idx": idx, "chunk_dim": len(emb_list), "db_dim": diagnostics["db_embedding_dim"] }) # fallback: nếu vector search rỗng, thử text search (vectorstore tự embed bên trong) if not docs: try: docs_text = vectorstore.similarity_search(ch, k=8) diagnostics.setdefault("text_search_counts", []).append(len(docs_text)) docs = docs_text except Exception as e: diagnostics.setdefault("text_search_errors", []).append(str(e)) docs = [] all_docs.extend(docs) except Exception as e: diagnostics.setdefault("chunk_embedding_errors", []).append({"idx": idx, "error": str(e)}) continue # 4. Nếu không tìm thấy bất kỳ doc nào diagnostics["total_docs_found"] = len(all_docs) if not all_docs: return JSONResponse({ "status": "error", "message": "Không tìm được công việc tương tự trong cơ sở dữ liệu. Kiểm tra embedding hoặc MongoDB index.", "diagnostics": diagnostics }, status_code=200) # 5. Gom theo job_id và chọn top jobs top_jobs = aggregate_chunks(all_docs) diagnostics["top_jobs_count"] = len(top_jobs) if not top_jobs: return JSONResponse({ "status": "error", "message": "Không tìm thấy job phù hợp sau khi gom chunk.", "diagnostics": diagnostics }, status_code=200) # 6. format job snippets (markdown clickable links) job_snippets = "\n".join([format_job_markdown(j["metadata"]) for j in top_jobs]) # 7. gọi Gemini LLM # 7. Gọi Gemini LLM để sinh gợi ý công việc diagnostics = {} try: model = genai.GenerativeModel("gemini-2.0-flash-001") short_cv = cv_text[:1500] # tránh lỗi do CV quá dài top_jobs_text = "\n".join([format_job_markdown(j["metadata"]) for j in top_jobs[:5]]) prompt = f""" Dưới đây là nội dung CV và danh sách các công việc có thể phù hợp. ### 🧾 CV Ứng viên: {short_cv} ### 💼 Danh sách công việc tương tự: {top_jobs_text} Hãy gợi ý 3-5 công việc phù hợp nhất (chọn trong danh sách trên) và nêu ngắn gọn lý do phù hợp cho từng công việc. """ response = model.generate_content(prompt) if not hasattr(response, "text") or not response.text: summary = "Không có phản hồi từ Gemini." else: summary = response.text.strip() except Exception as e: diagnostics["gemini_error"] = str(e) print("⚠️ Lỗi khi gọi Gemini:", traceback.format_exc()) summary = f"Lỗi khi gọi Gemini: {str(e)}" # 8. Lưu context vào request.state (không dùng global nữa) request.state.last_cv_jobs = top_jobs_text request.state.last_cv_summary = summary return JSONResponse({ "status": "success", "suggestions": summary, "jobs": top_jobs_text, "diagnostics": diagnostics }) except Exception as e: diagnostics["exception"] = str(e) print("❌ Lỗi upload_cv:", traceback.format_exc()) return JSONResponse({ "status": "error", "message": str(e), "diagnostics": diagnostics }, status_code=500) # Chat query (dùng hoặc không dùng CV) @app.post("/chat") async def chat(request: Request, query: str = Form(...)): try: # Lấy context từ request.state cv_summary = getattr(request.state, "last_cv_summary", "") job_snippets = getattr(request.state, "last_cv_jobs", "") # Nếu chưa có CV, tìm trực tiếp theo query if not cv_summary: docs = vectorstore.similarity_search(query, k=10) top_jobs = aggregate_chunks(docs) job_snippets = "\n".join([format_job_markdown(j["metadata"]) for j in top_jobs]) cv_summary = "Người dùng chưa upload CV, dùng kết quả tìm kiếm trực tiếp." prompt = f""" Dữ liệu tham khảo: {cv_summary} Các công việc liên quan: {job_snippets} Người dùng hỏi: {query} Hãy trả lời dựa trên dữ liệu ở trên. Nếu không đủ thông tin, trả lời: "Tôi không có thông tin chi tiết về công việc này." """ model = genai.GenerativeModel("gemini-2.0-flash-001") response = model.generate_content(prompt) return {"query": query, "response": response.text.strip()} except Exception as e: print("❌ Lỗi chat:", traceback.format_exc()) return JSONResponse({"status": "error", "message": str(e)}, status_code=400) @app.get("/", response_class=HTMLResponse) def home(): return HTMLResponse(content="""