# -*- coding: utf-8 -*-
from __future__ import annotations

import csv, io, json, time, os
from typing import Dict, List, Any, Optional
from pathlib import Path
from io import StringIO

import requests

# optional (for Wikipedia tables)
try:
    import pandas as pd  # requires: pip install pandas lxml
    HAS_PANDAS = True
except Exception:
    HAS_PANDAS = False

# ---------- configuration (unchanged names) ----------
UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127 Safari/537.36"
REFERER = "https://www.niftyindices.com/indices/equity/broad-based-indices"
TTL_SECONDS = 60 * 60 * 12  # 12h cache

# Cache directory: the CACHE_DIR env var if set, else /data/cache; an empty
# CACHE_DIR env var falls back to a "cache" dir next to this file.
DEFAULT_CACHE_DIR = os.getenv("CACHE_DIR", "/data/cache")
CACHE_DIR = Path(DEFAULT_CACHE_DIR if DEFAULT_CACHE_DIR else ".").expanduser()
if CACHE_DIR == Path("."):
    CACHE_DIR = Path(__file__).with_name("cache")
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# Official CSV endpoints for NSE indices (unchanged name)
NIFTY_URLS: Dict[str, str] = {
    "NIFTY50": "https://www.niftyindices.com/IndexConstituent/ind_nifty50list.csv",
    "NIFTY100": "https://www.niftyindices.com/IndexConstituent/ind_nifty100list.csv",
    "NIFTY200": "https://www.niftyindices.com/IndexConstituent/ind_nifty200list.csv",
    "NIFTYMID100": "https://www.niftyindices.com/IndexConstituent/ind_niftymidcap100list.csv",
    "NIFTY500": "https://www.niftyindices.com/IndexConstituent/ind_nifty500list.csv",
}

# Filters payload for the UI (unchanged variable name)
MARKETS: Dict[str, Dict[str, List[Dict[str, str]]]] = {
    "India": {
        "NSE (National Stock Exchange)": [
            {"code": "NIFTY50", "name": "NIFTY 50"},
            {"code": "NIFTY100", "name": "NIFTY 100"},
            {"code": "NIFTY200", "name": "NIFTY 200"},
            {"code": "NIFTYMID100", "name": "NIFTY Midcap 100"},
            {"code": "NIFTY500", "name": "NIFTY 500"},
        ]
    }
}

# ---------- extras (new, additive) ----------
WIKI_PAGES: Dict[str, str] = {
    "NASDAQ100": "https://en.wikipedia.org/wiki/NASDAQ-100",
    "DAX40": "https://en.wikipedia.org/wiki/DAX",
    "OMXS30": "https://en.wikipedia.org/wiki/OMX_Stockholm_30",
}

EXTRA_MARKETS: Dict[str, Dict[str, List[Dict[str, str]]]] = {
    "United States": {
        "NASDAQ": [
            {"code": "NASDAQ100", "name": "NASDAQ-100"}
        ]
    },
    "Germany": {
        "XETRA (Deutsche Börse)": [
            {"code": "DAX40", "name": "DAX 40"}
        ]
    },
    "Sweden": {
        "OMX Stockholm": [
            {"code": "OMXS30", "name": "OMX Stockholm 30"}
        ]
    }
}

# ---------- utilities (kept original names) ----------
def http_get_text(url: str, accept: str = "text/csv,*/*") -> str:
    """GET a URL with browser-like headers and return the decoded body."""
    sess = requests.Session()
    sess.headers.update({"User-Agent": UA, "Referer": REFERER, "Accept": accept})
    r = sess.get(url, timeout=30)
    r.raise_for_status()
    r.encoding = r.encoding or "utf-8"
    return r.text


def parse_nifty_csv(text: str) -> List[Dict[str, str]]:
    """Parse an NSE constituents CSV into {symbol, company} rows with a ".NS" suffix."""
    out: List[Dict[str, str]] = []
    rdr = csv.DictReader(io.StringIO(text))
    for row in rdr:
        sym = (row.get("Symbol") or "").strip()
        name = (row.get("Company Name") or "").strip()
        if sym and name:
            out.append({"symbol": f"{sym}.NS", "company": name})
    return out


def cache_path(code: str) -> Path:
    return CACHE_DIR / f"{code.lower()}.json"


def load_cache(code: str) -> Any | None:
    """Return the cached payload for `code`, or None if missing or older than TTL_SECONDS."""
    fp = cache_path(code)
    if not fp.exists():
        return None
    age = time.time() - fp.stat().st_mtime
    if age > TTL_SECONDS:
        return None
    with fp.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_cache(code: str, payload: Any) -> None:
    fp = cache_path(code)
    with fp.open("w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)


def _now_iso_utc() -> str:
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
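
# Usage sketch for the helpers above (illustrative only; assumes network access
# to niftyindices.com and a writable CACHE_DIR):
#
#   text = http_get_text(NIFTY_URLS["NIFTY50"])
#   rows = parse_nifty_csv(text)        # [{"symbol": "XYZ.NS", "company": "..."}, ...]
#   save_cache("NIFTY50", rows)         # writes a JSON file under CACHE_DIR
#   rows_again = load_cache("NIFTY50")  # same list until TTL_SECONDS (12h) elapses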

# ---- Wikipedia helpers ----
def _fetch_wiki_tables(url: str):
    """Fetch a Wikipedia page and return all of its HTML tables as DataFrames."""
    if not HAS_PANDAS:
        raise RuntimeError("pandas/lxml not installed. Run: pip install pandas lxml")
    html = http_get_text(url, accept="text/html,*/*")
    return pd.read_html(StringIO(html))


def _pick_table_and_columns(tables, ticker_candidates, company_candidates):
    """Find the first table containing both a ticker and a company column (case-insensitive)."""
    for t in tables:
        cols_map = {str(c).strip().lower(): c for c in t.columns}
        ticker_col = next((cols_map[c] for c in ticker_candidates if c in cols_map), None)
        company_col = next((cols_map[c] for c in company_candidates if c in cols_map), None)
        if ticker_col is not None and company_col is not None:
            return t, ticker_col, company_col
    raise RuntimeError(
        f"No suitable table found. Ticker in {ticker_candidates}, company in {company_candidates}."
    )


def _parse_wiki_constituents(
    url: str, ticker_candidates, company_candidates, suffix: str, upper_tickers: bool
) -> List[Dict[str, str]]:
    tables = _fetch_wiki_tables(url)
    df, t_col, c_col = _pick_table_and_columns(tables, ticker_candidates, company_candidates)
    rows: List[Dict[str, str]] = []
    for sym, name in zip(df[t_col], df[c_col]):
        s = str(sym).strip()
        n = str(name).strip()
        if not s or not n:
            continue
        if upper_tickers:
            s = s.upper()
        rows.append({"symbol": f"{s}{suffix}", "company": n})
    if not rows:
        raise RuntimeError("Parsed zero rows from Wikipedia table.")
    return rows


def _parse_nasdaq100():
    url = WIKI_PAGES["NASDAQ100"]
    rows = _parse_wiki_constituents(
        url,
        ticker_candidates=["ticker", "symbol"],
        company_candidates=["company", "name"],
        suffix="",
        upper_tickers=True,
    )
    return rows, "NASDAQ", "US", "USD", url


def _parse_dax40():
    url = WIKI_PAGES["DAX40"]
    rows = _parse_wiki_constituents(
        url,
        ticker_candidates=["ticker symbol", "ticker", "symbol"],
        company_candidates=["company", "name"],
        suffix=".DE",
        upper_tickers=True,
    )
    return rows, "XETRA", "DE", "EUR", url


def _parse_omxs30():
    url = WIKI_PAGES["OMXS30"]
    rows = _parse_wiki_constituents(
        url,
        ticker_candidates=["ticker", "symbol"],
        company_candidates=["company", "name"],
        suffix=".ST",
        upper_tickers=True,
    )
    return rows, "OMX Stockholm", "SE", "SEK", url


# ---------- public helpers ----------
def get_markets() -> Dict[str, Dict[str, List[Dict[str, str]]]]:
    """
    Return the filters structure for the UI.
    Does not mutate MARKETS; returns MARKETS + EXTRA_MARKETS merged.
    """
    merged: Dict[str, Dict[str, List[Dict[str, str]]]] = {}
    # copy MARKETS with fresh lists so the extends below never mutate the original
    for country, exchanges in MARKETS.items():
        merged[country] = {ex: refs[:] for ex, refs in exchanges.items()}
    # merge extras
    for country, exchanges in EXTRA_MARKETS.items():
        merged.setdefault(country, {})
        for ex, refs in exchanges.items():
            merged[country].setdefault(ex, [])
            merged[country][ex].extend(refs)
    return merged


def _all_supported_index_codes(markets: Dict[str, Dict[str, List[Dict[str, str]]]]) -> List[str]:
    codes: List[str] = []
    for _country, exchanges in markets.items():
        for _exch, refs in exchanges.items():
            for ref in refs:
                codes.append(ref["code"])
    return codes


def _index_display_name(code: str, markets: Dict[str, Dict[str, List[Dict[str, str]]]]) -> str:
    cu = code.upper()
    for _country, exchanges in markets.items():
        for _exch, refs in exchanges.items():
            for ref in refs:
                if ref["code"].upper() == cu:
                    return ref.get("name", cu)
    return cu
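
# Shape of the merged structure returned by get_markets() above (entries
# abridged; the actual contents come from MARKETS and EXTRA_MARKETS):
#
#   {
#       "India": {
#           "NSE (National Stock Exchange)": [
#               {"code": "NIFTY50", "name": "NIFTY 50"}, ...
#           ]
#       },
#       "United States": {"NASDAQ": [{"code": "NASDAQ100", "name": "NASDAQ-100"}]},
#       "Germany": {"XETRA (Deutsche Börse)": [{"code": "DAX40", "name": "DAX 40"}]},
#       "Sweden": {"OMX Stockholm": [{"code": "OMXS30", "name": "OMX Stockholm 30"}]},
#   }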

def search_companies(q: str, indices: Optional[List[str]] = None, limit: int = 50) -> List[Dict[str, Any]]:
    """
    Global search across supported indices (cached via build_companies_payload).
    Returns items: {symbol, company, indexCode, indexName, exchange, country}.
    """
    q_norm = (q or "").strip().lower()
    if not q_norm:
        return []
    markets = get_markets()
    index_codes = indices or _all_supported_index_codes(markets)
    results: List[Dict[str, Any]] = []
    for code in index_codes:
        try:
            payload = build_companies_payload(code)
        except Exception:
            continue
        idx_name = _index_display_name(code, markets)
        for row in payload.get("constituents", []):
            sym = str(row.get("symbol", "")).strip()
            com = str(row.get("company", "")).strip()
            if not sym or not com:
                continue
            if q_norm in sym.lower() or q_norm in com.lower():
                results.append({
                    "symbol": sym,
                    "company": com,
                    "indexCode": payload.get("code"),
                    "indexName": idx_name,
                    "exchange": payload.get("exchange"),
                    "country": payload.get("country"),
                })
                if len(results) >= limit:
                    break
        if len(results) >= limit:
            break

    def rank(item):
        # exact match first, then prefix match, then plain substring hits
        sym, com = item["symbol"].lower(), item["company"].lower()
        if sym == q_norm or com == q_norm:
            return 0
        if sym.startswith(q_norm) or com.startswith(q_norm):
            return 1
        return 2

    results.sort(key=rank)
    return results[:limit]


# ---------- core (unchanged name, extended) ----------
def build_companies_payload(code: str) -> Dict[str, Any]:
    """Return the constituents payload for an index code, from cache when fresh."""
    code = (code or "").upper().strip()
    if not code:
        raise ValueError("Index code is required.")
    cached = load_cache(code)
    if cached:
        return cached
    if code in NIFTY_URLS:
        url = NIFTY_URLS[code]
        text = http_get_text(url)
        rows = parse_nifty_csv(text)
        exchange, country, currency, source = "NSE", "IN", "INR", url
    elif code == "NASDAQ100":
        rows, exchange, country, currency, source = _parse_nasdaq100()
    elif code == "DAX40":
        rows, exchange, country, currency, source = _parse_dax40()
    elif code == "OMXS30":
        rows, exchange, country, currency, source = _parse_omxs30()
    else:
        raise ValueError(f"Unknown index code: {code}")
    payload = {
        "code": code,
        "exchange": exchange,
        "country": country,
        "currency": currency,
        "asOf": _now_iso_utc(),
        "count": len(rows),
        "constituents": rows,
        "source": source,
    }
    save_cache(code, payload)
    return payload
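
if __name__ == "__main__":
    # Minimal smoke test (illustrative only; requires network access and, for
    # the Wikipedia-backed indices, pandas + lxml). Fetches one index, prints
    # a few constituents, then runs a sample cross-index search.
    payload = build_companies_payload("NIFTY50")
    print(payload["code"], payload["exchange"], payload["count"], payload["asOf"])
    for row in payload["constituents"][:5]:
        print(f'  {row["symbol"]:<15} {row["company"]}')
    # Hits are ranked exact > prefix > substring by search_companies' rank().
    for hit in search_companies("bank", limit=5):
        print(f'{hit["symbol"]} - {hit["company"]} ({hit["indexName"]})')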