# api-sesgos / src /index.py
# RodrickMJ
# Add application file
# f1db1e3
from contextlib import asynccontextmanager
from typing import List

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from src.engine.analysis_engine import AnalysisEngine
# Process-wide analysis engine. It stays None until the `lifespan` handler
# replaces it with an AnalysisEngine instance at application startup.
engine: "AnalysisEngine | None" = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Build the shared analysis engine before the app starts serving.

    The code ahead of the ``yield`` creates one ``AnalysisEngine`` and
    publishes it through the module-level ``engine`` name. Nothing follows
    the ``yield``, so no teardown work is performed on shutdown.

    Args:
        app: The FastAPI application being started (not used here).
    """
    global engine
    engine = AnalysisEngine()
    yield
# ASGI application object; `lifespan` is registered as its lifespan handler.
app = FastAPI(
    title="FactChecker Ultra-Ligero",
    lifespan=lifespan,
)
class Paragraph(BaseModel):
    # One paragraph of the document submitted for analysis.
    # (Documented with comments rather than a docstring so the generated
    # JSON schema description is left untouched.)

    # Label for the kind of paragraph; any string is accepted.
    type: str
    # Text content of the paragraph.
    text: str
class AnalysisRequest(BaseModel):
    # Request body accepted by POST /analyze.
    # (Documented with comments rather than a docstring so the generated
    # JSON schema description is left untouched.)

    # Document title; forwarded to the engine as `title`.
    title: str
    # Document paragraphs; each is converted to a plain dict for the engine.
    paragraphs: List[Paragraph]
    # Text written by the user; forwarded to the engine as `user_text`.
    user_text: str
class RagItem(BaseModel):
    # One retrieval (RAG) result supplied to POST /analyze-rag.
    # (Documented with comments rather than a docstring so the generated
    # JSON schema description is left untouched.)

    # Summary text; becomes the `text` of a paragraph handed to the engine.
    summary: str
class RagAnalysisRequest(BaseModel):
    # Request body accepted by POST /analyze-rag.
    # (Documented with comments rather than a docstring so the generated
    # JSON schema description is left untouched.)

    # Retrieval results whose summaries are analysed in place of a document.
    rag_results: List[RagItem]
    # Text written by the user; forwarded to the engine as `user_text`.
    user_text: str
@app.get("/")
def home():
    # Status endpoint: returns a fixed payload and does not touch the engine.
    # (Kept as a comment, not a docstring, so the generated OpenAPI
    # description for this route is unchanged.)
    status_payload = {"status": "corriendo", "modelos_cargados": "una sola vez"}
    return status_payload
@app.post("/analyze")
async def analyze(payload: AnalysisRequest):
    """Analyze the user's text against the paragraphs of a document."""
    # `engine` starts as None and is only populated by the lifespan handler.
    # If that handler has not run, answer with an explicit 503 instead of
    # failing with an AttributeError on None.
    if engine is None:
        raise HTTPException(
            status_code=503,
            detail="Analysis engine is not initialized",
        )

    # The engine receives plain dicts. Prefer Pydantic v2's `model_dump()`
    # and fall back to `dict()` on v1, where `model_dump` does not exist
    # (`dict()` is deprecated on v2).
    document_paragraphs = [
        p.model_dump() if hasattr(p, "model_dump") else p.dict()
        for p in payload.paragraphs
    ]

    return engine.analyze(
        user_text=payload.user_text,
        document_paragraphs=document_paragraphs,
        title=payload.title,
    )
@app.post("/analyze-rag")
async def analyze_rag(payload: RagAnalysisRequest):
    """Analyze the user's text against RAG summaries instead of a document."""
    # `engine` starts as None and is only populated by the lifespan handler.
    # If that handler has not run, answer with an explicit 503 instead of
    # failing with an AttributeError on None.
    if engine is None:
        raise HTTPException(
            status_code=503,
            detail="Analysis engine is not initialized",
        )

    # 1. Convert each summary into the paragraph dict shape passed to the
    #    engine.
    rag_paragraphs = [
        {"type": "rag_summary", "text": item.summary}
        for item in payload.rag_results
    ]

    # 2. Run the same engine used by /analyze, with a fixed placeholder title.
    result = engine.analyze(
        user_text=payload.user_text,
        document_paragraphs=rag_paragraphs,
        title="RAG_CONTEXT",
    )

    # 3. Tag the response as RAG-derived (for future traceability). A new
    #    dict is built so the object returned by the engine is not mutated.
    return {**result, "source": "rag"}