mathpulse-api / analytics.py
github-actions[bot]
🚀 Auto-deploy from GitHub (b9366c8)
341ff5d
"""
MathPulse AI - ML-Powered Student Analytics & Adaptive Learning Module
Provides:
- Student competency assessment via IRT (Item Response Theory)
- Enhanced risk prediction with trained ML models (Random Forest / XGBoost)
- Quiz difficulty calibration engine
- Topic recommendation engine
- Learning analytics aggregation
- Mock data generation for development/testing
"""
import os
import math
import json
import time
import random
import logging
import hashlib
import traceback
from typing import List, Optional, Dict, Any, Tuple
from datetime import datetime, timedelta
from functools import lru_cache
from collections import defaultdict
import numpy as np # type: ignore[import-not-found]
from scipy import stats as scipy_stats # type: ignore[import-not-found]
from scipy.optimize import minimize_scalar # type: ignore[import-not-found]
from sklearn.linear_model import LinearRegression # type: ignore[import-not-found]
from sklearn.ensemble import RandomForestClassifier # type: ignore[import-not-found]
from sklearn.model_selection import train_test_split # type: ignore[import-not-found]
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report # type: ignore[import-not-found]
from pydantic import BaseModel, Field
# Optional heavy dependencies — guarded imports
# Placeholders typed as Any so the names always exist at module level, even
# when the corresponding optional package cannot be imported below.
xgb: Any = None
shap: Any = None
joblib: Any = None
firebase_admin: Any = None
credentials: Any = None
firestore: Any = None
# Each guarded import records its outcome in a HAS_* flag; the rest of the
# module checks the flag before touching the optional package.
try:
    import xgboost as xgb # type: ignore[import-not-found,no-redef]
    HAS_XGBOOST = True
except ImportError:
    HAS_XGBOOST = False
try:
    import shap # type: ignore[import-not-found,no-redef]
    HAS_SHAP = True
except ImportError:
    HAS_SHAP = False
try:
    import joblib # type: ignore[import-not-found,no-redef]
    HAS_JOBLIB = True
except ImportError:
    HAS_JOBLIB = False
try:
    import firebase_admin # type: ignore[import-not-found,no-redef]
    from firebase_admin import credentials, firestore # type: ignore[import-not-found,no-redef,assignment]
    HAS_FIREBASE = True
except ImportError:
    HAS_FIREBASE = False
# Module-wide logger used by every helper in this file.
logger = logging.getLogger("mathpulse.analytics")
# ─── Configuration ─────────────────────────────────────────────
# File the trained risk classifier is persisted to (written by
# train_risk_model, read by _load_risk_model).
RISK_MODEL_PATH = "models/risk_classifier.joblib"
IRT_DIFFICULTY_CACHE_TTL = 3600 # 1 hour
# compute_competency_analysis reports "insufficient_data" below this many
# quiz-history records.
MIN_QUIZ_ATTEMPTS_FOR_COMPETENCY = 3
LEARNING_VELOCITY_WINDOW_DAYS = 30
# Competency bands as (low, high) score ranges; _get_competency_level treats
# each band as low <= score < high.
COMPETENCY_THRESHOLDS = {
    "beginner": (0, 40),
    "developing": (40, 65),
    "proficient": (65, 85),
    "advanced": (85, 100),
}
# Topic dependency / prerequisite graph.
# Maps a topic name to the list of topic names that should be learned first.
# Exposed to callers through fetch_topic_dependencies().
TOPIC_PREREQUISITES: Dict[str, List[str]] = {
    "Quadratic Equations": ["Linear Equations", "Variables & Expressions"],
    "Systems of Equations": ["Linear Equations", "Slope & Rate of Change"],
    "Polynomials": ["Variables & Expressions", "Exponents & Powers"],
    "Factoring": ["Polynomials", "Variables & Expressions"],
    "Quadratic Functions": ["Quadratic Equations", "Functions"],
    "Exponential Functions": ["Exponents & Powers", "Functions"],
    "Trigonometric Ratios": ["Pythagorean Theorem", "Angles", "Triangles"],
    "Trigonometric Functions": ["Trigonometric Ratios", "Functions"],
    "Derivatives": ["Limits", "Functions"],
    "Integration": ["Derivatives", "Area Under a Curve"],
    "Limits": ["Functions", "Rational Expressions"],
    "Coordinate Geometry": ["Linear Equations", "Slope & Rate of Change"],
    "Circle Theorems": ["Circles", "Angles"],
    "Logarithmic Functions": ["Exponential Functions"],
    "Rational Functions": ["Polynomials", "Factoring"],
    "Complex Numbers": ["Quadratic Equations", "Radicals & Exponents"],
    "Matrices (Introduction)": ["Systems of Equations"],
    "Conic Sections": ["Coordinate Geometry", "Quadratic Functions"],
    "Probability of Compound Events": ["Probability Basics"],
    "Permutations & Combinations": ["Probability Basics", "Factorial"],
    "Hypothesis Testing Basics": ["Normal Distribution Basics", "Sampling Methods"],
    "Confidence Intervals": ["Normal Distribution Basics", "Sampling Methods"],
    "Regression Analysis": ["Scatter Plots", "Linear Functions"],
    "Statistical Inference": ["Hypothesis Testing Basics", "Confidence Intervals"],
    "Multivariable Calculus": ["Derivatives", "Integration"],
    "Differential Equations": ["Derivatives", "Integration"],
    "Vector Calculus": ["Multivariable Calculus", "Vectors"],
    "Linear Transformations": ["Matrices & Determinants", "Vector Spaces"],
    "Eigenvalues & Eigenvectors": ["Matrices & Determinants"],
}
# ─── Pydantic Models ───────────────────────────────────────────
class CompetencyAnalysisRequest(BaseModel):
    """Input model naming the student, and optionally a single topic, to analyse."""
    studentId: str
    # When None, no topic restriction is requested.
    topicId: Optional[str] = None
class CompetencyAnalysis(BaseModel):
    """Per-topic result row produced by compute_competency_analysis."""
    topicId: str
    topicName: str
    # Validated to lie in [0, 100].
    efficiencyScore: float = Field(..., ge=0, le=100)
    # One of the COMPETENCY_THRESHOLDS keys.
    competencyLevel: str
    masteryPercentage: float
    # Regression slope of score over time (positive = improving).
    learningVelocity: float
    totalAttempts: int
    averageAccuracy: float
    # ISO-formatted timestamp string, or None when no attempt carried a timestamp.
    lastAttemptDate: Optional[str] = None
class CompetencyAnalysisResponse(BaseModel):
    """Result of compute_competency_analysis for one student."""
    studentId: str
    status: str # "success" | "insufficient_data"
    # One entry per analysed topic; empty when status is "insufficient_data".
    analyses: List[CompetencyAnalysis]
    overallCompetency: Optional[str] = None
    # IRT ability estimate; None when no analysis was performed.
    thetaEstimate: Optional[float] = None
class TopicRecommendation(BaseModel):
    """A single recommended topic with its score and supporting details."""
    topicId: str
    topicName: str
    recommendationScore: float
    reasoning: str
    estimatedTimeToMastery: int # hours
    prerequisitesMet: bool
    currentCompetency: str
class TopicRecommendationRequest(BaseModel):
    """Input model for requesting topic recommendations for a student."""
    studentId: str
    # Defaults to 5; validated to lie in [1, 20].
    numRecommendations: int = Field(default=5, ge=1, le=20)
class TopicRecommendationResponse(BaseModel):
    """Container for the list of topic recommendations returned for a student."""
    studentId: str
    recommendations: List[TopicRecommendation]
    status: str
class EnhancedRiskPrediction(BaseModel):
    """Risk prediction returned by predict_risk_enhanced and _rule_based_risk."""
    # "High", "Medium" or "Low" in the visible producers.
    riskLevel: str
    # Largest of the class probabilities.
    confidence: float
    # Probability per risk level, keyed by level name.
    probabilities: Dict[str, float]
    # Dicts with at least "feature", "impact" and "detail" keys.
    contributingFactors: List[Dict[str, Any]]
    recommendations: List[str]
    modelUsed: str # "ml_model" | "rule_based" | "zero_shot"
class EnhancedRiskRequest(BaseModel):
    """Student metrics used as input for risk prediction."""
    studentId: str
    # The four required metrics are validated to lie in [0, 100].
    engagementScore: float = Field(..., ge=0, le=100)
    avgQuizScore: float = Field(..., ge=0, le=100)
    attendance: float = Field(..., ge=0, le=100)
    assignmentCompletion: float = Field(..., ge=0, le=100)
    streak: Optional[int] = 0
    xpGrowthRate: Optional[float] = 0.0
    timeOnPlatform: Optional[float] = 0.0 # hours
    # Optional trend data; consumers treat None as 0.
    engagementTrend7d: Optional[float] = None
    quizScoreVariance: Optional[float] = None
    consecutiveAbsences: Optional[int] = 0
    daysSinceLastActivity: Optional[int] = 0
class RiskTrainRequest(BaseModel):
    """Input model for triggering risk-model training."""
    # When False, train_risk_model skips training if a saved model already exists.
    forceRetrain: bool = False
class RiskTrainResponse(BaseModel):
    """Outcome and evaluation metrics returned by train_risk_model."""
    # "trained" after a training run, "model_exists" when training was skipped.
    status: str
    # Metrics are 0.0 when training was skipped.
    accuracy: float
    precision: float
    recall: float
    f1Score: float
    samplesUsed: int
    modelPath: str
class CalibrateDifficultyRequest(BaseModel):
    """Input model carrying the student responses recorded for one question."""
    questionId: str
    studentResponses: List[Dict[str, Any]] # [{studentId, correct, timeSpent, attempts}]
class CalibrateDifficultyResponse(BaseModel):
    """IRT parameters and summary statistics reported for one question."""
    questionId: str
    difficultyParameter: float # b parameter
    discriminationParameter: float # a parameter
    guessingParameter: float # c parameter
    difficultyLabel: str # "easy" | "medium" | "hard"
    totalResponses: int
    successRate: float
class AdaptiveQuizRequest(BaseModel):
    """Input model for requesting an adaptive quiz for a student and topic."""
    studentId: str
    topicId: str
    # Defaults to 10; validated to lie in [1, 50].
    numQuestions: int = Field(default=10, ge=1, le=50)
    # Defaults to 0.70; validated to lie in [0.3, 0.95].
    targetSuccessRate: float = Field(default=0.70, ge=0.3, le=0.95)
class AdaptiveQuizSelection(BaseModel):
    """One selected question with its difficulty and predicted success probability."""
    questionId: str
    estimatedDifficulty: float
    predictedSuccessProbability: float
    difficultyLabel: str
class AdaptiveQuizResponse(BaseModel):
    """Selected questions plus summary figures for an adaptive quiz."""
    studentId: str
    topicId: str
    selectedQuestions: List[AdaptiveQuizSelection]
    studentAbilityEstimate: float
    expectedSuccessRate: float
    # Integer count per string key (presumably difficulty labels; confirm against the producer).
    difficultyDistribution: Dict[str, int]
class StudentSummaryResponse(BaseModel):
    """Aggregated analytics summary for a single student."""
    studentId: str
    competencyDistribution: Dict[str, int]
    riskAssessment: Optional[Dict[str, Any]] = None
    recommendedTopics: List[Dict[str, Any]]
    learningVelocityTrend: List[Dict[str, Any]]
    efficiencyScores: Dict[str, float]
    predictedNextQuizScore: Optional[float] = None
    engagementPatterns: Dict[str, Any]
    status: str
class ClassInsightsRequest(BaseModel):
    """Input model naming the teacher, and optionally one class, to report on."""
    teacherId: str
    classId: Optional[str] = None
class ClassInsightsResponse(BaseModel):
    """Aggregated class-level analytics for a teacher."""
    teacherId: str
    riskDistribution: Dict[str, int]
    riskTrend: List[Dict[str, Any]]
    commonWeakTopics: List[Dict[str, Any]]
    learningVelocityDistribution: Dict[str, float]
    engagementPatterns: Dict[str, Any]
    interventionRecommendations: List[Dict[str, Any]]
    successPredictions: Dict[str, Any]
    totalStudents: int
    status: str
class MockDataRequest(BaseModel):
    """Input model sizing a mock data set."""
    # Defaults to 30; validated to lie in [1, 200].
    numStudents: int = Field(default=30, ge=1, le=200)
    # Defaults to 20; validated to lie in [1, 100].
    numQuizzes: int = Field(default=20, ge=1, le=100)
    # Optional seed (presumably for reproducible generation; confirm against the generator).
    seed: Optional[int] = None
class RefreshCacheResponse(BaseModel):
    """Status report returned after a cache refresh."""
    status: str
    cachedItems: int
    timestamp: str
# ─── In-Memory Caches ──────────────────────────────────────────
# TTL caches: each value is a (stored_at_epoch_seconds, payload) tuple, the
# shape expected by _cache_get and written by _cache_set.
_competency_cache: Dict[str, Tuple[float, Any]] = {}
_class_stats_cache: Dict[str, Tuple[float, Any]] = {}
_difficulty_cache: Dict[str, Tuple[float, Any]] = {}
# Holds the loaded risk classifier under the "risk_model" key (no TTL);
# cleared by train_risk_model after a new model is saved.
_risk_model_cache: Dict[str, Any] = {}
def _cache_get(cache: Dict[str, Tuple[float, Any]], key: str, ttl: int) -> Optional[Any]:
"""Get from cache if not expired."""
if key in cache:
ts, val = cache[key]
if time.time() - ts < ttl:
return val
del cache[key]
return None
def _cache_set(cache: Dict[str, Tuple[float, Any]], key: str, value: Any):
"""Set a cache entry with current timestamp."""
cache[key] = (time.time(), value)
# ─── Firebase Helpers ──────────────────────────────────────────
_firestore_db = None


def _get_firestore_db():
    """
    Return the shared Firestore client, creating it on first use.
    Returns None when firebase-admin is not installed or when the default
    Firebase app cannot be initialised.
    """
    global _firestore_db
    if _firestore_db is None:
        if not HAS_FIREBASE:
            logger.warning("firebase-admin not installed; Firestore operations will use mock data")
            return None
        try:
            # get_app() raises ValueError when no default app exists yet.
            firebase_admin.get_app()
        except ValueError:
            key_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
            if not key_file or not os.path.exists(key_file):
                # No usable service-account file: try default credentials
                # (e.g., GCP environment).
                try:
                    firebase_admin.initialize_app()
                except Exception as e:
                    logger.warning(f"Could not initialise Firebase: {e}")
                    return None
            else:
                firebase_admin.initialize_app(credentials.Certificate(key_file))
        _firestore_db = firestore.client()
    return _firestore_db
async def fetch_student_quiz_history(student_id: str) -> List[Dict[str, Any]]:
    """
    Collect a student's quiz history from Firestore.
    Records come from the "progress" collection (matched on userId) followed
    by the "quizAttempts" collection (matched on studentId, newest first).
    Every record gets its document id under "id"; quizAttempts records are
    also tagged with source="quizAttempts". Returns an empty list when
    Firestore is unavailable or any query fails.
    """
    db = _get_firestore_db()
    if db is None:
        logger.info(f"No Firestore connection; returning empty quiz history for {student_id}")
        return []
    try:
        records = []
        progress_query = db.collection("progress").where("userId", "==", student_id)
        for snapshot in progress_query.stream():
            payload = snapshot.to_dict()
            if not payload:
                continue
            payload["id"] = snapshot.id
            records.append(payload)
        attempts_query = (
            db.collection("quizAttempts")
            .where("studentId", "==", student_id)
            .order_by("completedAt", direction=firestore.Query.DESCENDING)
        )
        for snapshot in attempts_query.stream():
            payload = snapshot.to_dict()
            if not payload:
                continue
            payload["id"] = snapshot.id
            payload["source"] = "quizAttempts"
            records.append(payload)
        logger.info(f"Fetched {len(records)} quiz history records for student {student_id}")
        return records
    except Exception as e:
        logger.error(f"Error fetching quiz history for {student_id}: {e}")
        return []
async def fetch_student_engagement_metrics(student_id: str, days: int = 30) -> Dict[str, Any]:
    """
    Aggregate a student's "xpActivities" documents from the past ``days`` days.
    The result always contains totalXP, activityCount, dailyActivity
    (YYYY-MM-DD -> count), hourlyActivity (hour -> count), activeDays and
    avgActivitiesPerDay, so callers can read the same keys on every path.
    When Firestore is unavailable the legacy zeroed keys totalTimeOnPlatform,
    sessionsCount and avgSessionDuration are included as well. On a query
    error the zeroed metrics are returned and the error is logged.
    """
    zeroed: Dict[str, Any] = {
        "totalXP": 0,
        "activityCount": 0,
        "dailyActivity": {},
        "hourlyActivity": {},
        "activeDays": 0,
        "avgActivitiesPerDay": 0.0,
    }
    db = _get_firestore_db()
    if db is None:
        return {
            "totalTimeOnPlatform": 0,
            "sessionsCount": 0,
            "avgSessionDuration": 0,
            **zeroed,
        }
    try:
        cutoff = datetime.utcnow() - timedelta(days=days)
        # XP activities serve as the engagement proxy.
        xp_query = db.collection("xpActivities").where(
            "userId", "==", student_id
        ).where("timestamp", ">=", cutoff)
        daily_activity: Dict[str, int] = {}
        hourly_activity: Dict[int, int] = defaultdict(int)
        total_xp = 0
        activity_count = 0
        for doc in xp_query.stream():
            data = doc.to_dict()
            if not data:
                continue
            activity_count += 1
            # A stored null must not abort the whole aggregation.
            total_xp += data.get("xpAmount") or 0
            ts = data.get("timestamp")
            if not ts:
                continue
            if hasattr(ts, "seconds"):
                dt = datetime.utcfromtimestamp(ts.seconds)
            elif isinstance(ts, datetime):
                dt = ts
            else:
                # Unknown timestamp shape: counted above, but not bucketed.
                continue
            day_key = dt.strftime("%Y-%m-%d")
            daily_activity[day_key] = daily_activity.get(day_key, 0) + 1
            hourly_activity[dt.hour] += 1
        return {
            "totalXP": total_xp,
            "activityCount": activity_count,
            "dailyActivity": daily_activity,
            "hourlyActivity": dict(hourly_activity),
            "activeDays": len(daily_activity),
            "avgActivitiesPerDay": round(activity_count / max(len(daily_activity), 1), 2),
        }
    except Exception as e:
        logger.error(f"Error fetching engagement metrics for {student_id}: {e}")
        return zeroed
def fetch_topic_dependencies() -> Dict[str, List[str]]:
    """
    Return an independent copy of the topic prerequisite graph.
    Both the mapping and each prerequisite list are copied, so callers may
    modify the result without altering TOPIC_PREREQUISITES.
    """
    return {topic: list(prereqs) for topic, prereqs in TOPIC_PREREQUISITES.items()}
async def store_competency_analysis(student_id: str, analysis: Dict[str, Any]):
    """
    Merge competency analysis results into competencyAnalyses/{student_id}.
    An "updatedAt" server timestamp is added to the stored document only;
    the caller's ``analysis`` dict is left untouched. The write is skipped
    when Firestore is unavailable, and write errors are logged, not raised.
    """
    db = _get_firestore_db()
    if db is None:
        logger.info(f"No Firestore; skipping competency storage for {student_id}")
        return
    try:
        doc_ref = db.collection("competencyAnalyses").document(student_id)
        # Write a copy: the SERVER_TIMESTAMP sentinel must not leak into the
        # caller's dict, which may still be serialised elsewhere.
        payload = {**analysis, "updatedAt": firestore.SERVER_TIMESTAMP}
        doc_ref.set(payload, merge=True)
        logger.info(f"Stored competency analysis for {student_id}")
    except Exception as e:
        logger.error(f"Error storing competency analysis: {e}")
async def store_question_difficulty(question_id: str, params: Dict[str, Any]):
    """
    Merge IRT parameters into questions/{question_id}/difficulty_params/irt.
    An "updatedAt" server timestamp is added to the stored document only;
    the caller's ``params`` dict is left untouched. The write is skipped
    when Firestore is unavailable, and write errors are logged, not raised.
    """
    db = _get_firestore_db()
    if db is None:
        logger.info(f"No Firestore; skipping difficulty storage for {question_id}")
        return
    try:
        doc_ref = db.collection("questions").document(question_id).collection(
            "difficulty_params"
        ).document("irt")
        # Write a copy so the SERVER_TIMESTAMP sentinel does not leak into
        # the caller's dict.
        payload = {**params, "updatedAt": firestore.SERVER_TIMESTAMP}
        doc_ref.set(payload, merge=True)
        logger.info(f"Stored difficulty params for question {question_id}")
    except Exception as e:
        logger.error(f"Error storing question difficulty: {e}")
# ─── IRT & Statistical Helpers ─────────────────────────────────
def _irt_3pl_probability(theta: float, a: float, b: float, c: float = 0.25) -> float:
"""
3-Parameter Logistic IRT model.
P(correct) = c + (1 - c) / (1 + exp(-a * (theta - b)))
theta: student ability
a: discrimination
b: difficulty
c: guessing parameter
"""
exponent = -a * (theta - b)
exponent = max(-20, min(20, exponent)) # numerical stability
return c + (1 - c) / (1 + math.exp(exponent))
def _estimate_theta(responses: List[Dict[str, Any]], difficulty_params: Dict[str, Dict[str, float]]) -> float:
"""
Estimate student ability (theta) using Maximum Likelihood Estimation.
responses: list of {questionId, correct: bool}
difficulty_params: {questionId: {a, b, c}}
"""
if not responses:
return 0.0
def neg_log_likelihood(theta: float) -> float:
ll = 0.0
for r in responses:
qid = r.get("questionId", "")
params = difficulty_params.get(qid, {"a": 1.0, "b": 0.0, "c": 0.25})
p = _irt_3pl_probability(theta, params["a"], params["b"], params.get("c", 0.25))
p = max(1e-10, min(1 - 1e-10, p)) # avoid log(0)
if r.get("correct", False):
ll += math.log(p)
else:
ll += math.log(1 - p)
return -ll
result = minimize_scalar(neg_log_likelihood, bounds=(-4, 4), method="bounded")
return round(result.x, 3)
def _calculate_learning_velocity(scores_over_time: List[Tuple[float, float]]) -> float:
"""
Calculate learning velocity using weighted linear regression.
scores_over_time: list of (timestamp_as_days, score)
Returns slope (positive = improving, negative = declining).
"""
if len(scores_over_time) < 2:
return 0.0
times = np.array([t for t, _ in scores_over_time]).reshape(-1, 1)
scores = np.array([s for _, s in scores_over_time])
# Exponential decay weights (more recent = higher weight)
max_time = times.max()
decay_rate = 0.05
weights = np.exp(-decay_rate * (max_time - times.flatten()))
weights = weights / weights.sum()
# Weighted linear regression
model = LinearRegression()
model.fit(times, scores, sample_weight=weights)
return round(float(model.coef_[0]), 4)
def _calculate_efficiency_score(
student_times: List[float],
student_accuracies: List[bool],
class_avg_time: float,
attempt_counts: List[int],
) -> float:
"""
Efficiency = (class_avg_time / student_time) * accuracy_multiplier * 100
Penalise multiple attempts.
"""
if not student_times or class_avg_time <= 0:
return 50.0
efficiencies = []
for t, correct, attempts in zip(student_times, student_accuracies, attempt_counts):
if t <= 0:
t = 1.0
time_ratio = class_avg_time / t
accuracy_mult = 1.0 if correct else 0.3
attempt_penalty = 1.0 / max(attempts, 1)
eff = time_ratio * accuracy_mult * attempt_penalty * 100
efficiencies.append(min(eff, 150)) # cap at 150 to avoid outliers
raw = sum(efficiencies) / len(efficiencies)
return round(min(max(raw, 0), 100), 2)
def _get_competency_level(score: float) -> str:
    """
    Translate a 0-100 score into its competency level name.
    Bands come from COMPETENCY_THRESHOLDS (low <= score < high). Scores
    outside every band map to "advanced" when >= 85, otherwise "beginner".
    """
    matched = next(
        (name for name, (floor, ceiling) in COMPETENCY_THRESHOLDS.items() if floor <= score < ceiling),
        None,
    )
    if matched is not None:
        return matched
    if score >= 85:
        return "advanced"
    return "beginner"
# ─── Competency Assessment System ──────────────────────────────
def _quiz_entry_percentage(entry: Dict[str, Any]) -> float:
    """Return an entry's score as a percentage of its total (missing/null score -> 0)."""
    score = entry.get("score") or 0
    total = max(entry.get("total") or 1, 1)
    return (score / total) * 100


def _quiz_entry_is_correct(entry: Dict[str, Any]) -> bool:
    """Return the explicit "correct" flag if usable, else whether the score is at least 50%."""
    flag = entry.get("correct")
    if isinstance(flag, bool):
        return flag
    if isinstance(flag, (int, float)):
        return flag > 0.5
    # No usable flag: fall back to the score so that score-only records are
    # not all treated as incorrect.
    return _quiz_entry_percentage(entry) >= 50


def _quiz_timestamp_to_days(ts: Any) -> float:
    """Convert a supported timestamp value to days since the epoch (current time if unparseable)."""
    if isinstance(ts, (int, float)):
        return ts / 86400
    if hasattr(ts, "seconds"):
        return ts.seconds / 86400
    if isinstance(ts, datetime):
        return ts.timestamp() / 86400
    if isinstance(ts, str):
        try:
            return datetime.fromisoformat(ts.replace("Z", "+00:00")).timestamp() / 86400
        except Exception:
            pass
    return time.time() / 86400


async def compute_competency_analysis(
    student_id: str,
    quiz_history: List[Dict[str, Any]],
    topic_filter: Optional[str] = None,
) -> CompetencyAnalysisResponse:
    """
    Full competency analysis using an IRT approach.
    student_id: echoed back in the response.
    quiz_history: quiz records; each may carry topicId/topic, questionId/id,
    correct, score, total, attempts, timeTaken/timeSpent and
    completedAt/timestamp/date.
    topic_filter: when given, only records of that topic are analysed.
    Returns status "insufficient_data" (no analyses) when the history has
    fewer than MIN_QUIZ_ATTEMPTS_FOR_COMPETENCY records or nothing matches
    the filter. Otherwise returns per-topic analyses sorted by efficiency
    score ascending (weakest first), an overall level and the theta estimate.
    """
    if not quiz_history or len(quiz_history) < MIN_QUIZ_ATTEMPTS_FOR_COMPETENCY:
        return CompetencyAnalysisResponse(
            studentId=student_id,
            status="insufficient_data",
            analyses=[],
            overallCompetency=None,
            thetaEstimate=None,
        )
    # Group by topic
    topic_data: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for entry in quiz_history:
        topic = entry.get("topicId") or entry.get("topic") or "Unknown"
        if topic_filter and topic != topic_filter:
            continue
        topic_data[topic].append(entry)
    if not topic_data:
        return CompetencyAnalysisResponse(
            studentId=student_id,
            status="insufficient_data",
            analyses=[],
        )
    # Collect IRT responses; every question gets the same default item
    # parameters because no calibrated values are available here.
    difficulty_params: Dict[str, Dict[str, float]] = {}
    irt_responses: List[Dict[str, Any]] = []
    for topic, entries in topic_data.items():
        for entry in entries:
            qid = entry.get("questionId", entry.get("id", f"{topic}_{len(irt_responses)}"))
            irt_responses.append({"questionId": qid, "correct": _quiz_entry_is_correct(entry)})
            if qid not in difficulty_params:
                difficulty_params[qid] = {"a": 1.0, "b": 0.0, "c": 0.25}
    theta = _estimate_theta(irt_responses, difficulty_params)
    # Per-topic analysis
    analyses: List[CompetencyAnalysis] = []
    for topic, entries in topic_data.items():
        correct_count = 0
        first_attempt_correct = 0
        times: List[float] = []
        accuracies: List[bool] = []
        attempt_counts: List[int] = []
        scores_over_time: List[Tuple[float, float]] = []
        for entry in entries:
            pct = _quiz_entry_percentage(entry)
            correct = pct >= 50
            if correct:
                correct_count += 1
            attempts = entry.get("attempts") or 1
            if attempts <= 1 and correct:
                first_attempt_correct += 1
            time_spent = entry.get("timeTaken") or entry.get("timeSpent") or 60
            times.append(float(time_spent))
            accuracies.append(correct)
            attempt_counts.append(max(attempts, 1))
            # Only entries that carry a timestamp feed the velocity estimate.
            ts = entry.get("completedAt") or entry.get("timestamp") or entry.get("date")
            if ts:
                scores_over_time.append((_quiz_timestamp_to_days(ts), pct))
        total_count = len(entries)
        avg_accuracy = (correct_count / max(total_count, 1)) * 100
        mastery_pct = (first_attempt_correct / max(total_count, 1)) * 100
        # The student's own mean time stands in for the class average.
        class_avg_time = float(np.mean(times)) if times else 60.0
        efficiency = _calculate_efficiency_score(times, accuracies, class_avg_time, attempt_counts)
        velocity = _calculate_learning_velocity(scores_over_time)
        last_date = None
        if scores_over_time:
            last_ts = max(day for day, _ in scores_over_time)
            last_date = datetime.utcfromtimestamp(last_ts * 86400).isoformat()
        analyses.append(CompetencyAnalysis(
            topicId=topic,
            topicName=topic.replace("_", " ").title(),
            efficiencyScore=efficiency,
            competencyLevel=_get_competency_level(avg_accuracy),
            masteryPercentage=round(mastery_pct, 2),
            learningVelocity=velocity,
            totalAttempts=total_count,
            averageAccuracy=round(avg_accuracy, 2),
            lastAttemptDate=last_date,
        ))
    # Sort by efficiency score ascending (weakest first)
    analyses.sort(key=lambda a: a.efficiencyScore)
    # Overall competency is the level of the mean efficiency score.
    if analyses:
        avg_eff = sum(a.efficiencyScore for a in analyses) / len(analyses)
        overall = _get_competency_level(avg_eff)
    else:
        overall = None
    return CompetencyAnalysisResponse(
        studentId=student_id,
        status="success",
        analyses=analyses,
        overallCompetency=overall,
        thetaEstimate=theta,
    )
# ─── Enhanced Risk Prediction ──────────────────────────────────
def _build_risk_features(data: EnhancedRiskRequest) -> np.ndarray:
    """
    Turn a risk request into a single-row feature matrix of shape (1, 11).
    Column order follows RISK_FEATURE_NAMES; optional fields that are None
    (or otherwise falsy) contribute 0.
    """
    required = [
        data.engagementScore,
        data.avgQuizScore,
        data.attendance,
        data.assignmentCompletion,
    ]
    optional_fields = (
        ("streak", 0),
        ("xpGrowthRate", 0.0),
        ("timeOnPlatform", 0.0),
        ("engagementTrend7d", 0.0),
        ("quizScoreVariance", 0.0),
        ("consecutiveAbsences", 0),
        ("daysSinceLastActivity", 0),
    )
    optional = [getattr(data, field) or fallback for field, fallback in optional_fields]
    return np.array([required + optional])
# Names of the risk-model features, in the column order used by
# _build_risk_features and by the training rows assembled in
# train_risk_model / _generate_synthetic_risk_data. Used to label SHAP
# values and feature importances, so the order must stay in sync.
RISK_FEATURE_NAMES = [
    "engagementScore",
    "avgQuizScore",
    "attendance",
    "assignmentCompletion",
    "streak",
    "xpGrowthRate",
    "timeOnPlatform",
    "engagementTrend7d",
    "quizScoreVariance",
    "consecutiveAbsences",
    "daysSinceLastActivity",
]
def _load_risk_model():
    """
    Return the trained risk model, loading it from RISK_MODEL_PATH on first use.
    The loaded model is kept in _risk_model_cache. Returns None when joblib
    is not installed, the model file does not exist, or loading fails.
    """
    if not HAS_JOBLIB:
        return None
    cache_key = "risk_model"
    model = _risk_model_cache.get(cache_key)
    if model is not None:
        return model
    if not os.path.exists(RISK_MODEL_PATH):
        return None
    try:
        # NOTE: joblib.load unpickles the file, so the path must only ever
        # point at model files this application produced itself.
        model = joblib.load(RISK_MODEL_PATH)
        _risk_model_cache[cache_key] = model
        logger.info("Loaded trained risk model from disk")
        return model
    except Exception as e:
        logger.error(f"Error loading risk model: {e}")
    return None
def _rule_based_risk(data: EnhancedRiskRequest) -> EnhancedRiskPrediction:
    """
    Rule-based risk prediction used when no ML model is available.
    A weighted blend of engagement, quiz score, attendance and assignment
    completion is adjusted by fixed penalties/bonuses, clamped to [0, 100]
    and bucketed: >= 70 Low, >= 45 Medium, otherwise High. Each bucket has
    fixed probabilities and recommendations; at most three contributing
    factors are reported.
    """
    absences = data.consecutiveAbsences or 0
    idle_days = data.daysSinceLastActivity or 0
    streak = data.streak or 0
    trend = data.engagementTrend7d or 0

    score = (
        data.engagementScore * 0.25
        + data.avgQuizScore * 0.30
        + data.attendance * 0.25
        + data.assignmentCompletion * 0.20
    )
    # Penalties first, then bonuses, applied in a fixed order.
    adjustments = (
        (absences >= 3, -10),
        (idle_days >= 7, -10),
        (streak == 0, -5),
        (streak >= 7, 5),
        (trend > 0, 5),
    )
    for applies, delta in adjustments:
        if applies:
            score += delta
    score = max(0, min(100, score))

    # (minimum score, level, fixed class probabilities), best band first.
    bands = (
        (70, "Low", {"High": 0.05, "Medium": 0.15, "Low": 0.80}),
        (45, "Medium", {"High": 0.15, "Medium": 0.55, "Low": 0.30}),
    )
    risk_level = "High"
    probs = {"High": 0.70, "Medium": 0.20, "Low": 0.10}
    for floor, level, distribution in bands:
        if score >= floor:
            risk_level, probs = level, distribution
            break

    factor_rules = (
        (data.avgQuizScore < 50,
         {"feature": "avgQuizScore", "impact": -0.3, "detail": "Low quiz scores"}),
        (data.attendance < 60,
         {"feature": "attendance", "impact": -0.25, "detail": "Poor attendance"}),
        (data.engagementScore < 40,
         {"feature": "engagementScore", "impact": -0.2, "detail": "Low engagement"}),
        (absences >= 3,
         {"feature": "consecutiveAbsences", "impact": -0.15, "detail": "Multiple consecutive absences"}),
        (data.assignmentCompletion < 50,
         {"feature": "assignmentCompletion", "impact": -0.2, "detail": "Low assignment completion"}),
    )
    factors = [factor for triggered, factor in factor_rules if triggered]
    if not factors:
        factors.append({"feature": "overall", "impact": 0.0, "detail": "No major risk factors identified"})

    advice_by_level = {
        "High": [
            "Schedule immediate one-on-one check-in with student",
            "Set up tutoring sessions for weak subjects",
            "Contact parent/guardian about academic concerns",
            "Create a structured study plan with daily goals",
        ],
        "Medium": [
            "Monitor progress closely over next 2 weeks",
            "Encourage participation in study groups",
            "Assign additional practice exercises for weak areas",
        ],
        "Low": [
            "Continue current learning approach",
            "Challenge with advanced material when ready",
        ],
    }
    return EnhancedRiskPrediction(
        riskLevel=risk_level,
        confidence=round(max(probs.values()), 3),
        probabilities=probs,
        contributingFactors=factors[:3],
        recommendations=advice_by_level[risk_level],
        modelUsed="rule_based",
    )
async def predict_risk_enhanced(data: EnhancedRiskRequest) -> EnhancedRiskPrediction:
    """
    Predict a student's risk level with the trained ML model.
    Falls back to _rule_based_risk when no model can be loaded or when the
    ML path raises. Probabilities are matched to labels through the model's
    ``classes_`` so a model trained without one of the classes is still
    labelled correctly. The top three contributing factors come from SHAP
    when available, otherwise (or if SHAP fails) from the model's
    feature importances.
    """
    model = _load_risk_model()
    if model is None:
        logger.info("No trained ML model found; using rule-based risk prediction")
        return _rule_based_risk(data)
    try:
        features = _build_risk_features(data)
        label_map = {0: "High", 1: "Medium", 2: "Low"}
        prediction = int(model.predict(features)[0])
        probabilities_raw = model.predict_proba(features)[0]
        risk_level = label_map.get(prediction, "Medium")
        # predict_proba columns follow model.classes_, which need not be
        # exactly [0, 1, 2] if a class was absent from the training data.
        classes = getattr(model, "classes_", None)
        if classes is None:
            classes = range(len(probabilities_raw))
        class_ids = [int(c) for c in classes]
        probs = {label: 0.0 for label in label_map.values()}
        for column, class_id in enumerate(class_ids):
            label = label_map.get(class_id)
            if label is not None and column < len(probabilities_raw):
                probs[label] = round(float(probabilities_raw[column]), 4)
        confidence = round(float(max(probabilities_raw)), 4)

        factors: List[Dict[str, Any]] = []
        shap_failed = False
        if HAS_SHAP:
            try:
                explainer = shap.TreeExplainer(model)
                shap_values = explainer.shap_values(features)
                class_column = class_ids.index(prediction) if prediction in class_ids else 0
                if isinstance(shap_values, list):
                    # Older SHAP: one (samples, features) array per class.
                    sv = np.asarray(shap_values[class_column])[0]
                else:
                    sv = np.asarray(shap_values)
                    if sv.ndim == 3:
                        # Newer SHAP: (samples, features, classes).
                        sv = sv[0, :, class_column]
                    else:
                        sv = sv[0]
                ranked = sorted(
                    zip(RISK_FEATURE_NAMES, features[0], sv),
                    key=lambda item: abs(item[2]),
                    reverse=True,
                )
                for fname, fval, impact in ranked[:3]:
                    impact = float(impact)
                    factors.append({
                        "feature": fname,
                        "impact": round(impact, 4),
                        "value": round(float(fval), 2),
                        "detail": f"{fname} = {fval:.1f} (SHAP impact: {impact:.3f})",
                    })
            except Exception as e:
                logger.warning(f"SHAP explanation failed: {e}")
                factors = []
                shap_failed = True
        # Feature-importance fallback: used without SHAP and when SHAP fails.
        if not factors and hasattr(model, "feature_importances_"):
            ranked_importances = sorted(
                zip(RISK_FEATURE_NAMES, features[0], model.feature_importances_),
                key=lambda item: item[2],
                reverse=True,
            )
            for fname, fval, imp in ranked_importances[:3]:
                imp = float(imp)
                factors.append({
                    "feature": fname,
                    "impact": round(imp, 4),
                    "value": round(float(fval), 2),
                    "detail": f"{fname} = {fval:.1f} (importance: {imp:.3f})",
                })
        if not factors and shap_failed:
            factors = [{"feature": "model_prediction", "impact": 0.0, "detail": "SHAP unavailable"}]

        # Recommendations based on prediction
        if risk_level == "High":
            recommendations = [
                "Immediate intervention recommended — schedule one-on-one session",
                "Review recent quiz performance for specific skill gaps",
                "Contact parent/guardian about academic concerns",
                "Create personalised remediation plan",
            ]
        elif risk_level == "Medium":
            recommendations = [
                "Monitor student progress more frequently",
                "Assign targeted practice for weak areas",
                "Encourage peer study groups",
            ]
        else:
            recommendations = [
                "Student is performing well — maintain current pace",
                "Consider enrichment activities for advanced topics",
            ]
        return EnhancedRiskPrediction(
            riskLevel=risk_level,
            confidence=confidence,
            probabilities=probs,
            contributingFactors=factors,
            recommendations=recommendations,
            modelUsed="ml_model",
        )
    except Exception as e:
        logger.error(f"ML risk prediction failed: {e}\n{traceback.format_exc()}")
        logger.info("Falling back to rule-based prediction")
        return _rule_based_risk(data)
async def train_risk_model(force_retrain: bool = False) -> RiskTrainResponse:
    """
    Train and persist the risk classification model.
    Uses up to 500 student documents from the Firestore "users" collection;
    when fewer than 50 rows are available, 500 synthetic rows are added.
    Trains XGBoost when installed, otherwise a Random Forest, evaluates on a
    20% hold-out split and saves the model to RISK_MODEL_PATH.
    force_retrain: train even if a saved model already exists.
    Returns status "model_exists" with zeroed metrics when training is
    skipped, otherwise status "trained" with the hold-out metrics.
    Raises ValueError when joblib is not installed.
    """
    if not HAS_JOBLIB:
        raise ValueError("joblib not installed; cannot save model")
    # Check if model exists and skip unless forced
    if os.path.exists(RISK_MODEL_PATH) and not force_retrain:
        return RiskTrainResponse(
            status="model_exists",
            accuracy=0.0,
            precision=0.0,
            recall=0.0,
            f1Score=0.0,
            samplesUsed=0,
            modelPath=RISK_MODEL_PATH,
        )

    def _numeric(record: Dict[str, Any], key: str, default: float) -> float:
        """Read a numeric field, using ``default`` for missing, null or non-numeric values."""
        value = record.get(key)
        if value is None:
            return default
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    # Fetch historical data from Firestore
    db = _get_firestore_db()
    X_data = []
    y_data = []
    if db is not None:
        try:
            users_ref = db.collection("users").where("role", "==", "student").limit(500)
            for doc in users_ref.stream():
                data = doc.to_dict()
                if not data:
                    continue
                row = [
                    _numeric(data, "engagementScore", 50),
                    _numeric(data, "avgQuizScore", 50),
                    _numeric(data, "attendance", 80),
                    _numeric(data, "assignmentCompletion", 60),
                    _numeric(data, "streak", 0),
                    _numeric(data, "xpGrowthRate", 0),
                    _numeric(data, "timeOnPlatform", 0),
                    0.0, # engagementTrend7d
                    0.0, # quizScoreVariance
                    _numeric(data, "consecutiveAbsences", 0),
                    _numeric(data, "daysSinceLastActivity", 0),
                ]
                # NOTE(review): records without a recognised riskLevel are
                # labelled Low (2), as before; confirm this is intended.
                risk = data.get("riskLevel", "")
                if risk == "High":
                    label = 0
                elif risk == "Medium":
                    label = 1
                else:
                    label = 2
                # Append together so features and labels can never misalign.
                X_data.append(row)
                y_data.append(label)
        except Exception as e:
            logger.error(f"Error fetching training data: {e}")
    # If insufficient real data, generate synthetic training data
    if len(X_data) < 50:
        logger.info("Insufficient Firestore data; generating synthetic training data")
        synth_X, synth_y = _generate_synthetic_risk_data(500)
        X_data.extend(synth_X.tolist())
        y_data.extend(synth_y.tolist())
    X = np.array(X_data, dtype=float)
    y = np.array(y_data)
    # Stratified splitting needs at least two samples of every class; fall
    # back to a plain split rather than failing on a rare class.
    _, class_counts = np.unique(y, return_counts=True)
    stratify = y if class_counts.min() >= 2 else None
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=stratify
    )
    # Train model
    if HAS_XGBOOST:
        model = xgb.XGBClassifier(
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            objective="multi:softprob",
            num_class=3,
            eval_metric="mlogloss",
            random_state=42,
            use_label_encoder=False,
        )
        logger.info("Training XGBoost risk classifier")
    else:
        model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42,
            class_weight="balanced",
        )
        logger.info("Training Random Forest risk classifier")
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred, average="weighted", zero_division=0)
    rec = recall_score(y_test, y_pred, average="weighted", zero_division=0)
    f1 = f1_score(y_test, y_pred, average="weighted", zero_division=0)
    logger.info(f"Risk model trained: accuracy={acc:.3f}, F1={f1:.3f}")
    logger.info(f"Classification report:\n{classification_report(y_test, y_pred, zero_division=0)}")
    # Save model; makedirs("") raises, so only create a directory when the
    # configured path actually has one.
    model_dir = os.path.dirname(RISK_MODEL_PATH)
    if model_dir:
        os.makedirs(model_dir, exist_ok=True)
    joblib.dump(model, RISK_MODEL_PATH)
    logger.info(f"Risk model saved to {RISK_MODEL_PATH}")
    # Clear model cache so next prediction loads new model
    _risk_model_cache.clear()
    return RiskTrainResponse(
        status="trained",
        accuracy=round(float(acc), 4),
        precision=round(float(prec), 4),
        recall=round(float(rec), 4),
        f1Score=round(float(f1), 4),
        samplesUsed=len(X_data),
        modelPath=RISK_MODEL_PATH,
    )
def _generate_synthetic_risk_data(n: int) -> Tuple[np.ndarray, np.ndarray]:
    """Generate a reproducible synthetic data set for training the risk model.

    Every sample is first assigned a risk class (0 = high, 1 = medium,
    2 = low, drawn with probabilities 0.2 / 0.3 / 0.5) and its features are
    then drawn from class-specific normal distributions.

    Feature column order:
        engagement, quiz, attendance, completion, streak, xp_growth,
        time_platform, trend, variance, absences, days_inactive

    Args:
        n: Number of samples to generate.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n, 11)`` and ``y`` has shape
        ``(n,)`` and holds the risk-class labels.
    """
    # A private generator seeded with 42 produces the same stream as calling
    # ``np.random.seed(42)`` and then the module-level functions, but it does
    # not reset the process-wide NumPy RNG as a side effect of training.
    rng = np.random.RandomState(42)

    # (mean, std) of the normal distribution per feature and risk class.
    # ``None`` means the feature is fixed at 0 and no random number is drawn.
    profiles = {
        0: {  # High risk
            "engagement": (30, 15), "quiz": (35, 12), "attendance": (50, 15),
            "completion": (35, 15), "streak": (1, 2), "xp_growth": (-0.5, 0.3),
            "time_platform": (2, 1), "trend": (-10, 5), "variance": (25, 8),
            "absences": (4, 2), "days_inactive": (10, 5),
        },
        1: {  # Medium risk
            "engagement": (55, 12), "quiz": (60, 10), "attendance": (72, 10),
            "completion": (60, 12), "streak": (3, 3), "xp_growth": (0.2, 0.3),
            "time_platform": (5, 2), "trend": (0, 8), "variance": (15, 5),
            "absences": (2, 1), "days_inactive": (3, 3),
        },
        2: {  # Low risk
            "engagement": (82, 10), "quiz": (85, 8), "attendance": (93, 5),
            "completion": (88, 8), "streak": (10, 5), "xp_growth": (1.0, 0.4),
            "time_platform": (10, 3), "trend": (5, 5), "variance": (8, 3),
            "absences": None, "days_inactive": (1, 1),
        },
    }
    num_features = 11

    def draw(spec):
        """Draw one value from the normal distribution described by *spec*."""
        return rng.normal(*spec)

    def draw_count(spec):
        """Draw a non-negative integer count (truncated towards zero)."""
        if spec is None:
            return 0
        return max(0, int(rng.normal(*spec)))

    def clamp_percent(value):
        """Clamp a value to the 0-100 percentage range."""
        return max(0, min(100, value))

    rows = []
    labels = []
    for _ in range(n):
        risk_class = int(rng.choice([0, 1, 2], p=[0.2, 0.3, 0.5]))
        profile = profiles[risk_class]

        # The draw order below is significant: it fixes which random number
        # feeds which feature, and therefore the exact data set produced.
        engagement = draw(profile["engagement"])
        quiz = draw(profile["quiz"])
        attendance = draw(profile["attendance"])
        completion = draw(profile["completion"])
        streak = draw_count(profile["streak"])
        xp_growth = draw(profile["xp_growth"])
        time_platform = draw(profile["time_platform"])
        trend = draw(profile["trend"])
        variance = draw(profile["variance"])
        absences = draw_count(profile["absences"])
        days_inactive = draw_count(profile["days_inactive"])

        rows.append([
            clamp_percent(engagement),
            clamp_percent(quiz),
            clamp_percent(attendance),
            clamp_percent(completion),
            streak,
            xp_growth,
            max(0, time_platform),
            trend,
            max(0, variance),
            absences,
            days_inactive,
        ])
        labels.append(risk_class)

    # Explicit reshape keeps X two-dimensional even when no rows were drawn.
    X = np.array(rows, dtype=float).reshape(len(rows), num_features)
    y = np.array(labels, dtype=int)
    return X, y
# ─── Quiz Difficulty Calibration ───────────────────────────────
def _discrimination_from_response_times(responses: List[Dict[str, Any]]) -> float:
    """Estimate the IRT discrimination parameter ``a`` from response times.

    Responses are split at the (upper) median ``timeSpent`` into a "fast"
    group (``<=`` median) and a "slow" group (``>`` median). The larger the
    advantage of the fast group's success rate over the slow group's, the
    higher the returned value, clamped to ``[0.3, 3.0]``.

    Returns the neutral value ``1.0`` when there are fewer than four
    responses, or when one of the two groups is empty (for example when every
    response has the same or a missing ``timeSpent``); in that case there is
    nothing to compare and the estimate would otherwise be inflated.
    """
    if len(responses) < 4:
        return 1.0

    times = [r.get("timeSpent", 60) for r in responses]
    median_time = sorted(times)[len(times) // 2]

    fast_total = fast_correct = slow_total = slow_correct = 0
    for response, spent in zip(responses, times):
        answered_correctly = bool(response.get("correct"))
        if spent <= median_time:
            fast_total += 1
            fast_correct += answered_correctly
        elif spent > median_time:
            slow_total += 1
            slow_correct += answered_correctly

    if fast_total == 0 or slow_total == 0:
        return 1.0

    p_fast = fast_correct / fast_total
    p_slow = slow_correct / slow_total
    # Higher discrimination if fast students do much better
    return round(max(0.3, min(3.0, (p_fast - p_slow) * 3 + 1.0)), 3)


async def calibrate_question_difficulty(request: CalibrateDifficultyRequest) -> CalibrateDifficultyResponse:
    """Calculate IRT parameters for a question from student responses.

    The difficulty ``b`` is the logit of the failure rate, the discrimination
    ``a`` is derived from how success differs between fast and slow
    responders, and the guessing parameter ``c`` is fixed at 0.25. The
    parameters are persisted via ``store_question_difficulty`` and written to
    the in-memory difficulty cache before being returned.

    Args:
        request: Carries ``questionId`` and ``studentResponses``; each
            response is read with ``.get("correct")`` and
            ``.get("timeSpent", 60)``.

    Returns:
        The calibrated parameters, difficulty label, response count and
        success rate for the question.

    Raises:
        ValueError: If ``request.studentResponses`` is empty.
    """
    responses = request.studentResponses
    if not responses:
        raise ValueError("No student responses provided")

    total = len(responses)
    correct_count = sum(1 for r in responses if r.get("correct", False))
    success_rate = correct_count / total

    # Difficulty parameter b = logit(1 - p_correct)
    p = max(0.01, min(0.99, success_rate))  # clamp to avoid infinity
    b = round(math.log((1 - p) / p), 3)

    # Discrimination parameter a
    a = _discrimination_from_response_times(responses)

    # Guessing parameter c (based on question type; default 0.25 for 4-choice)
    c = 0.25

    # Difficulty label
    if b < -1.0:
        diff_label = "easy"
    elif b < 1.0:
        diff_label = "medium"
    else:
        diff_label = "hard"

    rounded_success = round(success_rate, 4)
    params = {
        "b": b,
        "a": a,
        "c": c,
        "difficultyLabel": diff_label,
        "successRate": rounded_success,
        "totalResponses": total,
    }

    # Store in Firestore, then cache
    await store_question_difficulty(request.questionId, params)
    _cache_set(_difficulty_cache, request.questionId, params)

    return CalibrateDifficultyResponse(
        questionId=request.questionId,
        difficultyParameter=b,
        discriminationParameter=a,
        guessingParameter=c,
        difficultyLabel=diff_label,
        totalResponses=total,
        successRate=rounded_success,
    )
async def select_adaptive_quiz(request: AdaptiveQuizRequest) -> AdaptiveQuizResponse:
    """Select quiz questions adaptively from the student's estimated ability.

    The student's ability (theta) is estimated from their quiz history on the
    requested topic (0.0 when there is none). A target easy/medium/hard mix
    is chosen from the resulting competency level, and ``numQuestions``
    question slots are generated around theta while respecting that mix.

    Args:
        request: Carries ``studentId``, ``topicId`` and ``numQuestions``.

    Returns:
        The generated question slots with their estimated difficulty and
        predicted success probability, the ability estimate, the mean
        predicted success rate, and the realised difficulty counts.
    """
    quiz_history = await fetch_student_quiz_history(request.studentId)

    # Estimate student ability from the entries for this topic
    topic_entries = [
        e for e in quiz_history
        if (e.get("topicId") or e.get("topic")) == request.topicId
    ]
    if topic_entries:
        responses_for_irt = []
        difficulty_params = {}
        for i, entry in enumerate(topic_entries):
            qid = entry.get("questionId", f"q_{i}")
            correct = entry.get("correct", False)
            if isinstance(correct, (int, float)):
                correct = correct > 0.5
            if not isinstance(correct, bool):
                # Only fall back to score/total when "correct" is unusable,
                # so malformed score fields cannot break otherwise valid rows.
                score = entry.get("score", 0)
                total = max(entry.get("total", 1), 1)
                correct = (score / total) >= 0.5
            responses_for_irt.append({"questionId": qid, "correct": correct})
            difficulty_params[qid] = {"a": 1.0, "b": 0.0, "c": 0.25}
        theta = _estimate_theta(responses_for_irt, difficulty_params)
    else:
        theta = 0.0  # Default ability

    competency_level = _get_competency_level((theta + 4) / 8 * 100)  # normalise theta to 0-100

    # Difficulty distribution based on competency
    distributions = {
        "beginner": {"easy": 0.70, "medium": 0.20, "hard": 0.10},
        "developing": {"easy": 0.40, "medium": 0.40, "hard": 0.20},
        "proficient": {"easy": 0.20, "medium": 0.40, "hard": 0.40},
        "advanced": {"easy": 0.10, "medium": 0.30, "hard": 0.60},
    }
    dist = distributions.get(competency_level, distributions["developing"])

    n = request.numQuestions
    easy_target = max(1, round(n * dist["easy"]))
    medium_target = max(1, round(n * dist["medium"]))
    target_counts = {
        "easy": easy_target,
        "medium": medium_target,
        "hard": max(0, n - easy_target - medium_target),
    }
    difficulty_counts = {"easy": 0, "medium": 0, "hard": 0}

    def label_for(difficulty: float) -> str:
        """Map an IRT difficulty value to its easy/medium/hard band."""
        if difficulty < -1.0:
            return "easy"
        if difficulty < 1.0:
            return "medium"
        return "hard"

    # After the first two slots (kept at the student's level) the difficulty
    # cycles easier / level / harder by slot index.
    offsets = (-0.5, 0.0, 0.5)

    selected: List[AdaptiveQuizSelection] = []
    for i in range(n):
        b = theta if i < 2 else theta + offsets[i % 3]
        diff_label = label_for(b)

        # Ensure we don't exceed target counts
        if difficulty_counts[diff_label] >= target_counts[diff_label]:
            # Pick the difficulty with most remaining quota and pull b to the
            # nearest edge of that band so the reported difficulty agrees
            # with the reported label.
            remaining = {k: target_counts[k] - difficulty_counts[k] for k in target_counts}
            diff_label = max(remaining, key=remaining.get)
            if diff_label == "easy":
                b = min(b, -1.0)
            elif diff_label == "medium":
                b = max(-1.0, min(b, 1.0))
            else:
                b = max(b, 1.0)
        difficulty_counts[diff_label] += 1

        # Calculate predicted success probability
        predicted_p = _irt_3pl_probability(theta, a=1.0, b=b, c=0.25)
        selected.append(AdaptiveQuizSelection(
            questionId=f"{request.topicId}_q{i+1}",
            estimatedDifficulty=round(b, 3),
            predictedSuccessProbability=round(predicted_p, 3),
            difficultyLabel=diff_label,
        ))

    # Expected overall success rate
    avg_success = sum(q.predictedSuccessProbability for q in selected) / max(len(selected), 1)

    return AdaptiveQuizResponse(
        studentId=request.studentId,
        topicId=request.topicId,
        selectedQuestions=selected,
        studentAbilityEstimate=round(theta, 3),
        expectedSuccessRate=round(avg_success, 3),
        difficultyDistribution=difficulty_counts,
    )
# ─── Topic Recommendation Engine ──────────────────────────────
async def recommend_topics(request: TopicRecommendationRequest) -> TopicRecommendationResponse:
    """Recommend topics based on competency gaps, prerequisites and recency.

    With no quiz history a fixed list of foundational topics is returned
    (status ``"cold_start"``). Otherwise every topic seen in the competency
    analysis or in the dependency graph is scored, topics already at the
    "advanced" level are skipped, and the highest-scoring
    ``numRecommendations`` topics are returned (status ``"success"``).

    The combined score is::

        0.4 * weakness + 0.3 * prerequisite average + 0.2 * recency
        + 0.1 * (10 if prerequisites met else 0)

    and is multiplied by 0.6 when prerequisites are not met.

    Args:
        request: Carries ``studentId`` and ``numRecommendations``.

    Returns:
        The ranked recommendations for the student.
    """
    student_id = request.studentId
    quiz_history = await fetch_student_quiz_history(student_id)

    if not quiz_history:
        # Cold start: recommend foundational topics
        foundational = [
            TopicRecommendation(
                topicId="Variables & Expressions",
                topicName="Variables & Expressions",
                recommendationScore=95.0,
                reasoning="Foundational topic essential for all algebra. Start here to build a strong base.",
                estimatedTimeToMastery=3,
                prerequisitesMet=True,
                currentCompetency="not_attempted",
            ),
            TopicRecommendation(
                topicId="Integers",
                topicName="Integers",
                recommendationScore=90.0,
                reasoning="Core number sense topic needed for all math areas.",
                estimatedTimeToMastery=2,
                prerequisitesMet=True,
                currentCompetency="not_attempted",
            ),
            TopicRecommendation(
                topicId="Fractions & Decimals",
                topicName="Fractions & Decimals",
                recommendationScore=85.0,
                reasoning="Understanding fractions is critical for algebra and calculus.",
                estimatedTimeToMastery=4,
                prerequisitesMet=True,
                currentCompetency="not_attempted",
            ),
        ]
        return TopicRecommendationResponse(
            studentId=student_id,
            recommendations=foundational[:request.numRecommendations],
            status="cold_start",
        )

    # Get competency analysis
    comp_result = await compute_competency_analysis(student_id, quiz_history)
    dependencies = fetch_topic_dependencies()

    topic_competencies: Dict[str, CompetencyAnalysis] = {
        analysis.topicId: analysis for analysis in comp_result.analyses
    }

    # Every topic we know about: analysed ones plus the dependency graph
    all_topics = set(topic_competencies)
    for topic, prereqs in dependencies.items():
        all_topics.add(topic)
        all_topics.update(prereqs)

    # Estimated hours to mastery per level; any other level uses 2
    hours_by_level = {"not_attempted": 8, "beginner": 6, "developing": 4}

    scored_topics: List[TopicRecommendation] = []
    # Sorted iteration makes the order of equally scored topics deterministic
    # (set iteration order varies between interpreter runs).
    for topic in sorted(all_topics):
        comp = topic_competencies.get(topic)
        current_level = comp.competencyLevel if comp else "not_attempted"
        current_score = comp.averageAccuracy if comp else 0

        # Skip topics already mastered
        if current_level == "advanced":
            continue

        # 1. Weakness score (higher for weaker topics)
        if current_level == "not_attempted":
            weakness_score = 70
        elif current_level == "beginner":
            weakness_score = 100 - current_score
        elif current_level == "developing":
            weakness_score = 80 - current_score * 0.5
        else:  # proficient
            weakness_score = 40 - current_score * 0.3

        # 2. Prerequisite score (higher if prerequisites are met)
        prereqs = dependencies.get(topic, [])
        if prereqs:
            # A prerequisite that was never attempted counts as 0
            prereq_scores = []
            for prereq in prereqs:
                prereq_comp = topic_competencies.get(prereq)
                prereq_scores.append(prereq_comp.averageAccuracy if prereq_comp else 0)
            prereq_avg = sum(prereq_scores) / len(prereq_scores)
            prereqs_met = all(s >= 50 for s in prereq_scores)
        else:
            prereq_avg = 100  # No prereqs needed
            prereqs_met = True

        # 3. Recency score (higher the longer the topic has gone unpractised)
        days_since = 30  # used when there is no usable timestamp
        if comp and comp.lastAttemptDate:
            try:
                last_dt = datetime.fromisoformat(comp.lastAttemptDate.replace("Z", "+00:00"))
                # Convert offset-aware timestamps to naive UTC instead of
                # just dropping the offset, so the comparison with utcnow()
                # is made in the same time zone.
                offset = last_dt.utcoffset()
                if offset is not None:
                    last_dt = (last_dt - offset).replace(tzinfo=None)
                # Timestamps in the future count as "practised today"
                days_since = max(0, (datetime.utcnow() - last_dt).days)
            except (AttributeError, TypeError, ValueError, OverflowError):
                days_since = 30
        recency_score = min(days_since, 60)  # cap at 60

        # 4. Combined score
        total_score = (
            weakness_score * 0.4
            + prereq_avg * 0.3
            + recency_score * 0.2
            + (10 if prereqs_met else 0) * 0.1
        )
        # Degrade score if prerequisites not met (but still recommend)
        if not prereqs_met:
            total_score *= 0.6

        # Estimate time to mastery (hours)
        est_hours = hours_by_level.get(current_level, 2)

        # Build reasoning
        reasons = []
        if current_level in ("beginner", "not_attempted"):
            reasons.append(f"Currently at {current_level} level — focused practice will build foundation")
        elif current_level == "developing":
            reasons.append(f"Developing competency ({current_score:.0f}% accuracy) — close to proficiency with more practice")
        else:
            reasons.append(f"Proficient but not yet mastered ({current_score:.0f}% accuracy)")

        if prereqs and not prereqs_met:
            reasons.append(f"Note: prerequisites ({', '.join(prereqs)}) not fully met — complete those first")
        elif prereqs and prereqs_met:
            reasons.append("All prerequisites are met")

        if comp and comp.learningVelocity > 0:
            reasons.append(f"Positive learning trend (velocity: {comp.learningVelocity:+.3f})")
        elif comp and comp.learningVelocity < 0:
            reasons.append("Declining performance detected — review recommended")

        if days_since > 14:
            reasons.append(f"Not practiced in {days_since} days — review to prevent forgetting")

        scored_topics.append(TopicRecommendation(
            topicId=topic,
            topicName=topic.replace("_", " ").title(),
            recommendationScore=round(total_score, 2),
            reasoning=". ".join(reasons) + ".",
            estimatedTimeToMastery=est_hours,
            prerequisitesMet=prereqs_met,
            currentCompetency=current_level,
        ))

    # Sort by score descending (stable, so ties keep alphabetical order)
    scored_topics.sort(key=lambda t: t.recommendationScore, reverse=True)

    return TopicRecommendationResponse(
        studentId=student_id,
        recommendations=scored_topics[:request.numRecommendations],
        status="success",
    )
# ─── Learning Analytics Aggregation ───────────────────────────
async def get_student_summary(student_id: str) -> StudentSummaryResponse:
    """Build the combined ML summary for one student.

    A cached summary is returned when one is available. Otherwise the quiz
    history and engagement metrics are fetched, the competency analysis is
    run, and its results are folded into per-level counts, per-topic
    efficiency scores and velocity chart rows. Topic recommendations and a
    linear-trend forecast of the next quiz score are added, and the finished
    summary is cached before it is returned.

    Args:
        student_id: Identifier of the student to summarise.

    Returns:
        The aggregated summary; ``status`` is ``"success"`` when the
        competency analysis reported success and ``"limited_data"`` otherwise.
    """
    cache_key = f"summary_{student_id}"
    hit = _cache_get(_competency_cache, cache_key, IRT_DIFFICULTY_CACHE_TTL)
    if hit:
        return hit

    quiz_history = await fetch_student_quiz_history(student_id)
    engagement = await fetch_student_engagement_metrics(student_id)
    comp_result = await compute_competency_analysis(student_id, quiz_history)
    analyses = comp_result.analyses

    # How many topics sit at each competency level
    level_counts = dict.fromkeys(("beginner", "developing", "proficient", "advanced"), 0)
    for item in analyses:
        level = item.competencyLevel
        if level in level_counts:
            level_counts[level] += 1

    # Efficiency score keyed by topic name
    efficiency_by_topic = {item.topicName: item.efficiencyScore for item in analyses}

    # One chart row per analysed topic
    velocity_rows = [
        {
            "topic": item.topicName,
            "velocity": item.learningVelocity,
            "accuracy": item.averageAccuracy,
            "attempts": item.totalAttempts,
        }
        for item in analyses
    ]

    # Recommendations are best-effort: a failure leaves the list empty
    try:
        rec_result = await recommend_topics(
            TopicRecommendationRequest(studentId=student_id, numRecommendations=5)
        )
        recommended = []
        for rec in rec_result.recommendations:
            recommended.append({
                "topicId": rec.topicId,
                "topicName": rec.topicName,
                "score": rec.recommendationScore,
                "reasoning": rec.reasoning,
                "prerequisitesMet": rec.prerequisitesMet,
            })
    except Exception as e:
        logger.warning(f"Topic recommendation failed: {e}")
        recommended = []

    # Forecast the next quiz score by extending a straight line fitted to the
    # percentage scores of the last (up to) ten quizzes; needs three or more.
    predicted_score = None
    if quiz_history and len(quiz_history) >= 3:
        percentages = [
            (entry.get("score", 0) / max(entry.get("total", 1), 1)) * 100
            for entry in quiz_history[-10:]
        ]
        positions = np.arange(len(percentages)).reshape(-1, 1)
        regressor = LinearRegression()
        regressor.fit(positions, np.array(percentages))
        forecast = regressor.predict([[len(percentages)]])[0]
        predicted_score = round(float(max(0, min(100, forecast))), 1)

    # Engagement fields copied through, with a default for each missing key
    pattern_defaults = (
        ("dailyActivity", {}),
        ("hourlyActivity", {}),
        ("activeDays", 0),
        ("avgActivitiesPerDay", 0),
        ("totalXP", 0),
    )
    engagement_patterns = {
        key: engagement.get(key, default) for key, default in pattern_defaults
    }

    summary_status = "success" if comp_result.status == "success" else "limited_data"
    result = StudentSummaryResponse(
        studentId=student_id,
        competencyDistribution=level_counts,
        riskAssessment=None,
        recommendedTopics=recommended,
        learningVelocityTrend=velocity_rows,
        efficiencyScores=efficiency_by_topic,
        predictedNextQuizScore=predicted_score,
        engagementPatterns=engagement_patterns,
        status=summary_status,
    )

    _cache_set(_competency_cache, cache_key, result)
    return result
async def get_class_insights(request: ClassInsightsRequest) -> ClassInsightsResponse:
    """Aggregate class-wide ML analytics for teacher dashboards.

    Student ids come from the ``classes/{classId}`` document when
    ``classId`` is set, otherwise from a query on the ``users`` collection.
    When no ids can be obtained, demo data is returned instead. At most the
    first 50 students are analysed; a student whose summary cannot be
    processed is logged and skipped.

    Args:
        request: Carries ``teacherId`` and an optional ``classId``.

    Returns:
        Class-level risk distribution, common weak topics, learning-velocity
        statistics, hourly engagement, intervention recommendations and
        success predictions. The result is cached per teacher/class.
    """
    cache_key = f"class_{request.teacherId}_{request.classId}"
    cached = _cache_get(_class_stats_cache, cache_key, IRT_DIFFICULTY_CACHE_TTL)
    if cached:
        return cached

    db = _get_firestore_db()
    student_ids: List[str] = []
    if db is not None:
        try:
            if request.classId:
                # Fetch students in specific class
                class_doc = db.collection("classes").document(request.classId).get()
                if class_doc.exists:
                    class_data = class_doc.to_dict()
                    student_ids = class_data.get("studentIds", [])
            else:
                # NOTE(review): this query selects up to 100 users with role
                # "student" and does not filter by request.teacherId —
                # confirm that returning other teachers' students is intended.
                user_ref = db.collection("users").where("role", "==", "student").limit(100)
                for doc in user_ref.stream():
                    student_ids.append(doc.id)
        except Exception as e:
            logger.error(f"Error fetching class students: {e}")

    if not student_ids:
        # Generate sample data for demo
        return _generate_demo_class_insights(request)

    # Aggregate per-student data
    analysed_ids = student_ids[:50]  # Limit for performance
    risk_dist = {"High": 0, "Medium": 0, "Low": 0}
    all_velocities: List[float] = []
    predicted_scores: List[float] = []
    interventions: List[Dict[str, Any]] = []
    topic_weakness_counts: Dict[str, int] = defaultdict(int)
    hourly_engagement: Dict[int, int] = defaultdict(int)

    for sid in analysed_ids:
        try:
            summary = await get_student_summary(sid)

            # Risk
            if summary.riskAssessment:
                level = summary.riskAssessment.get("riskLevel", "Medium")
                risk_dist[level] = risk_dist.get(level, 0) + 1

            # Velocities; a clearly negative velocity marks the topic as weak
            for vt in summary.learningVelocityTrend:
                velocity = vt.get("velocity", 0)
                all_velocities.append(velocity)
                if velocity < -0.01:
                    topic_weakness_counts[vt.get("topic", "Unknown")] += 1

            # Engagement
            for hour_str, count in summary.engagementPatterns.get("hourlyActivity", {}).items():
                hourly_engagement[int(hour_str)] += count

            # Compare against None explicitly: a predicted score of 0.0 is a
            # real (and alarming) prediction, not a missing one.
            predicted = summary.predictedNextQuizScore
            if predicted is not None:
                predicted_scores.append(predicted)

            # Intervention needed?
            total_beginner = summary.competencyDistribution.get("beginner", 0)
            low_prediction = predicted is not None and predicted < 50
            if total_beginner >= 2 or low_prediction:
                interventions.append({
                    "studentId": sid,
                    "reason": "Multiple topics at beginner level" if total_beginner >= 2 else "Predicted score below 50%",
                    "predictedScore": predicted,
                    "recommendedAction": "Schedule one-on-one tutoring session",
                })
        except Exception as e:
            logger.warning(f"Error processing student {sid}: {e}")

    # Common weak topics. Percentages are relative to the students that were
    # actually analysed, which may be fewer than the whole class.
    analysed_count = max(len(analysed_ids), 1)
    common_weak = sorted(topic_weakness_counts.items(), key=lambda x: x[1], reverse=True)[:10]
    weak_topics_list = [
        {"topic": t, "studentsStruggling": c, "percentageOfClass": round(c / analysed_count * 100, 1)}
        for t, c in common_weak
    ]

    # Velocity distribution
    if all_velocities:
        vel_dist: Dict[str, float] = {
            "mean": round(float(np.mean(all_velocities)), 4),
            "median": round(float(np.median(all_velocities)), 4),
            "improving": float(sum(1 for v in all_velocities if v > 0.01)),
            "declining": float(sum(1 for v in all_velocities if v < -0.01)),
            "plateaued": float(sum(1 for v in all_velocities if -0.01 <= v <= 0.01)),
        }
    else:
        vel_dist = {"mean": 0.0, "median": 0.0, "improving": 0.0, "declining": 0.0, "plateaued": 0.0}

    # Mean of the available per-student predictions. Falls back to 65.0 when
    # no student has a prediction, so the field never becomes NaN.
    if predicted_scores:
        class_average_expected = round(float(np.mean(predicted_scores)), 1)
    else:
        class_average_expected = 65.0

    result = ClassInsightsResponse(
        teacherId=request.teacherId,
        riskDistribution=risk_dist,
        riskTrend=[],  # Would require historical data
        commonWeakTopics=weak_topics_list,
        learningVelocityDistribution=vel_dist,
        engagementPatterns={"hourlyDistribution": dict(hourly_engagement)},
        interventionRecommendations=interventions[:10],
        successPredictions={
            "classAverageExpected": class_average_expected,
            "studentsLikelyToStruggle": len(interventions),
        },
        totalStudents=len(student_ids),
        status="success",
    )
    _cache_set(_class_stats_cache, cache_key, result)
    return result
def _generate_demo_class_insights(request: ClassInsightsRequest) -> ClassInsightsResponse:
    """Return canned class insights for use when no real students are found.

    Everything is fixed sample data for a 30-student class except the hourly
    engagement counts, which are random integers between 5 and 40 for each
    hour from 8 to 21. The response is marked with status ``"demo_data"``.

    Args:
        request: Only ``teacherId`` is read and echoed back.
    """
    risk_trend = [
        {"date": "2026-02-11", "high": 5, "medium": 9, "low": 16},
        {"date": "2026-02-18", "high": 4, "medium": 8, "low": 18},
    ]

    # (topic, students struggling, share of the class in percent)
    weak_topic_rows = (
        ("Quadratic Equations", 12, 40.0),
        ("Trigonometric Ratios", 9, 30.0),
        ("Factoring", 7, 23.3),
    )
    weak_topics = [
        {"topic": name, "studentsStruggling": count, "percentageOfClass": share}
        for name, count, share in weak_topic_rows
    ]

    velocity_distribution = {
        "mean": 0.015,
        "median": 0.008,
        "improving": 18,
        "declining": 5,
        "plateaued": 7,
    }

    hourly = {}
    for hour in range(8, 22):
        hourly[str(hour)] = random.randint(5, 40)
    engagement = {
        "hourlyDistribution": hourly,
        "peakHour": 16,
        "avgDailyActiveStudents": 22,
    }

    interventions = [
        {
            "studentId": "demo_student_1",
            "reason": "Declining performance in multiple topics",
            "predictedScore": 42.5,
            "recommendedAction": "Schedule one-on-one review session for Quadratic Equations",
        },
        {
            "studentId": "demo_student_2",
            "reason": "3 consecutive absences",
            "predictedScore": 38.0,
            "recommendedAction": "Contact parent/guardian and arrange catch-up sessions",
        },
    ]

    predictions = {
        "classAverageExpected": 72.3,
        "studentsLikelyToStruggle": 4,
        "studentsLikelyToExcel": 8,
    }

    return ClassInsightsResponse(
        teacherId=request.teacherId,
        riskDistribution={"High": 4, "Medium": 8, "Low": 18},
        riskTrend=risk_trend,
        commonWeakTopics=weak_topics,
        learningVelocityDistribution=velocity_distribution,
        engagementPatterns=engagement,
        interventionRecommendations=interventions,
        successPredictions=predictions,
        totalStudents=30,
        status="demo_data",
    )
# ─── Mock Data Generator ──────────────────────────────────────
def generate_mock_student_data(
    num_students: int = 30,
    num_quizzes: int = 20,
    seed: Optional[int] = None,
) -> Dict[str, Any]:
    """Generate realistic mock student data for testing ML features.

    Each student is assigned an archetype (perfect, struggling, inconsistent,
    improving, declining or average) that drives both the profile metrics and
    the quiz scores, so the data includes edge cases as well as typical
    students.

    Args:
        num_students: Number of student profiles to create.
        num_quizzes: Number of quiz entries to create per student.
        seed: When given, the output is reproducible. The generators used are
            private to this call, so the global ``random`` and NumPy RNG
            state is left untouched. When ``None``, values are drawn from
            the shared module-level generators.

    Returns:
        A dict with ``"students"`` (profiles), ``"quizHistory"`` (all quiz
        entries, grouped by student) and ``"metadata"``.
    """
    # Private generators seeded with ``seed`` yield the same streams as
    # seeding the global generators would, without the global side effect.
    if seed is not None:
        py_rng: Any = random.Random(seed)
        np_rng: Any = np.random.RandomState(seed)
    else:
        py_rng = random
        np_rng = np.random

    topics = [
        "Linear Equations", "Quadratic Equations", "Polynomials",
        "Trigonometric Ratios", "Pythagorean Theorem", "Fractions & Decimals",
        "Integers", "Probability Basics", "Angles", "Area & Perimeter",
    ]

    # Upper bound of the uniform roll for each archetype; rolls at or above
    # the last bound become "average".
    archetype_bounds = (
        (0.1, "perfect"),
        (0.2, "struggling"),
        (0.3, "inconsistent"),
        (0.5, "improving"),
        (0.65, "declining"),
    )

    # (mean, std) of each base metric per archetype
    archetypes = {
        "perfect": {
            "engagement": (90, 5), "quiz": (92, 4), "attendance": (98, 2),
            "completion": (95, 3), "streak": (15, 3),
        },
        "struggling": {
            "engagement": (25, 10), "quiz": (30, 12), "attendance": (55, 15),
            "completion": (30, 12), "streak": (0, 1),
        },
        "inconsistent": {
            "engagement": (60, 25), "quiz": (55, 25), "attendance": (70, 20),
            "completion": (55, 20), "streak": (3, 5),
        },
        "improving": {
            "engagement": (65, 10), "quiz": (60, 10), "attendance": (80, 8),
            "completion": (70, 10), "streak": (7, 3),
        },
        "declining": {
            "engagement": (50, 15), "quiz": (55, 15), "attendance": (65, 12),
            "completion": (50, 15), "streak": (1, 2),
        },
        "average": {
            "engagement": (65, 12), "quiz": (68, 10), "attendance": (82, 8),
            "completion": (72, 10), "streak": (5, 3),
        },
    }

    def clamp_percent(value):
        """Clamp a value to the 0-100 percentage range."""
        return max(0, min(100, value))

    base_time = datetime(2025, 9, 1)
    students = []
    all_quiz_data = []

    for i in range(num_students):
        student_id = f"mock_student_{i+1:03d}"

        # Assign student archetype
        roll = py_rng.random()
        archetype = next(
            (name for bound, name in archetype_bounds if roll < bound),
            "average",
        )
        params = archetypes[archetype]

        # The order of the draws below fixes the output for a given seed.
        engagement = clamp_percent(np_rng.normal(*params["engagement"]))
        avg_quiz = clamp_percent(np_rng.normal(*params["quiz"]))
        attendance = clamp_percent(np_rng.normal(*params["attendance"]))
        completion = clamp_percent(np_rng.normal(*params["completion"]))
        streak = max(0, int(np_rng.normal(*params["streak"])))
        xp_growth = np_rng.normal(0.5 if archetype == "improving" else 0, 0.3)
        time_on_platform = max(0, np_rng.normal(8, 3))

        students.append({
            "studentId": student_id,
            "name": f"Student {i+1}",
            "archetype": archetype,
            "engagementScore": round(engagement, 1),
            "avgQuizScore": round(avg_quiz, 1),
            "attendance": round(attendance, 1),
            "assignmentCompletion": round(completion, 1),
            "streak": streak,
            "xpGrowthRate": round(xp_growth, 2),
            "timeOnPlatform": round(time_on_platform, 1),
        })

        # Generate quiz history for this student.
        # NOTE(review): the score progression follows the quiz index ``j``
        # while each quiz date is an independent random offset, so entries
        # sorted by date do not necessarily show the archetype's trend.
        for j in range(num_quizzes):
            topic = py_rng.choice(topics)
            days_offset = py_rng.randint(0, 150)
            quiz_date = base_time + timedelta(days=days_offset)

            # Score based on archetype with progression
            if archetype == "improving":
                base_score = 40 + (j / num_quizzes) * 40
            elif archetype == "declining":
                base_score = 80 - (j / num_quizzes) * 35
            elif archetype == "perfect":
                base_score = 90
            elif archetype == "struggling":
                base_score = 30
            elif archetype == "inconsistent":
                base_score = py_rng.choice([30, 50, 70, 90])
            else:  # average
                base_score = 65

            score = clamp_percent(base_score + np_rng.normal(0, 8))
            total_questions = py_rng.choice([10, 15, 20])
            correct = round(total_questions * score / 100)
            # Low scorers take longer per question on average
            time_per_q = max(10, np_rng.normal(60 if score > 70 else 90, 20))
            attempts = py_rng.choice([1, 1, 1, 2, 2, 3]) if score < 60 else 1
            timestamp = quiz_date.isoformat()

            all_quiz_data.append({
                "studentId": student_id,
                "topicId": topic,
                "topic": topic,
                "score": correct,
                "total": total_questions,
                "correct": correct >= total_questions * 0.5,
                "timeTaken": round(time_per_q * total_questions),
                "timeSpent": round(time_per_q),
                "attempts": attempts,
                "completedAt": timestamp,
                "timestamp": timestamp,
                "questionId": f"q_{topic.replace(' ', '_').lower()}_{j}",
            })

    archetype_names = ["perfect", "struggling", "inconsistent", "improving", "declining", "average"]
    return {
        "students": students,
        "quizHistory": all_quiz_data,
        "metadata": {
            "numStudents": num_students,
            "numQuizzes": num_quizzes,
            "archetypeDistribution": {
                name: sum(1 for s in students if s["archetype"] == name)
                for name in archetype_names
            },
            "topicsCovered": topics,
            "generatedAt": datetime.utcnow().isoformat(),
        },
    }
# ─── Cache Management ─────────────────────────────────────────
def refresh_all_caches() -> RefreshCacheResponse:
    """Empty every in-memory analytics cache and report that it was done.

    Clears the competency, class-stats, difficulty and risk-model caches,
    logs the event, and returns a response with status ``"caches_cleared"``,
    a cached-item count of 0 and the current UTC time in ISO format.
    """
    caches = (
        _competency_cache,
        _class_stats_cache,
        _difficulty_cache,
        _risk_model_cache,
    )
    for cache in caches:
        cache.clear()

    logger.info("All analytics caches cleared")

    cleared_at = datetime.utcnow().isoformat()
    return RefreshCacheResponse(
        status="caches_cleared",
        cachedItems=0,
        timestamp=cleared_at,
    )