# pytrade-backend/highlow_forecast.py
import numpy as np
import pandas as pd
import talib
# Optional ML imports (graceful fallback if scikit-learn is not installed)
try:
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error
_SKLEARN_AVAILABLE = True
except Exception:
ExtraTreesRegressor = None
TimeSeriesSplit = None
mean_absolute_error = None
_SKLEARN_AVAILABLE = False
# Optional: HistGradientBoostingRegressor for quantile regression
try:
from sklearn.ensemble import HistGradientBoostingRegressor
_HGBR_AVAILABLE = True
except Exception:
HistGradientBoostingRegressor = None
_HGBR_AVAILABLE = False
# --------------------- Configuration ---------------------
# Prefer quantile gradient boosting for extreme values (better for High/Low)
_USE_HGBR_QUANTILE = True # auto-fallback to ExtraTrees when unavailable
# Quantiles for high/low tails (in log-ratio space)
_Q_HIGH = 0.80 # upper-tail for High
_Q_LOW = 0.20 # lower-tail for Low
# Blend ML predictions with TA fallback (in log-return space)
# Set to 0.0 to disable blending
_BLEND_TA_WEIGHT = 0.20
# Log-ratio target winsorization to reduce outlier impact: [q_low, q_high] (ExtraTrees path)
_WINSOR_Q_LOW = 0.005
_WINSOR_Q_HIGH = 0.995
# Exponential recency weighting: larger = faster decay (0.0 to disable)
_RECENCY_DECAY = 0.003 # per-sample step
# ExtraTrees hyperparameters tuned for generalization
_ETR_PARAMS_CV = dict(
n_estimators=800,
max_depth=None,
min_samples_split=2,
min_samples_leaf=3,
max_features=0.6,
bootstrap=False,
n_jobs=-1,
random_state=42,
)
_ETR_PARAMS_FINAL = dict(
n_estimators=1200,
max_depth=None,
min_samples_split=2,
min_samples_leaf=3,
max_features=0.6,
bootstrap=False,
n_jobs=-1,
random_state=42,
)
# HistGradientBoosting hyperparameters for quantile regression
_HGBR_PARAMS = dict(
loss="quantile",
learning_rate=0.05,
max_iter=600,
max_depth=3,
max_leaf_nodes=31,
max_bins=255,
l2_regularization=0.0,
    early_stopping=False,  # early stopping uses a random holdout, which would leak future rows in time series
random_state=42,
)
# In-memory per-ticker model cache (no disk I/O)
_MEM_CACHE = {} # key: ticker.upper(), value: bundle dict
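# A quick worked example of the TA blend used at inference (illustrative
# numbers only): with _BLEND_TA_WEIGHT = 0.20, a +4% ML move and a +2% TA move
# combine in log space as
#   0.8 * ln(1.04) + 0.2 * ln(1.02) ~= 0.0353  ->  exp(0.0353) - 1 ~= +3.6%,
# gently shrinking ML predictions toward the ATR/EMA envelope.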
# --------------------- OHLC Utilities ---------------------
def _ensure_ohlc_columns(df: pd.DataFrame) -> pd.DataFrame:
    cols = {str(c).lower(): c for c in df.columns}  # str() guards tuple (MultiIndex) labels
need = ["open", "high", "low", "close", "volume"]
mapping = {}
for n in need:
if n in cols:
mapping[cols[n]] = n
else:
# try MultiIndex column cases from yfinance
for c in df.columns:
name = c[0].lower() if isinstance(c, tuple) and len(c) > 0 else str(c).lower()
if name == n:
mapping[c] = n
break
out = df.rename(columns=mapping).copy()
missing = [c for c in need if c not in out.columns]
if missing:
raise ValueError(f"Missing OHLCV columns after normalization: {missing}")
return out[["open", "high", "low", "close", "volume"]]
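def _demo_normalize_columns() -> pd.DataFrame:
    # Hypothetical smoke test (not used by the pipeline): yfinance-style
    # title-case columns are mapped onto the lower-case OHLCV schema above.
    raw = pd.DataFrame(
        {"Open": [1.0], "High": [2.0], "Low": [0.5], "Close": [1.5], "Volume": [100.0]},
        index=pd.to_datetime(["2024-01-02"]),
    )
    return _ensure_ohlc_columns(raw)  # columns: open, high, low, close, volume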
# --------------------- Business day helper ---------------------
def _next_business_days(last_date: pd.Timestamp, periods: int, exchange: str = "XNYS") -> pd.DatetimeIndex:
"""
Return next 'periods' business sessions after last_date.
Tries exchange calendar via pandas_market_calendars (holidays-aware), fallback to weekdays-only.
exchange examples: 'XNYS' (NYSE), 'XBOM' (BSE), 'XNAS' (NASDAQ), 'XNSE' (NSE).
"""
last_date = pd.Timestamp(last_date).tz_localize(None)
try:
import pandas_market_calendars as mcal
cal = mcal.get_calendar(exchange)
# buffer long enough to cover holidays
schedule = cal.schedule(start_date=last_date + pd.Timedelta(days=1),
end_date=last_date + pd.Timedelta(days=180))
sessions = schedule.index.tz_localize(None)
if len(sessions) >= periods:
return sessions[:periods]
# If for some reason not enough sessions, extend with weekday fallback
needed = periods - len(sessions)
tail = pd.bdate_range(sessions[-1] + pd.offsets.BDay(1) if len(sessions) else last_date + pd.offsets.BDay(1),
periods=needed)
return sessions.append(tail)
except Exception:
# Weekdays-only fallback
return pd.bdate_range(last_date + pd.offsets.BDay(1), periods=periods)
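def _demo_next_sessions() -> None:
    # Hypothetical smoke test (not used by the pipeline). Without
    # pandas_market_calendars, the weekday fallback after Wed 2024-07-03 yields
    # Jul 4, 5 and 8; with the XNYS calendar the July 4th holiday is skipped,
    # yielding Jul 5, 8 and 9 instead.
    print(_next_business_days(pd.Timestamp("2024-07-03"), 3))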
# --------------------- TA Heuristic (Fallback, No ML) ---------------------
def _last_finite(values: np.ndarray, default: float = np.nan) -> float:
for x in values[::-1]:
if np.isfinite(x):
return float(x)
return float(default)
def _ta_fallback_forecast(ohlc: pd.DataFrame, horizons: int = 15):
h = ohlc["high"].astype(float).values
l = ohlc["low"].astype(float).values
c = ohlc["close"].astype(float).values
if len(c) < 60:
raise ValueError("Not enough history for TA fallback (need >=60 rows).")
base_close = _last_finite(ohlc["close"].replace(0.0, np.nan).values)
if not np.isfinite(base_close) or base_close <= 0:
raise ValueError("Invalid last close after cleaning.")
atr14 = talib.ATR(h, l, c, timeperiod=14)
atr_last = _last_finite(atr14, default=np.nan)
atr_pct = (atr_last / base_close) if np.isfinite(atr_last) and base_close > 0 else np.nan
ema20 = talib.EMA(c, timeperiod=20)
ema50 = talib.EMA(c, timeperiod=50)
ema20_last = _last_finite(ema20, default=np.nan)
ema50_last = _last_finite(ema50, default=np.nan)
trend_strength = 0.0
if np.isfinite(ema20_last) and np.isfinite(ema50_last) and ema50_last > 0:
trend_strength = np.clip(ema20_last / ema50_last - 1.0, -0.05, 0.05)
ema20_slope = 0.0
if len(ema20) >= 2 and np.isfinite(ema20[-1]) and np.isfinite(ema20[-2]) and ema20[-2] > 0:
ema20_slope = np.clip((ema20[-1] / ema20[-2]) - 1.0, -0.05, 0.05)
adx14 = talib.ADX(h, l, c, timeperiod=14)
adx = _last_finite(adx14, default=20.0) / 100.0
adx = float(np.clip(adx, 0.0, 1.0))
rsi14 = talib.RSI(c, timeperiod=14)
rsi = _last_finite(rsi14, default=50.0)
tilt = float(np.clip((rsi - 50.0) / 50.0, -1.0, 1.0))
logret = np.diff(np.log(np.maximum(c, 1e-12)))
if len(logret) >= 20 and np.isfinite(logret[-20:]).sum() >= 10:
sigma20 = float(pd.Series(logret).rolling(20).std().iloc[-1])
else:
sigma20 = float(np.nan)
components = []
if np.isfinite(sigma20):
components.append(sigma20)
if np.isfinite(atr_pct):
components.append(atr_pct)
    daily_vol = 0.0
    if len(components) == 2:
        daily_vol = 0.6 * components[0] + 0.4 * components[1]
    elif components:
        # Only one volatility estimate available: use it at full weight
        # instead of scaling it by 0.6 and understating the band
        daily_vol = components[0]
    daily_vol = float(np.clip(daily_vol if np.isfinite(daily_vol) else 0.02, 0.004, 0.08))
drift_per_day = float(np.clip(0.5 * trend_strength + 0.5 * ema20_slope, -0.02, 0.02))
up_weight = 1.0 - 0.3 * tilt
dn_weight = 1.0 + 0.3 * tilt
up_weight = float(np.clip(up_weight, 0.5, 1.5))
dn_weight = float(np.clip(dn_weight, 0.5, 1.5))
trend_amp = 0.75 + 0.5 * adx
pred_high, pred_low = [], []
for k in range(1, horizons + 1):
amp = daily_vol * np.sqrt(k) * trend_amp
drift = drift_per_day * k
up_move = amp * up_weight
dn_move = amp * dn_weight
hi = base_close * (1.0 + drift + up_move)
lo = base_close * (1.0 + drift - dn_move)
hi = max(0.0, hi)
lo = max(0.0, lo)
if lo > hi:
lo, hi = hi, lo
pred_high.append(hi)
pred_low.append(lo)
return base_close, np.array(pred_high), np.array(pred_low)
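# Worked example of the band construction above (illustrative numbers only):
# with base_close = 100, daily_vol = 0.02, trend_amp = 1.0, zero drift and a
# neutral RSI tilt (up_weight = dn_weight = 1.0), day k = 9 gives
#   amp = 0.02 * sqrt(9) * 1.0 = 0.06  ->  high ~= 106.0, low ~= 94.0,
# i.e. the envelope widens with the square root of the horizon.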
# --------------------- Feature Engineering for ML ---------------------
def _compute_ta_features(df: pd.DataFrame) -> pd.DataFrame:
df = _ensure_ohlc_columns(df).copy()
o, h, l, c, v = [df[k].astype(float).values for k in ("open", "high", "low", "close", "volume")]
close = df["close"].astype(float)
open_ = df["open"].astype(float)
high = df["high"].astype(float)
low = df["low"].astype(float)
vol = df["volume"].astype(float)
df_feat = pd.DataFrame(index=df.index)
# Basic price features
df_feat["ret_1"] = close.pct_change(1)
df_feat["logret_1"] = np.log(close.replace(0.0, np.nan)).diff(1)
df_feat["ret_5"] = close.pct_change(5)
df_feat["ret_10"] = close.pct_change(10)
df_feat["roll_mean_5"] = close.rolling(5).mean() / close - 1.0
df_feat["roll_mean_20"] = close.rolling(20).mean() / close - 1.0
df_feat["roll_std_10"] = close.pct_change().rolling(10).std()
df_feat["range_pct"] = (high - low) / close.replace(0.0, np.nan)
# Candle features (normalized)
with np.errstate(divide="ignore", invalid="ignore"):
body = (close - open_) / close
upper_shadow = (high - np.maximum(close, open_)) / close
lower_shadow = (np.minimum(close, open_) - low) / close
df_feat["candle_body"] = body
df_feat["candle_upper"] = upper_shadow
df_feat["candle_lower"] = lower_shadow
df_feat["gap_open"] = open_.shift(0) / close.shift(1) - 1.0
# EMAs and distances
ema5 = talib.EMA(close.values, timeperiod=5)
ema20 = talib.EMA(close.values, timeperiod=20)
ema50 = talib.EMA(close.values, timeperiod=50)
with np.errstate(divide="ignore", invalid="ignore"):
df_feat["ema5_dist"] = (ema5 / close.values) - 1.0
df_feat["ema20_dist"] = (ema20 / close.values) - 1.0
df_feat["ema50_dist"] = (ema50 / close.values) - 1.0
# EMA slopes (1-day change)
df_feat["ema20_slope"] = (pd.Series(ema20, index=df.index).pct_change(1))
# RSI family
df_feat["rsi14"] = talib.RSI(close.values, timeperiod=14) / 100.0
df_feat["rsi5"] = talib.RSI(close.values, timeperiod=5) / 100.0
# MACD
macd, macdsig, macdhist = talib.MACD(close.values, fastperiod=12, slowperiod=26, signalperiod=9)
df_feat["macd"] = macd
df_feat["macdsig"] = macdsig
df_feat["macdhist"] = macdhist
# Bollinger Bands width
upper, middle, lower = talib.BBANDS(close.values, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
with np.errstate(divide="ignore", invalid="ignore"):
df_feat["bb_width"] = (upper - lower) / middle
# Volatility/Trend
atr = talib.ATR(h, l, c, timeperiod=14)
with np.errstate(divide="ignore", invalid="ignore"):
df_feat["atr14"] = atr / close.values
df_feat["adx14"] = talib.ADX(h, l, c, timeperiod=14) / 100.0
# Additional momentum/oscillators
df_feat["roc10"] = talib.ROC(close.values, timeperiod=10) / 100.0
df_feat["cci14"] = talib.CCI(h, l, c, timeperiod=14) / 100.0
df_feat["mfi14"] = talib.MFI(h, l, c, v, timeperiod=14) / 100.0
df_feat["willr14"] = talib.WILLR(h, l, c, timeperiod=14) / 100.0 # [-1, 0]
# Stochastic
slowk, slowd = talib.STOCH(h, l, c, fastk_period=14, slowk_period=3, slowk_matype=0, slowd_period=3, slowd_matype=0)
df_feat["stoch_k"] = slowk / 100.0
df_feat["stoch_d"] = slowd / 100.0
    # OBV 5-day percent change as a volume-flow proxy (0.0 when volume is missing/flat)
finite_vol = np.isfinite(vol.values)
if finite_vol.sum() >= max(30, int(0.5 * len(vol))):
obv = talib.OBV(close.values, vol.values)
df_feat["obv_z"] = pd.Series(obv, index=df.index).pct_change(5)
else:
df_feat["obv_z"] = 0.0
# Volume z-score and turnover proxies
vol_roll_mean = vol.rolling(20).mean()
vol_roll_std = vol.rolling(20).std()
with np.errstate(divide="ignore", invalid="ignore"):
df_feat["vol_z20"] = (vol - vol_roll_mean) / vol_roll_std
df_feat["turnover_z20"] = ((vol * close) - (vol * close).rolling(20).mean()) / (vol * close).rolling(20).std()
# Distance to rolling extremes
roll_max_20 = close.rolling(20).max()
roll_min_20 = close.rolling(20).min()
roll_max_55 = close.rolling(55).max()
roll_min_55 = close.rolling(55).min()
with np.errstate(divide="ignore", invalid="ignore"):
df_feat["dist_max20"] = roll_max_20 / close - 1.0
df_feat["dist_min20"] = close / roll_min_20 - 1.0
df_feat["dist_max55"] = roll_max_55 / close - 1.0
df_feat["dist_min55"] = close / roll_min_55 - 1.0
# Realized volatility features
logret = np.log(close.replace(0.0, np.nan)).diff(1)
df_feat["rv5"] = logret.rolling(5).std()
df_feat["rv20"] = logret.rolling(20).std()
df_feat["avg_range5"] = ((high - low) / close.replace(0.0, np.nan)).rolling(5).mean()
    # Calendar (cyclical day-of-week, month-of-year); computed as plain ndarrays
    # so assignment is positional and does not realign (and NaN-out) against the
    # DataFrame's DatetimeIndex
    idx = pd.DatetimeIndex(pd.to_datetime(df.index))
    dow = np.asarray(idx.weekday, dtype=float)
    df_feat["dow_sin"] = np.sin(2 * np.pi * dow / 7.0)
    df_feat["dow_cos"] = np.cos(2 * np.pi * dow / 7.0)
    moy = np.asarray(idx.month, dtype=float)
    df_feat["moy_sin"] = np.sin(2 * np.pi * (moy - 1.0) / 12.0)
    df_feat["moy_cos"] = np.cos(2 * np.pi * (moy - 1.0) / 12.0)
# Lags of basic signals
df_feat["ret_1_lag1"] = df_feat["ret_1"].shift(1)
df_feat["ret_1_lag2"] = df_feat["ret_1"].shift(2)
df_feat["range_pct_lag1"] = df_feat["range_pct"].shift(1)
df_feat = df_feat.replace([np.inf, -np.inf], np.nan)
df_feat = df_feat.loc[:, df_feat.notna().any(axis=0)]
return df_feat
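def _demo_feature_matrix(n: int = 250, seed: int = 0) -> pd.DataFrame:
    # Hypothetical smoke test (not used by the pipeline): build a synthetic
    # random-walk OHLCV frame and confirm the feature matrix computes end to
    # end (expect NaN warm-up rows at the head before cleaning).
    rng = np.random.default_rng(seed)
    close = 100.0 * np.exp(np.cumsum(rng.normal(0.0, 0.01, n)))
    spread = np.abs(rng.normal(0.0, 0.005, n))
    open_ = np.clip(close * (1.0 + rng.normal(0.0, 0.002, n)),
                    close * (1.0 - spread), close * (1.0 + spread))
    df = pd.DataFrame(
        {
            "open": open_,
            "high": close * (1.0 + spread),
            "low": close * (1.0 - spread),
            "close": close,
            "volume": rng.integers(1_000, 10_000, n).astype(float),
        },
        index=pd.bdate_range("2023-01-02", periods=n),
    )
    return _compute_ta_features(df)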
def _clean_features_for_training(feats: pd.DataFrame, warmup: int = 60) -> pd.DataFrame:
if feats.empty:
return feats
clean = feats.copy()
    clean = clean.ffill().bfill()  # fillna(method=...) is deprecated in modern pandas
if len(clean) > warmup:
clean = clean.iloc[warmup:]
clean = clean.dropna()
return clean
def _winsorize_targets(Y: np.ndarray, horizons: int, q_low: float, q_high: float) -> tuple[np.ndarray, dict]:
"""
Winsorize concatenated targets Y = [highs(0:h), lows(h:2h)] row-wise using global quantiles.
Returns clipped Y and thresholds used.
"""
h = horizons
Yh = Y[:, :h].ravel()
Yl = Y[:, h:].ravel()
lo_h, hi_h = np.quantile(Yh, [q_low, q_high]) if Yh.size else (-np.inf, np.inf)
lo_l, hi_l = np.quantile(Yl, [q_low, q_high]) if Yl.size else (-np.inf, np.inf)
Y_clip = Y.copy()
Y_clip[:, :h] = np.clip(Y_clip[:, :h], lo_h, hi_h)
Y_clip[:, h:] = np.clip(Y_clip[:, h:], lo_l, hi_l)
return Y_clip, {"high": (float(lo_h), float(hi_h)), "low": (float(lo_l), float(hi_l))}
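# Example layout for horizons=3: each row of Y is [yh_1, yh_2, yh_3, yl_1,
# yl_2, yl_3]; the high block (columns 0:3) and the low block (columns 3:6)
# are clipped against thresholds from their own global quantiles, never mixed.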
def _sample_weights(n: int, decay: float) -> np.ndarray:
"""
Exponential recency weights. Newer samples get higher weight.
w_i = exp(-decay * (n-1-i)), i in [0..n-1]
"""
if decay <= 0 or n <= 0:
return np.ones(n, dtype=float)
idx = np.arange(n, dtype=float)
w = np.exp(-decay * (n - 1 - idx))
w /= np.average(w) # normalize to mean 1.0
return w
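# Illustration (not called anywhere): with decay = 0.003 over 1000 samples,
# the newest sample carries exp(0.003 * 999) ~= 20x the weight of the oldest,
# and mean-normalization preserves that ratio:
# >>> w = _sample_weights(1000, 0.003)
# >>> round(float(w[-1] / w[0]), 1)
# 20.0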
def _make_supervised(df: pd.DataFrame, horizons: int = 15):
"""
Build X, Y for multi-horizon high/low forecast.
Targets (log-ratio): y_high_h = log(High[t+h]/Close[t]), y_low_h = log(Low[t+h]/Close[t])
Log transform stabilizes variance and reduces skew.
"""
ohlc = _ensure_ohlc_columns(df)
feats = _compute_ta_features(df)
feat_df = _clean_features_for_training(feats, warmup=60)
# Align to cleaned feature index
ohlc = ohlc.loc[feat_df.index]
highs = ohlc["high"].astype(float).values
lows = ohlc["low"].astype(float).values
closes = ohlc["close"].astype(float).values
X_all = feat_df.values
n = len(feat_df)
if n < horizons + 30:
raise ValueError(f"Not enough rows after feature warm-up for {horizons}-day training. Have: {n}")
    X_list, Y_list, idx_list = [], [], []
for i in range(n - horizons):
base_c = closes[i]
if not np.isfinite(base_c) or base_c <= 0:
continue
future_highs = highs[i + 1:i + horizons + 1]
future_lows = lows[i + 1:i + horizons + 1]
with np.errstate(divide="ignore", invalid="ignore"):
yh = np.log(np.maximum(future_highs, 1e-12) / base_c)
yl = np.log(np.maximum(future_lows, 1e-12) / base_c)
if np.any(~np.isfinite(yh)) or np.any(~np.isfinite(yl)):
continue
        X_list.append(X_all[i, :])
        Y_list.append(np.concatenate([yh, yl], axis=0))
        idx_list.append(feat_df.index[i])
X = np.asarray(X_list)
Y = np.asarray(Y_list)
if X.size == 0 or Y.size == 0:
raise ValueError("No valid supervised samples after cleaning. Check data quality (NaNs/zeros).")
feature_names = feat_df.columns.tolist()
    # Return the timestamps of the rows actually used (skipped rows leave gaps,
    # so a plain head-slice of feat_df.index would be wrong)
    return X, Y, feature_names, pd.Index(idx_list)
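# Worked target example: if Close[t] = 100 and High[t+5] = 105, then
#   y_high_5 = ln(105 / 100) ~= 0.0488,
# and inference inverts the transform via exp(0.0488) * 100 ~= 105. The low
# targets use the same log-ratio construction against Low[t+h].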
def _get_sklearn_version():
try:
import sklearn
return sklearn.__version__
except Exception:
return None
# --------------------- Model Train/Load (In-Memory Only) ---------------------
def train_or_load_highlow_15d(df: pd.DataFrame, ticker: str, horizons: int = 15):
key = ticker.upper()
if key in _MEM_CACHE:
return _MEM_CACHE[key]
# If sklearn is not available at all, keep TA fallback metadata
if not _SKLEARN_AVAILABLE:
bundle = {
"model": None,
"feature_names": None,
"horizons": horizons,
"trained_rows": int(len(df)),
"metrics": None,
"sklearn_version": None,
"ticker": key,
"model_path": None,
"winsor": None,
"blend_weight": _BLEND_TA_WEIGHT,
"transform": "logratio",
"feature_importances": None,
"algo": "NONE",
}
_MEM_CACHE[key] = bundle
return bundle
# Build supervised set
X, Y_raw, feature_names, _ = _make_supervised(df, horizons=horizons)
sw = _sample_weights(X.shape[0], _RECENCY_DECAY)
# Prefer quantile gradient boosting if available
if _USE_HGBR_QUANTILE and _HGBR_AVAILABLE and HistGradientBoostingRegressor is not None:
q_models_high, q_models_low = [], []
for k in range(horizons):
# High models (upper quantile)
mh = HistGradientBoostingRegressor(**_HGBR_PARAMS, quantile=_Q_HIGH)
mh.fit(X, Y_raw[:, k], sample_weight=sw)
q_models_high.append(mh)
# Low models (lower quantile)
ml = HistGradientBoostingRegressor(**_HGBR_PARAMS, quantile=_Q_LOW)
ml.fit(X, Y_raw[:, horizons + k], sample_weight=sw)
q_models_low.append(ml)
bundle = {
"model": None, # not used in quantile path
"q_models_high": q_models_high,
"q_models_low": q_models_low,
"feature_names": feature_names,
"horizons": horizons,
"trained_rows": int(X.shape[0]),
"metrics": None, # optional: add custom CV if desired
"sklearn_version": _get_sklearn_version(),
"ticker": key,
"model_path": None,
"winsor": None,
"blend_weight": _BLEND_TA_WEIGHT,
"transform": "logratio",
"feature_importances": None,
"algo": f"HGBR_QUANTILE(high={_Q_HIGH}, low={_Q_LOW})",
}
_MEM_CACHE[key] = bundle
return bundle
    # Otherwise fall back to multi-output ExtraTrees mean regression
Y_clip, winsor_info = _winsorize_targets(Y_raw, horizons, _WINSOR_Q_LOW, _WINSOR_Q_HIGH)
fold_metrics = []
feature_importances = None
if TimeSeriesSplit is not None:
tscv = TimeSeriesSplit(n_splits=5)
for train_idx, val_idx in tscv.split(X):
Xtr, Xvl = X[train_idx], X[val_idx]
Ytr_clipped = Y_clip[train_idx]
Yvl_true = Y_raw[val_idx] # evaluate on true (unclipped) targets
w_tr = sw[train_idx] if sw is not None else None
model_cv = ExtraTreesRegressor(**_ETR_PARAMS_CV)
model_cv.fit(Xtr, Ytr_clipped, sample_weight=w_tr)
Yhat = model_cv.predict(Xvl)
# Convert log-ratio back to percentage move for reporting
h = horizons
if mean_absolute_error is not None:
yh_pct = (np.exp(Yvl_true[:, :h]) - 1.0) * 100.0
yl_pct = (np.exp(Yvl_true[:, h:]) - 1.0) * 100.0
yhat_h_pct = (np.exp(Yhat[:, :h]) - 1.0) * 100.0
yhat_l_pct = (np.exp(Yhat[:, h:]) - 1.0) * 100.0
high_mae = mean_absolute_error(yh_pct, yhat_h_pct)
low_mae = mean_absolute_error(yl_pct, yhat_l_pct)
fold_metrics.append({"high_mae_pct": round(float(high_mae), 4),
"low_mae_pct": round(float(low_mae), 4)})
final_model = ExtraTreesRegressor(**_ETR_PARAMS_FINAL)
final_model.fit(X, Y_clip, sample_weight=sw)
try:
fi = final_model.feature_importances_
feature_importances = sorted(
zip(feature_names, fi),
key=lambda t: t[1],
reverse=True
)[:30]
feature_importances = [(str(n), float(v)) for n, v in feature_importances]
except Exception:
feature_importances = None
bundle = {
"model": final_model,
"feature_names": feature_names,
"horizons": horizons,
"trained_rows": int(X.shape[0]),
"metrics": fold_metrics or None,
"sklearn_version": _get_sklearn_version(),
"ticker": key,
"model_path": None,
"winsor": winsor_info,
"blend_weight": _BLEND_TA_WEIGHT,
"transform": "logratio",
"feature_importances": feature_importances,
"algo": "EXTRATREES_MEAN",
}
_MEM_CACHE[key] = bundle
return bundle
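# Minimal usage sketch (a hedged example; `daily_df` stands for any OHLCV
# DataFrame with a DatetimeIndex):
#   bundle = train_or_load_highlow_15d(daily_df, "AAPL")
#   print(bundle["algo"], bundle["trained_rows"])
# A second call with the same ticker is served from _MEM_CACHE without retraining.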
# --------------------- Forecast ---------------------
def forecast_next_15_high_low(ticker: str, stock_data: pd.DataFrame):
"""
Train/load from memory and forecast next 15 business days' High/Low.
If no ML available or insufficient data, uses TA fallback.
Returns dict: dates, pred_high, pred_low, base_close, bundle_meta
"""
if not isinstance(stock_data.index, pd.DatetimeIndex):
stock_data = stock_data.copy()
stock_data.index = pd.to_datetime(stock_data.index)
ohlc = _ensure_ohlc_columns(stock_data)
try:
bundle = train_or_load_highlow_15d(stock_data, ticker, horizons=15)
        model = bundle.get("model", None)
        horizons = bundle.get("horizons", 15)
        if model is None and "q_models_high" not in bundle:
            # No usable ML model (e.g. sklearn missing): trigger the TA fallback
            raise ValueError("No trained ML model available.")
# Build latest feature row
feats_full = _compute_ta_features(stock_data)
feats_full = feats_full.replace([np.inf, -np.inf], np.nan)
feats_full = feats_full.loc[:, feats_full.notna().any(axis=0)]
        feats_full = feats_full.ffill().bfill()
if len(feats_full) > 60:
feats_full = feats_full.iloc[60:]
if feats_full.empty:
raise ValueError("No features available for inference after cleaning.")
        feature_names = bundle["feature_names"]
        # Align inference features to the training schema (missing columns -> 0.0)
        feats_full = feats_full.reindex(columns=feature_names, fill_value=0.0)
X_t = feats_full.iloc[[-1]].values
base_close = float(ohlc.iloc[-1]["close"])
if not np.isfinite(base_close) or base_close <= 0:
base_close = float(ohlc["close"].replace(0.0, np.nan).dropna().iloc[-1])
y_pred_log = None
# Path 1: ExtraTrees multi-output mean-regression
if model is not None:
y_pred_log = model.predict(X_t).reshape(-1)
# Path 2: Quantile gradient boosting per-horizon
elif "q_models_high" in bundle and "q_models_low" in bundle:
qh = bundle["q_models_high"]
ql = bundle["q_models_low"]
yh = np.array([qh[k].predict(X_t)[0] for k in range(horizons)], dtype=float)
yl = np.array([ql[k].predict(X_t)[0] for k in range(horizons)], dtype=float)
y_pred_log = np.concatenate([yh, yl], axis=0)
if y_pred_log is not None:
# Optional hybrid blend with TA fallback in log space for stability
blend_w = float(bundle.get("blend_weight", _BLEND_TA_WEIGHT) or 0.0)
if blend_w > 0.0:
try:
_, hi_ta, lo_ta = _ta_fallback_forecast(ohlc, horizons=horizons)
with np.errstate(divide="ignore", invalid="ignore"):
yh_ta_log = np.log(np.maximum(hi_ta, 1e-12) / base_close)
yl_ta_log = np.log(np.maximum(lo_ta, 1e-12) / base_close)
yh_ml_log = y_pred_log[:horizons]
yl_ml_log = y_pred_log[horizons:]
yh_blend_log = (1.0 - blend_w) * yh_ml_log + blend_w * yh_ta_log
yl_blend_log = (1.0 - blend_w) * yl_ml_log + blend_w * yl_ta_log
y_pred_log = np.concatenate([yh_blend_log, yl_blend_log], axis=0)
except Exception:
pass
# Convert back from log-ratio to price
yh = y_pred_log[:horizons]
yl = y_pred_log[horizons:]
pred_high = np.exp(yh) * base_close
pred_low = np.exp(yl) * base_close
pred_high = np.maximum(pred_high, 0.0)
pred_low = np.maximum(pred_low, 0.0)
        # Ensure low <= high element-wise
        pred_low, pred_high = np.minimum(pred_low, pred_high), np.maximum(pred_low, pred_high)
last_date = feats_full.index[-1]
future_dates = _next_business_days(last_date, horizons)
date_str = [pd.Timestamp(d).strftime("%Y-%m-%d") for d in future_dates]
return {
"dates": date_str,
"pred_high": [round(float(x), 2) for x in pred_high],
"pred_low": [round(float(x), 2) for x in pred_low],
"base_close": round(float(base_close), 4),
"bundle_meta": {
"model": bundle.get("algo", "UNKNOWN"),
"trained_rows": bundle.get("trained_rows"),
"sklearn_version": bundle.get("sklearn_version"),
"metrics": bundle.get("metrics"),
"bundle_path": None,
"ticker": bundle.get("ticker"),
"winsor": bundle.get("winsor"),
"blend_weight": bundle.get("blend_weight"),
"transform": bundle.get("transform"),
"feature_importances_top30": bundle.get("feature_importances"),
"quantiles": {"high": _Q_HIGH, "low": _Q_LOW} if "q_models_high" in bundle else None,
},
}
    except Exception:
        # Any ML failure (missing deps, schema problems, short history) falls
        # through to the TA heuristic below
        pass
base_close, pred_high, pred_low = _ta_fallback_forecast(ohlc, horizons=15)
last_date = ohlc.index[-1]
future_dates = _next_business_days(last_date, 15)
date_str = [pd.Timestamp(d).strftime("%Y-%m-%d") for d in future_dates]
return {
"dates": date_str,
"pred_high": [round(float(x), 2) for x in pred_high],
"pred_low": [round(float(x), 2) for x in pred_low],
"base_close": round(float(base_close), 4),
"bundle_meta": {
"model": "TA heuristic fallback (ATR/EMA/RSI/ADX), no ML",
"trained_rows": int(len(ohlc)),
"sklearn_version": _get_sklearn_version(),
"metrics": None,
"bundle_path": None,
"ticker": ticker.upper(),
},
}
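if __name__ == "__main__":
    # Minimal end-to-end sketch (a hedged example: assumes the optional
    # yfinance package is installed; any daily OHLCV DataFrame with a
    # DatetimeIndex works just as well).
    import yfinance as yf
    data = yf.download("AAPL", period="2y", auto_adjust=False)
    result = forecast_next_15_high_low("AAPL", data)
    print("model:", result["bundle_meta"]["model"], "| base close:", result["base_close"])
    for d, hi, lo in zip(result["dates"], result["pred_high"], result["pred_low"]):
        print(f"{d}: low={lo:.2f} high={hi:.2f}")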