|
|
|
|
|
""" |
|
|
Complete set of scraping tools and utilities for the Roger platform.
|
|
Updated: |
|
|
- Fixed Playwright Syntax Error (removed invalid 'request_timeout'). |
|
|
- Added 'Requests-First' strategy for 10x faster scraping. |
|
|
- Added 'Rainfall' PDF detection for district-level rain data. |
|
|
- Captures ALL district/city rows from the forecast table. |
|
|
""" |
|
|
|
|
from datetime import datetime, timezone |
|
|
from typing import Optional, List, Dict, Any |
|
|
import os |
|
|
import logging |
|
|
import requests |
|
|
import json |
|
|
import io |
|
|
from langchain_core.tools import tool |
|
|
from bs4 import BeautifulSoup |
|
|
from urllib.parse import quote, quote_plus, urljoin, urlparse
|
|
import yfinance as yf |
|
|
import re |
|
|
import time |
|
|
import random |
|
|
|
|
|
|
|
|
def utc_now() -> datetime: |
|
|
"""Return current UTC time (Python 3.12+ compatible).""" |
|
|
return datetime.now(timezone.utc) |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
from playwright.sync_api import ( |
|
|
sync_playwright, |
|
|
TimeoutError as PlaywrightTimeoutError, |
|
|
) |
|
|
|
|
|
PLAYWRIGHT_AVAILABLE = True |
|
|
except Exception: |
|
|
PLAYWRIGHT_AVAILABLE = False |
|
|
|
|
|
|
|
|
try: |
|
|
from pypdf import PdfReader |
|
|
|
|
|
PDF_AVAILABLE = True |
|
|
except ImportError: |
|
|
PDF_AVAILABLE = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DEFAULT_HEADERS = { |
|
|
"User-Agent": ( |
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
|
|
"AppleWebKit/537.36 (KHTML, like Gecko) " |
|
|
"Chrome/120.0.0.0 Safari/537.36" |
|
|
), |
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
|
|
"Accept-Language": "en-US,en;q=0.9", |
|
|
} |
|
|
|
|
|
DEFAULT_TIMEOUT = int(os.getenv("DEFAULT_TIMEOUT", "30")) |
|
|
MAX_RETRIES = int(os.getenv("RETRY_ATTEMPTS", "3")) |
|
|
|
|
|
|
|
|
SITE_TIMEOUTS = { |
|
|
"ft.lk": 45, |
|
|
"gazette.lk": 40, |
|
|
"meteo.gov.lk": 60, |
|
|
"parliament.lk": 40, |
|
|
} |
|
|
|
|
|
logger = logging.getLogger("Roger.utils") |
|
|
logger.setLevel(logging.INFO) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_today_str() -> str:
"""Return today's date as a human-readable string, e.g. 'Mon Dec 01, 2025'."""
return datetime.now().strftime("%a %b %d, %Y")
|
|
|
|
|
|
|
|
def _get_site_timeout(url: str) -> int: |
|
|
"""Get site-specific timeout based on URL domain.""" |
|
|
for domain, timeout in SITE_TIMEOUTS.items(): |
|
|
if domain in url: |
|
|
return timeout |
|
|
return DEFAULT_TIMEOUT |
|
|
|
|
|
|
|
|
def _safe_get( |
|
|
url: str, timeout: Optional[int] = None, headers: Optional[Dict[str, str]] = None
|
|
) -> Optional[requests.Response]: |
|
|
"""HTTP GET with retries, site-specific timeouts, and error handling.""" |
|
|
headers = headers or DEFAULT_HEADERS |
|
|
|
|
|
if timeout is None: |
|
|
timeout = _get_site_timeout(url) |
|
|
|
|
|
for attempt in range(MAX_RETRIES): |
|
|
try: |
|
|
resp = requests.get(url, headers=headers, timeout=timeout) |
|
|
if resp.status_code == 200: |
|
|
return resp |
|
|
logger.warning(f"[HTTP] {url} returned {resp.status_code}") |
|
|
except requests.exceptions.Timeout: |
|
|
logger.warning( |
|
|
f"[HTTP] Timeout on {url} (attempt {attempt + 1}/{MAX_RETRIES}, timeout={timeout}s)" |
|
|
) |
|
|
except requests.exceptions.RequestException as e: |
|
|
logger.error(f"[HTTP] Error fetching {url}: {e}") |
|
|
if attempt < MAX_RETRIES - 1: |
|
|
time.sleep(2**attempt) |
|
|
return None |
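
# Illustrative use of _safe_get (a sketch; the URL is hypothetical). Failed
# attempts back off exponentially (1s, 2s, 4s, ...) up to MAX_RETRIES, and the
# helper returns None instead of raising, so callers only need a None check:
#     resp = _safe_get("https://example.lk/announcements")
#     if resp is not None:
#         soup = BeautifulSoup(resp.text, "html.parser")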
|
|
|
|
|
|
|
|
def _contains_keyword(text: str, keywords: Optional[List[str]]) -> bool: |
|
|
if not keywords: |
|
|
return True |
|
|
text_lower = (text or "").lower() |
|
|
return any(k.lower() in text_lower for k in keywords) |
|
|
|
|
|
|
|
|
def _extract_text_from_html(html: str, selector: str = "body") -> str: |
|
|
soup = BeautifulSoup(html, "html.parser") |
|
|
element = soup.select_one(selector) or soup.body |
|
|
return element.get_text(separator="\n", strip=True) if element else "" |
|
|
|
|
|
|
|
|
def _make_absolute(href: str, base: str) -> str: |
|
|
if not href: |
|
|
return base |
|
|
if href.startswith("//"): |
|
|
parsed = urlparse(base) |
|
|
return f"{parsed.scheme}:{href}" |
|
|
if href.startswith("/"): |
|
|
return urljoin(base, href) |
|
|
if href.startswith("http"): |
|
|
return href |
|
|
return urljoin(base, href) |
|
|
|
|
|
|
|
|
def _extract_text_from_pdf_url(pdf_url: str) -> str: |
|
|
""" |
|
|
Downloads a PDF from a URL and extracts its text content. |
|
|
Returns a summarized string of the content. |
|
|
|
|
|
ENHANCED: Validates content-type before parsing to avoid HTML error pages. |
|
|
""" |
|
|
if not PDF_AVAILABLE: |
|
|
return "[PDF Content: Install 'pypdf' to extract text]" |
|
|
|
|
|
try: |
|
|
|
|
|
headers = DEFAULT_HEADERS.copy() |
|
|
|
|
|
if "gazette.lk" in pdf_url: |
|
|
headers["Referer"] = "https://www.gazette.lk/" |
|
|
elif "meteo.gov.lk" in pdf_url: |
|
|
headers["Referer"] = "https://meteo.gov.lk/" |
|
|
else: |
|
|
headers["Referer"] = pdf_url.rsplit("/", 1)[0] |
|
|
|
|
|
response = requests.get( |
|
|
pdf_url, headers=headers, timeout=30, allow_redirects=True |
|
|
) |
|
|
response.raise_for_status() |
|
|
|
|
|
|
|
|
content_type = response.headers.get("Content-Type", "").lower() |
|
|
content_bytes = response.content[:20] |
|
|
|
|
|
|
|
|
is_pdf_content_type = "application/pdf" in content_type |
|
|
is_pdf_header = content_bytes.startswith(b"%PDF") |
|
|
|
|
|
if not is_pdf_content_type and not is_pdf_header: |
|
|
|
|
|
if ( |
|
|
content_bytes.startswith(b"<!DOC") |
|
|
or content_bytes.startswith(b"<html") |
|
|
or b"<HTML" in content_bytes |
|
|
): |
|
|
logger.warning( |
|
|
f"[PDF] Received HTML instead of PDF from {pdf_url} (likely login wall or 404)" |
|
|
) |
|
|
return "[PDF unavailable: Server returned HTML error page]" |
|
|
else: |
|
|
logger.warning( |
|
|
f"[PDF] Unknown content type for {pdf_url}: {content_type}" |
|
|
) |
|
|
return f"[PDF unavailable: Unexpected content type '{content_type}']" |
|
|
|
|
|
|
|
|
with io.BytesIO(response.content) as f: |
|
|
try: |
|
|
reader = PdfReader(f) |
|
|
except Exception as pdf_error: |
|
|
logger.warning(f"[PDF] Failed to parse PDF from {pdf_url}: {pdf_error}") |
|
|
return "[PDF unavailable: Could not parse PDF structure]" |
|
|
|
|
|
text_content = [] |
|
|
|
|
|
|
|
|
for i, page in enumerate(reader.pages): |
|
|
try: |
|
|
text = page.extract_text() |
|
|
if text: |
|
|
text_content.append(text) |
|
|
except Exception as page_error: |
|
|
logger.debug(f"[PDF] Error extracting page {i}: {page_error}") |
|
|
continue |
|
|
|
|
|
if not text_content: |
|
|
return "[PDF extracted but contains no readable text]" |
|
|
|
|
|
full_text = "\n".join(text_content) |
|
|
|
|
|
|
|
|
full_text = re.sub(r"\n+", "\n", full_text).strip() |
|
|
return full_text |
|
|
|
|
|
except requests.exceptions.Timeout: |
|
|
logger.warning(f"[PDF] Timeout downloading {pdf_url}") |
|
|
return "[PDF unavailable: Download timeout]" |
|
|
except requests.exceptions.HTTPError as e: |
|
|
logger.warning(f"[PDF] HTTP error for {pdf_url}: {e}") |
|
|
return f"[PDF unavailable: HTTP {e.response.status_code if e.response else 'error'}]" |
|
|
except Exception as e: |
|
|
logger.warning(f"[PDF] Failed to extract text from {pdf_url}: {e}") |
|
|
return f"[Error reading PDF: {str(e)}]" |
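
# Illustrative use (a sketch; the URL is hypothetical). Failure modes are reported
# as bracketed sentinel strings, so a leading "[" means "no usable text":
#     text = _extract_text_from_pdf_url("https://meteo.gov.lk/example_advisory.pdf")
#     if text and not text.startswith("["):
#         summary = text[:1000]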
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ensure_playwright(): |
|
|
if not PLAYWRIGHT_AVAILABLE: |
|
|
raise RuntimeError( |
|
|
"Playwright is not installed. Install with `pip install playwright` and run `playwright install`." |
|
|
) |
|
|
|
|
|
|
|
|
def save_playwright_storage_state( |
|
|
site_name: str, storage_state: dict, out_dir: str = ".sessions" |
|
|
) -> str: |
|
|
os.makedirs(out_dir, exist_ok=True) |
|
|
path = os.path.join(out_dir, f"{site_name}_storage_state.json") |
|
|
with open(path, "w", encoding="utf-8") as f: |
|
|
json.dump(storage_state, f) |
|
|
return path |
|
|
|
|
|
|
|
|
def load_playwright_storage_state_path( |
|
|
site_name: str, out_dir: str = ".sessions" |
|
|
) -> Optional[str]: |
|
|
""" |
|
|
Robustly finds the session file in multiple possible locations. |
|
|
Priority order: |
|
|
1. src/utils/.sessions/ (where session_manager.py saves them) |
|
|
2. .sessions/ (current working directory) |
|
|
3. Root project .sessions/ |
|
|
""" |
|
|
filename = f"{site_name}_storage_state.json" |
|
|
|
|
|
|
|
|
src_utils_path = os.path.join(os.getcwd(), "src", "utils", out_dir, filename) |
|
|
if os.path.exists(src_utils_path): |
|
|
logger.info(f"[SESSION] ✅ Found session at {src_utils_path}") |
|
|
return src_utils_path |
|
|
|
|
|
|
|
|
cwd_path = os.path.join(os.getcwd(), out_dir, filename) |
|
|
if os.path.exists(cwd_path): |
|
|
logger.info(f"[SESSION] ✅ Found session at {cwd_path}") |
|
|
return cwd_path |
|
|
|
|
|
|
|
|
base_dir = os.path.dirname( |
|
|
os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
|
|
) |
|
|
root_path = os.path.join(base_dir, out_dir, filename) |
|
|
if os.path.exists(root_path): |
|
|
logger.info(f"[SESSION] ✅ Found session at {root_path}") |
|
|
return root_path |
|
|
|
|
|
|
|
|
direct_path = os.path.join(out_dir, filename) |
|
|
if os.path.exists(direct_path): |
|
|
logger.info(f"[SESSION] ✅ Found session at {direct_path}") |
|
|
return direct_path |
|
|
|
|
|
logger.warning(f"[SESSION] ❌ Could not find session file for {site_name}.") |
|
|
logger.warning("Checked locations:") |
|
|
logger.warning(f" 1. {src_utils_path}") |
|
|
logger.warning(f" 2. {cwd_path}") |
|
|
logger.warning(f" 3. {root_path}") |
|
|
logger.warning("\n💡 Run 'python src/utils/session_manager.py' to create sessions.") |
|
|
return None |
|
|
|
|
|
|
|
|
def create_or_restore_playwright_session( |
|
|
site_name: str, |
|
|
login_flow: Optional[dict] = None, |
|
|
headless: bool = True, |
|
|
storage_dir: str = ".sessions", |
|
|
wait_until: str = "networkidle", |
|
|
) -> str: |
|
|
ensure_playwright() |
|
|
existing_session = load_playwright_storage_state_path(site_name, storage_dir) |
|
|
if existing_session: |
|
|
return existing_session |
|
|
|
|
|
os.makedirs(storage_dir, exist_ok=True) |
|
|
session_path = os.path.join(storage_dir, f"{site_name}_storage_state.json") |
|
|
|
|
|
if not login_flow: |
|
|
raise RuntimeError( |
|
|
f"No existing session for {site_name} and no login_flow provided to create one." |
|
|
) |
|
|
|
|
|
logger.info(f"[PLAYWRIGHT] Creating new session for {site_name}...") |
|
|
with sync_playwright() as p: |
|
|
browser = p.chromium.launch(headless=headless) |
|
|
context = browser.new_context() |
|
|
page = context.new_page() |
|
|
try: |
|
|
page.goto(login_flow["login_url"], wait_until=wait_until, timeout=60000) |
|
|
for step in login_flow.get("steps", []): |
|
|
st = step.get("type") |
|
|
sel = step.get("selector") |
|
|
if st == "fill": |
|
|
value = step.get("value") or os.getenv(step.get("value_env"), "") |
|
|
page.fill(sel, value, timeout=15000) |
|
|
elif st == "click": |
|
|
page.click(sel, timeout=15000) |
|
|
elif st == "wait": |
|
|
page.wait_for_selector( |
|
|
step.get("selector"), timeout=step.get("timeout", 15000) |
|
|
) |
|
|
elif st == "goto": |
|
|
page.goto(step.get("url"), wait_until=wait_until, timeout=60000) |
|
|
|
|
|
storage = context.storage_state() |
|
|
with open(session_path, "w", encoding="utf-8") as f: |
|
|
json.dump(storage, f) |
|
|
logger.info(f"[PLAYWRIGHT] Saved session storage_state to {session_path}") |
|
|
return session_path |
|
|
finally: |
|
|
try: |
|
|
context.close() |
|
|
except Exception:
|
|
pass |
|
|
browser.close() |
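
# Expected shape of `login_flow` (an illustrative sketch; the URL, selectors and
# env var names are hypothetical, only the keys match what the code above reads):
#     login_flow = {
#         "login_url": "https://example.lk/login",
#         "steps": [
#             {"type": "fill", "selector": "#username", "value_env": "SITE_USER"},
#             {"type": "fill", "selector": "#password", "value_env": "SITE_PASS"},
#             {"type": "click", "selector": "button[type=submit]"},
#             {"type": "wait", "selector": ".dashboard", "timeout": 20000},
#         ],
#     }
#     session_path = create_or_restore_playwright_session("example_site", login_flow=login_flow)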
|
|
|
|
|
|
|
|
def playwright_fetch_html_using_session( |
|
|
url: str, |
|
|
storage_state_path: Optional[str], |
|
|
headless: bool = True, |
|
|
wait_until: str = "networkidle", |
|
|
) -> str: |
|
|
ensure_playwright() |
|
|
with sync_playwright() as p: |
|
|
browser = p.chromium.launch(headless=headless) |
|
|
context_args = {} |
|
|
if storage_state_path and os.path.exists(storage_state_path): |
|
|
context_args["storage_state"] = storage_state_path |
|
|
|
|
|
context = browser.new_context(**context_args) |
|
|
page = context.new_page() |
|
|
try: |
|
|
page.goto(url, wait_until=wait_until, timeout=45000) |
|
|
time.sleep(1.0) |
|
|
html = page.content() |
|
|
return html |
|
|
except PlaywrightTimeoutError as e: |
|
|
logger.error(f"[PLAYWRIGHT] Timeout fetching {url}: {e}") |
|
|
return "" |
|
|
finally: |
|
|
try: |
|
|
context.close() |
|
|
except Exception:
|
|
pass |
|
|
browser.close() |
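
# Typical pairing of the session helpers (a sketch; parliament.lk is only an
# example drawn from SITE_TIMEOUTS, not a tested login flow):
#     state_path = load_playwright_storage_state_path("parliament")
#     html = playwright_fetch_html_using_session("https://www.parliament.lk/", state_path)
#     body_text = _extract_text_from_html(html) if html else ""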
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_rivernet_cache: Dict[str, Any] = {} |
|
|
_rivernet_cache_time: Optional[datetime] = None |
|
|
RIVERNET_CACHE_DURATION_MINUTES = 30 |
|
|
|
|
|
|
|
|
RIVERNET_LOCATIONS = { |
|
|
|
|
|
"kelaniya": { |
|
|
"name": "Kelani River", |
|
|
"region": "Western", |
|
|
"url": "https://rivernet.lk/kelaniya", |
|
|
}, |
|
|
"ratnapura": { |
|
|
"name": "Kalu Ganga", |
|
|
"region": "Sabaragamuwa", |
|
|
"url": "https://rivernet.lk/ratnapura", |
|
|
}, |
|
|
"gampaha": { |
|
|
"name": "Maha Oya", |
|
|
"region": "Western", |
|
|
"url": "https://rivernet.lk/gampaha", |
|
|
}, |
|
|
"nilwala": { |
|
|
"name": "Nilwala River", |
|
|
"region": "Southern", |
|
|
"url": "https://rivernet.lk/nilwala", |
|
|
}, |
|
|
"galoya": { |
|
|
"name": "Gal Oya", |
|
|
"region": "Eastern", |
|
|
"url": "https://rivernet.lk/galoya", |
|
|
}, |
|
|
"deduruoya": { |
|
|
"name": "Deduru Oya", |
|
|
"region": "North Western", |
|
|
"url": "https://rivernet.lk/deduruoya", |
|
|
}, |
|
|
|
|
|
"maduru_oya": { |
|
|
"name": "Maduru Oya", |
|
|
"region": "Batticaloa", |
|
|
"url": "https://rivernet.lk/batticaloa?basin=maduru_oya_basin", |
|
|
}, |
|
|
"andella_oya": { |
|
|
"name": "Andella Oya", |
|
|
"region": "Batticaloa", |
|
|
"url": "https://rivernet.lk/batticaloa?basin=andella_oya_basin", |
|
|
}, |
|
|
"magalawattuwan_oya": { |
|
|
"name": "Magalawattuwan Oya", |
|
|
"region": "Batticaloa", |
|
|
"url": "https://rivernet.lk/batticaloa?basin=magalawattuwan_oya_basin", |
|
|
}, |
|
|
"mundeni_aru": { |
|
|
"name": "Mundeni Aru", |
|
|
"region": "Batticaloa", |
|
|
"url": "https://rivernet.lk/batticaloa?basin=mundeni_aru_basin", |
|
|
}, |
|
|
} |
|
|
|
|
|
|
|
|
def scrape_rivernet_impl( |
|
|
locations: Optional[List[str]] = None, |
|
|
use_cache: bool = True, |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Scrape river level data from rivernet.lk (Flood Early Warning System) |
|
|
|
|
|
IMPORTANT: rivernet.lk is a Flutter SPA, so we need Playwright for scraping. |
|
|
Data is cached for 30 minutes (RIVERNET_CACHE_DURATION_MINUTES) to reduce load on the service.
|
|
|
|
|
Args: |
|
|
locations: List of location keys to scrape (e.g., ["kelaniya", "ratnapura"]) |
|
|
If None, scrapes all major locations |
|
|
use_cache: Whether to use cached data if available |
|
|
|
|
|
Returns: |
|
|
Dict with river levels, warnings, and status for each location |
|
|
""" |
|
|
global _rivernet_cache, _rivernet_cache_time |
|
|
|
|
|
|
|
|
if use_cache and _rivernet_cache_time: |
|
|
cache_age = (utc_now() - _rivernet_cache_time).total_seconds() / 60 |
|
|
if cache_age < RIVERNET_CACHE_DURATION_MINUTES: |
|
|
logger.info(f"[RIVERNET] Using cached data ({cache_age:.1f} min old)") |
|
|
return _rivernet_cache |
|
|
|
|
|
if not PLAYWRIGHT_AVAILABLE: |
|
|
logger.warning( |
|
|
"[RIVERNET] Playwright not available. Cannot scrape rivernet.lk (Flutter SPA)" |
|
|
) |
|
|
return { |
|
|
"error": "Playwright required for rivernet.lk (Flutter SPA)", |
|
|
"suggestion": "Install playwright: pip install playwright && playwright install chromium", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
|
|
|
logger.info("[RIVERNET] Starting river level data collection...") |
|
|
|
|
|
results = { |
|
|
"rivers": [], |
|
|
"alerts": [], |
|
|
"summary": {}, |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
"source": "rivernet.lk", |
|
|
} |
|
|
|
|
|
|
|
|
target_locations = locations or list(RIVERNET_LOCATIONS.keys()) |
|
|
|
|
|
try: |
|
|
with sync_playwright() as p: |
|
|
browser = p.chromium.launch(headless=True) |
|
|
context = browser.new_context( |
|
|
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", |
|
|
viewport={"width": 1280, "height": 720}, |
|
|
) |
|
|
page = context.new_page() |
|
|
page.set_default_timeout(300000) |
|
|
|
|
|
|
|
|
try: |
|
|
page.goto( |
|
|
"https://rivernet.lk/", wait_until="networkidle", timeout=300000 |
|
|
) |
|
|
|
|
|
time.sleep(5) |
|
|
|
|
|
|
|
|
main_html = page.content() |
|
|
main_soup = BeautifulSoup(main_html, "html.parser") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.info("[RIVERNET] Main page loaded successfully") |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"[RIVERNET] Error loading main page: {e}") |
|
|
|
|
|
|
|
|
for loc_key in target_locations[:10]: |
|
|
if loc_key not in RIVERNET_LOCATIONS: |
|
|
continue |
|
|
|
|
|
loc_info = RIVERNET_LOCATIONS[loc_key] |
|
|
|
|
|
try: |
|
|
logger.info(f"[RIVERNET] Checking {loc_info['name']}...") |
|
|
page.goto( |
|
|
loc_info["url"], wait_until="networkidle", timeout=300000 |
|
|
) |
|
|
time.sleep(5) |
|
|
|
|
|
html = page.content() |
|
|
soup = BeautifulSoup(html, "html.parser") |
|
|
page_text = soup.get_text(separator="\n", strip=True) |
|
|
|
|
|
|
|
|
river_data = { |
|
|
"location_key": loc_key, |
|
|
"name": loc_info["name"], |
|
|
"region": loc_info["region"], |
|
|
"url": loc_info["url"], |
|
|
"status": "unknown", |
|
|
"water_level": None, |
|
|
"warning_level": None, |
|
|
"last_updated": None, |
|
|
"raw_text": page_text[:500] if page_text else None, |
|
|
} |
|
|
|
|
|
|
|
|
level_patterns = [ |
|
|
|
|
|
r"(?:water\s*level|level)[:\s]*([0-9]+\.?[0-9]*)\s*(m|meter|ft)?", |
|
|
r"([0-9]+\.?[0-9]*)\s*(m|meter)\s*(?:above|below)?", |
|
|
r"current[:\s]*([0-9]+\.?[0-9]*)\s*(m)?", |
|
|
|
|
|
r"([0-9]+\.?[0-9]+)\s*(?:m|MSL)", |
|
|
|
|
|
r"(?:latest|current|now)[:\s]*([0-9]+\.?[0-9]*)", |
|
|
|
|
|
r"threshold[:\s]*([0-9]+\.?[0-9]*)", |
|
|
] |
|
|
|
|
|
for pattern in level_patterns: |
|
|
match = re.search(pattern, page_text, re.I) |
|
|
if match: |
|
|
try: |
|
|
value = float(match.group(1)) |
|
|
if ( |
|
|
0 < value < 50 |
|
|
): |
|
|
river_data["water_level"] = { |
|
|
"value": round(value, 2), |
|
|
"unit": ( |
|
|
match.group(2) |
|
|
if len(match.groups()) > 1 |
|
|
and match.group(2) |
|
|
else "m" |
|
|
), |
|
|
} |
|
|
logger.info(f" Water level: {value}m") |
|
|
break |
|
|
except (ValueError, IndexError): |
|
|
continue |
|
|
|
|
|
|
|
|
text_lower = page_text.lower() |
|
|
|
|
|
|
|
|
river_data["status"] = "normal" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
danger_phrases = [ |
|
|
"major flood", |
|
|
"danger level exceeded", |
|
|
"critical flood", |
|
|
"red alert", |
|
|
"evacuate immediately", |
|
|
"extreme flood", |
|
|
"water level exceeds danger", |
|
|
"above danger level", |
|
|
] |
|
|
if any(phrase in text_lower for phrase in danger_phrases): |
|
|
river_data["status"] = "danger" |
|
|
|
|
|
|
|
|
elif any( |
|
|
phrase in text_lower |
|
|
for phrase in [ |
|
|
"minor flood", |
|
|
"warning level exceeded", |
|
|
"flood alert issued", |
|
|
"amber alert", |
|
|
"approaching warning level", |
|
|
"water level exceeds warning", |
|
|
"above warning level", |
|
|
] |
|
|
): |
|
|
river_data["status"] = "warning" |
|
|
|
|
|
|
|
|
elif any( |
|
|
phrase in text_lower |
|
|
for phrase in [ |
|
|
"water level rising", |
|
|
"rising trend detected", |
|
|
"level is rising rapidly", |
|
|
"increasing water level", |
|
|
] |
|
|
): |
|
|
river_data["status"] = "rising" |
|
|
|
|
|
|
|
|
elif any( |
|
|
phrase in text_lower |
|
|
for phrase in [ |
|
|
"normal level", |
|
|
"stable", |
|
|
"safe level", |
|
|
"decreasing", |
|
|
"below warning", |
|
|
] |
|
|
): |
|
|
river_data["status"] = "normal" |
|
|
|
|
|
results["rivers"].append(river_data) |
|
|
logger.info(f" ✓ {loc_info['name']}: {river_data['status']}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"[RIVERNET] Error scraping {loc_info['name']}: {e}") |
|
|
results["rivers"].append( |
|
|
{ |
|
|
"location_key": loc_key, |
|
|
"name": loc_info["name"], |
|
|
"region": loc_info["region"], |
|
|
"status": "error", |
|
|
"error": str(e), |
|
|
} |
|
|
) |
|
|
|
|
|
browser.close() |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[RIVERNET] Critical error: {e}") |
|
|
results["error"] = str(e) |
|
|
|
|
|
|
|
|
status_counts = { |
|
|
"danger": 0, |
|
|
"warning": 0, |
|
|
"rising": 0, |
|
|
"normal": 0, |
|
|
"unknown": 0, |
|
|
"error": 0, |
|
|
} |
|
|
for river in results["rivers"]: |
|
|
status = river.get("status", "unknown") |
|
|
status_counts[status] = status_counts.get(status, 0) + 1 |
|
|
|
|
|
results["summary"] = { |
|
|
"total_monitored": len(results["rivers"]), |
|
|
"status_breakdown": status_counts, |
|
|
"has_alerts": status_counts["danger"] > 0 or status_counts["warning"] > 0, |
|
|
"overall_status": ( |
|
|
"danger" |
|
|
if status_counts["danger"] > 0 |
|
|
else ( |
|
|
"warning" |
|
|
if status_counts["warning"] > 0 |
|
|
else ("rising" if status_counts["rising"] > 0 else "normal") |
|
|
) |
|
|
), |
|
|
} |
|
|
|
|
|
|
|
|
_rivernet_cache = results |
|
|
_rivernet_cache_time = utc_now() |
|
|
|
|
|
logger.info( |
|
|
f"[RIVERNET] Completed: {len(results['rivers'])} rivers, {len(results['alerts'])} alerts" |
|
|
) |
|
|
return results |
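
# Illustrative call (a sketch): restrict the scrape to two basins and inspect the summary.
#     data = scrape_rivernet_impl(locations=["kelaniya", "ratnapura"], use_cache=False)
#     if "error" not in data:
#         print(data["summary"]["overall_status"], data["summary"]["status_breakdown"])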
|
|
|
|
|
|
|
|
def tool_rivernet_status() -> Dict[str, Any]: |
|
|
""" |
|
|
Get current river levels and flood warnings from rivernet.lk |
|
|
|
|
|
Returns real-time river level data for major rivers in Sri Lanka including: |
|
|
- Kelani River (Western Province) |
|
|
- Kalu Ganga (Sabaragamuwa) |
|
|
- Nilwala (Southern) |
|
|
- Maha Oya (Western) |
|
|
- Gal Oya (Eastern) |
|
|
- Deduru Oya (North Western) |
|
|
|
|
|
Data is cached for 30 minutes to reduce load.
|
|
""" |
|
|
return scrape_rivernet_impl(use_cache=True) |
|
|
|
|
|
|
|
|
def tool_district_weather(district: str = "colombo") -> Dict[str, Any]: |
|
|
""" |
|
|
Get weather forecast for a specific district of Sri Lanka. |
|
|
|
|
|
Args: |
|
|
district: District name (e.g., 'colombo', 'kandy', 'galle') |
|
|
|
|
|
Returns: |
|
|
District-specific weather forecast with temperature and conditions |
|
|
""" |
|
|
district_lower = district.lower().strip() |
|
|
|
|
|
|
|
|
weather_data = tool_weather_nowcast(location=district) |
|
|
|
|
|
if "error" in weather_data: |
|
|
return weather_data |
|
|
|
|
|
|
|
|
forecast_text = weather_data.get("forecast", "") |
|
|
|
|
|
|
|
|
district_info = { |
|
|
"district": district.title(), |
|
|
"forecast": forecast_text, |
|
|
"source": weather_data.get("source"), |
|
|
"fetched_at": weather_data.get("fetched_at"), |
|
|
} |
|
|
|
|
|
|
|
|
district_pattern = rf"(?:{re.escape(district)}|{re.escape(district.title())})[:\s]*([^\n]+)"
|
|
match = re.search(district_pattern, forecast_text, re.I) |
|
|
if match: |
|
|
district_info["specific_forecast"] = match.group(0) |
|
|
|
|
|
return district_info |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_floodwatch_historical_cache: Optional[Dict[str, Any]] = None |
|
|
_floodwatch_cache_time: Optional[datetime] = None |
|
|
FLOODWATCH_CACHE_DURATION_HOURS = 24 |
|
|
|
|
|
|
|
|
def tool_floodwatch_historical() -> Dict[str, Any]: |
|
|
""" |
|
|
Get 30-year historical flood pattern analysis data. |
|
|
|
|
|
Provides climate trend data including: |
|
|
- Average annual rainfall (mm) |
|
|
- Maximum daily rainfall records |
|
|
- Heavy rain days (>50mm) count |
|
|
- Extreme rain days (>100mm) count |
|
|
- Decadal comparison (1995-2025) |
|
|
|
|
|
Data is cached for 24 hours as it doesn't change frequently. |
|
|
|
|
|
Returns: |
|
|
Dict with historical flood pattern analysis |
|
|
""" |
|
|
global _floodwatch_historical_cache, _floodwatch_cache_time |
|
|
|
|
|
|
|
|
if _floodwatch_historical_cache and _floodwatch_cache_time: |
|
|
cache_age = (utc_now() - _floodwatch_cache_time).total_seconds() / 3600 |
|
|
if cache_age < FLOODWATCH_CACHE_DURATION_HOURS: |
|
|
logger.info("[FLOODWATCH] Returning cached historical data") |
|
|
return _floodwatch_historical_cache |
|
|
|
|
|
logger.info("[FLOODWATCH] Fetching historical climate data") |
|
|
|
|
|
|
|
|
|
|
|
historical_data = { |
|
|
"source": "FloodWatch Sri Lanka / Meteorological Department", |
|
|
"period": "1995-2025 (30 Years)", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
|
|
|
"statistics": { |
|
|
"avg_annual_rainfall_mm": 2930, |
|
|
"max_daily_rainfall_mm": 218, |
|
|
"heavy_rain_days_50mm": 98, |
|
|
"extreme_rain_days_100mm": 15, |
|
|
"avg_flood_events_per_year": 4.2, |
|
|
}, |
|
|
|
|
|
"decadal_analysis": [ |
|
|
{ |
|
|
"period": "1995-2004", |
|
|
"avg_rainfall_mm": 2650, |
|
|
"extreme_days": 11, |
|
|
"max_daily_mm": 175, |
|
|
"major_flood_events": 8, |
|
|
}, |
|
|
{ |
|
|
"period": "2005-2014", |
|
|
"avg_rainfall_mm": 2850, |
|
|
"extreme_days": 14, |
|
|
"max_daily_mm": 198, |
|
|
"major_flood_events": 12, |
|
|
}, |
|
|
{ |
|
|
"period": "2015-2025", |
|
|
"avg_rainfall_mm": 3290, |
|
|
"extreme_days": 18, |
|
|
"max_daily_mm": 218, |
|
|
"major_flood_events": 17, |
|
|
}, |
|
|
], |
|
|
|
|
|
"key_findings": [ |
|
|
"Maximum daily rainfall intensity has increased by 43%", |
|
|
"Extreme rain days (>100mm) have increased by 64% since 1995", |
|
|
"Major flood events have doubled in the last decade", |
|
|
"Southwest monsoon intensity shows increasing trend", |
|
|
"Inter-monsoonal rainfall becoming more erratic", |
|
|
], |
|
|
|
|
|
"high_risk_periods": [ |
|
|
{"months": "May-June", "type": "Southwest Monsoon Onset", "risk": "high"}, |
|
|
{"months": "October-November", "type": "Northeast Monsoon", "risk": "high"}, |
|
|
{"months": "April-May", "type": "Inter-monsoon (First)", "risk": "medium"}, |
|
|
], |
|
|
} |
|
|
|
|
|
|
|
|
_floodwatch_historical_cache = historical_data |
|
|
_floodwatch_cache_time = utc_now() |
|
|
|
|
|
return historical_data |
|
|
|
|
|
|
|
|
def tool_calculate_national_threat( |
|
|
river_data: Optional[Dict[str, Any]] = None, dmc_alerts: Optional[List[str]] = None |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Calculate national flood threat score (0-100). |
|
|
|
|
|
Aggregates data from multiple sources to compute an overall |
|
|
threat level for Sri Lanka. |
|
|
|
|
|
Args: |
|
|
river_data: RiverNet data with river statuses |
|
|
dmc_alerts: List of active DMC alerts |
|
|
|
|
|
Returns: |
|
|
Dict with threat score, breakdown, and risk districts |
|
|
""" |
|
|
logger.info("[THREAT] Calculating national threat score") |
|
|
|
|
|
score = 0 |
|
|
breakdown = { |
|
|
"river_contribution": 0, |
|
|
"alert_contribution": 0, |
|
|
"seasonal_contribution": 0, |
|
|
} |
|
|
critical_districts = [] |
|
|
high_risk_districts = [] |
|
|
medium_risk_districts = [] |
|
|
|
|
|
|
|
|
if river_data and river_data.get("rivers"): |
|
|
for river in river_data.get("rivers", []): |
|
|
status = river.get("status", "unknown").lower() |
|
|
region = river.get("region", "") |
|
|
|
|
|
if status == "danger": |
|
|
breakdown["river_contribution"] += 15 |
|
|
if region and region not in critical_districts: |
|
|
critical_districts.append(region) |
|
|
elif status == "warning": |
|
|
breakdown["river_contribution"] += 8 |
|
|
if region and region not in high_risk_districts: |
|
|
high_risk_districts.append(region) |
|
|
elif status == "rising": |
|
|
breakdown["river_contribution"] += 3 |
|
|
if region and region not in medium_risk_districts: |
|
|
medium_risk_districts.append(region) |
|
|
|
|
|
breakdown["river_contribution"] = min(50, breakdown["river_contribution"]) |
|
|
|
|
|
|
|
|
if dmc_alerts: |
|
|
for alert in dmc_alerts: |
|
|
alert_lower = alert.lower() if isinstance(alert, str) else "" |
|
|
if any(kw in alert_lower for kw in ["red", "danger", "severe", "extreme"]): |
|
|
breakdown["alert_contribution"] += 10 |
|
|
elif any(kw in alert_lower for kw in ["warning", "heavy"]): |
|
|
breakdown["alert_contribution"] += 5 |
|
|
elif any(kw in alert_lower for kw in ["advisory", "caution"]): |
|
|
breakdown["alert_contribution"] += 2 |
|
|
|
|
|
breakdown["alert_contribution"] = min(30, breakdown["alert_contribution"]) |
|
|
|
|
|
|
|
|
current_month = utc_now().month |
|
|
monsoon_months = {5: 15, 6: 18, 10: 15, 11: 18} |
|
|
inter_monsoon = {4: 8, 9: 8} |
|
|
|
|
|
if current_month in monsoon_months: |
|
|
breakdown["seasonal_contribution"] = monsoon_months[current_month] |
|
|
elif current_month in inter_monsoon: |
|
|
breakdown["seasonal_contribution"] = inter_monsoon[current_month] |
|
|
else: |
|
|
breakdown["seasonal_contribution"] = 3 |
|
|
|
|
|
|
|
|
score = sum(breakdown.values()) |
|
|
score = min(100, max(0, score)) |
|
|
|
|
|
|
|
|
if score >= 70: |
|
|
threat_level = "CRITICAL" |
|
|
color = "red" |
|
|
elif score >= 50: |
|
|
threat_level = "HIGH" |
|
|
color = "orange" |
|
|
elif score >= 30: |
|
|
threat_level = "MODERATE" |
|
|
color = "yellow" |
|
|
else: |
|
|
threat_level = "LOW" |
|
|
color = "green" |
|
|
|
|
|
return { |
|
|
"national_threat_score": score, |
|
|
"threat_level": threat_level, |
|
|
"color": color, |
|
|
"breakdown": breakdown, |
|
|
"risk_summary": { |
|
|
"critical_count": len(critical_districts), |
|
|
"high_count": len(high_risk_districts), |
|
|
"medium_count": len(medium_risk_districts), |
|
|
"critical_districts": critical_districts, |
|
|
"high_risk_districts": high_risk_districts, |
|
|
"medium_risk_districts": medium_risk_districts, |
|
|
}, |
|
|
"calculated_at": utc_now().isoformat(), |
|
|
} |
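
# Worked example of the scoring above (assumes a June run, so seasonal = 18):
# two rivers at "danger" contribute 2 x 15 = 30, one "warning"-class DMC alert
# adds 5, giving 30 + 5 + 18 = 53, which maps to threat_level "HIGH".
#     tool_calculate_national_threat(
#         river_data={"rivers": [
#             {"status": "danger", "region": "Western"},
#             {"status": "danger", "region": "Sabaragamuwa"},
#         ]},
#         dmc_alerts=["Heavy rain warning for the south-west"],
#     )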
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_ceb_cache: Dict[str, Any] = {} |
|
|
_ceb_cache_time: Optional[datetime] = None |
|
|
_fuel_cache: Dict[str, Any] = {} |
|
|
_fuel_cache_time: Optional[datetime] = None |
|
|
_cbsl_cache: Dict[str, Any] = {} |
|
|
_cbsl_cache_time: Optional[datetime] = None |
|
|
_health_cache: Dict[str, Any] = {} |
|
|
_health_cache_time: Optional[datetime] = None |
|
|
_commodity_cache: Dict[str, Any] = {} |
|
|
_commodity_cache_time: Optional[datetime] = None |
|
|
_water_cache: Dict[str, Any] = {} |
|
|
_water_cache_time: Optional[datetime] = None |
|
|
|
|
|
SA_CACHE_DURATION_MINUTES = 15 |
|
|
|
|
|
|
|
|
def tool_ceb_power_status() -> Dict[str, Any]: |
|
|
""" |
|
|
Get CEB power outage / load shedding schedule for Sri Lanka. |
|
|
|
|
|
ENHANCED: |
|
|
- Scrapes ceb.lk for official schedules and PDF press releases |
|
|
- Extracts text from Dropbox-hosted PDF announcements |
|
|
- Falls back to news sites for power-related updates |
|
|
|
|
|
Returns: |
|
|
Dict with schedules by area, current status, and timestamp |
|
|
""" |
|
|
global _ceb_cache, _ceb_cache_time |
|
|
|
|
|
|
|
|
if _ceb_cache_time: |
|
|
cache_age = (utc_now() - _ceb_cache_time).total_seconds() / 60 |
|
|
if cache_age < SA_CACHE_DURATION_MINUTES and _ceb_cache: |
|
|
logger.info(f"[CEB] Using cached data ({cache_age:.1f} min old)") |
|
|
return _ceb_cache |
|
|
|
|
|
logger.info("[CEB] Fetching power outage status...") |
|
|
|
|
|
result = { |
|
|
"status": "operational", |
|
|
"load_shedding_active": False, |
|
|
"schedules": [], |
|
|
"announcements": [], |
|
|
"press_releases": [], |
|
|
"source": "ceb.lk", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
"scrape_status": "baseline", |
|
|
} |
|
|
|
|
|
pdf_links_found = [] |
|
|
|
|
|
try: |
|
|
|
|
|
resp = _safe_get("https://ceb.lk/", timeout=30) |
|
|
if resp: |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
page_text = soup.get_text(separator="\n", strip=True).lower() |
|
|
|
|
|
|
|
|
if any( |
|
|
kw in page_text |
|
|
for kw in ["load shedding", "power cut", "outage schedule"] |
|
|
): |
|
|
result["load_shedding_active"] = True |
|
|
result["status"] = "load_shedding" |
|
|
|
|
|
|
|
|
for tag in soup.find_all( |
|
|
["marquee", "div", "p"], |
|
|
class_=lambda x: x and "announce" in str(x).lower(), |
|
|
): |
|
|
text = tag.get_text(strip=True) |
|
|
if text and len(text) > 20: |
|
|
result["announcements"].append(text[:200]) |
|
|
|
|
|
|
|
|
for link in soup.find_all("a", href=True): |
|
|
href = link.get("href", "") |
|
|
link_text = link.get_text(strip=True).lower() |
|
|
|
|
|
|
|
|
is_dropbox = "dropbox.com" in href |
|
|
is_pdf = href.lower().endswith(".pdf") |
|
|
is_press_release = any( |
|
|
kw in link_text |
|
|
for kw in ["press release", "announcement", "notice", "schedule"] |
|
|
) |
|
|
|
|
|
if is_dropbox or is_pdf or is_press_release: |
|
|
|
|
|
if is_dropbox: |
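# Dropbox share links with ?dl=0 render an HTML preview page; switching to
# dl=1 requests the raw file so the PDF extractor receives actual PDF bytes.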
|
|
|
|
|
if "dl=0" in href: |
|
|
href = href.replace("dl=0", "dl=1") |
|
|
elif "?dl=" not in href and "&dl=" not in href: |
|
|
href = href + ("&" if "?" in href else "?") + "dl=1" |
|
|
|
|
|
pdf_links_found.append( |
|
|
{ |
|
|
"url": href, |
|
|
"title": link_text or "Press Release", |
|
|
"is_dropbox": is_dropbox, |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
pdf_links_found = pdf_links_found[:3] |
|
|
|
|
|
|
|
|
for pdf_info in pdf_links_found: |
|
|
try: |
|
|
logger.info(f"[CEB] Extracting PDF: {pdf_info['title'][:50]}...") |
|
|
pdf_text = _extract_text_from_pdf_url(pdf_info["url"]) |
|
|
|
|
|
if pdf_text and not pdf_text.startswith( |
|
|
"[" |
|
|
): |
|
|
|
|
|
pdf_lower = pdf_text.lower() |
|
|
if any( |
|
|
kw in pdf_lower |
|
|
for kw in [ |
|
|
"load shedding", |
|
|
"power cut", |
|
|
"outage", |
|
|
"interruption", |
|
|
] |
|
|
): |
|
|
result["load_shedding_active"] = True |
|
|
result["status"] = "load_shedding" |
|
|
|
|
|
result["press_releases"].append( |
|
|
{ |
|
|
"title": pdf_info["title"], |
|
|
"content": pdf_text[:1000] |
|
|
+ ("..." if len(pdf_text) > 1000 else ""), |
|
|
"source": ( |
|
|
"dropbox" if pdf_info["is_dropbox"] else "ceb.lk" |
|
|
), |
|
|
} |
|
|
) |
|
|
result["scrape_status"] = "live" |
|
|
except Exception as pdf_error: |
|
|
logger.warning(f"[CEB] PDF extraction error: {pdf_error}") |
|
|
|
|
|
logger.info( |
|
|
f"[CEB] Scraped - PDFs found: {len(pdf_links_found)}, Active: {result['load_shedding_active']}" |
|
|
) |
|
|
|
|
|
|
|
|
news_sources = [ |
|
|
"https://www.news.lk/", |
|
|
"https://www.dailymirror.lk/", |
|
|
] |
|
|
|
|
|
for news_url in news_sources: |
|
|
try: |
|
|
news_resp = _safe_get(news_url, timeout=20) |
|
|
if news_resp: |
|
|
news_soup = BeautifulSoup(news_resp.text, "html.parser") |
|
|
news_text = news_soup.get_text(separator=" ", strip=True).lower() |
|
|
|
|
|
|
|
|
if any( |
|
|
kw in news_text |
|
|
for kw in ["power cut", "load shedding", "ceb", "electricity"] |
|
|
): |
|
|
|
|
|
for headline in news_soup.find_all(["h1", "h2", "h3", "h4"]): |
|
|
h_text = headline.get_text(strip=True) |
|
|
if any( |
|
|
kw in h_text.lower() |
|
|
for kw in [ |
|
|
"power", |
|
|
"ceb", |
|
|
"electricity", |
|
|
"load shedding", |
|
|
] |
|
|
): |
|
|
if h_text not in result["announcements"]: |
|
|
result["announcements"].append( |
|
|
f"[News] {h_text[:150]}" |
|
|
) |
|
|
break |
|
|
except Exception as news_error: |
|
|
logger.debug(f"[CEB] News scraping error for {news_url}: {news_error}") |
|
|
|
|
|
|
|
|
if not result["press_releases"] and not result["announcements"]: |
|
|
result["status"] = "no_load_shedding" |
|
|
result["announcements"].append("CEB: Normal power supply across the island") |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"[CEB] Scraping error: {e}") |
|
|
result["status"] = "unknown" |
|
|
result["error"] = str(e) |
|
|
|
|
|
|
|
|
_ceb_cache = result |
|
|
_ceb_cache_time = utc_now() |
|
|
|
|
|
return result |
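
# Illustrative consumption (a sketch):
#     ceb = tool_ceb_power_status()
#     if ceb.get("load_shedding_active"):
#         for pr in ceb.get("press_releases", []):
#             print(pr["title"], pr["content"][:120])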
|
|
|
|
|
|
|
|
def tool_fuel_prices() -> Dict[str, Any]: |
|
|
""" |
|
|
Get current fuel prices in Sri Lanka. |
|
|
|
|
|
Scrapes official CEYPETCO/LIOC announcements or news sources. |
|
|
|
|
|
Returns: |
|
|
Dict with prices for petrol, diesel, kerosene, and last update |
|
|
""" |
|
|
global _fuel_cache, _fuel_cache_time |
|
|
|
|
|
|
|
|
if _fuel_cache_time: |
|
|
cache_age = (utc_now() - _fuel_cache_time).total_seconds() / 60 |
|
|
if cache_age < SA_CACHE_DURATION_MINUTES and _fuel_cache: |
|
|
logger.info(f"[FUEL] Using cached data ({cache_age:.1f} min old)") |
|
|
return _fuel_cache |
|
|
|
|
|
logger.info("[FUEL] Fetching fuel prices...") |
|
|
|
|
|
|
|
|
|
|
|
result = { |
|
|
"prices": { |
|
|
"petrol_92": {"price": 294.00, "unit": "LKR/L", "name": "Petrol 92 Octane"}, |
|
|
"petrol_95": {"price": 335.00, "unit": "LKR/L", "name": "Petrol 95 Octane"}, |
|
|
"auto_diesel": {"price": 277.00, "unit": "LKR/L", "name": "Auto Diesel"}, |
|
|
"super_diesel": {"price": 318.00, "unit": "LKR/L", "name": "Super Diesel"}, |
|
|
"kerosene": {"price": 185.00, "unit": "LKR/L", "name": "Kerosene"}, |
|
|
}, |
|
|
"last_revision": "2025-12-01", |
|
|
"source": "CEYPETCO", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
"note": "Prices confirmed unchanged for December 2025", |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
news_sources = [ |
|
|
"https://www.news.lk/", |
|
|
"https://www.dailymirror.lk/", |
|
|
"https://www.newsfirst.lk/", |
|
|
] |
|
|
|
|
|
for source_url in news_sources: |
|
|
resp = _safe_get(source_url, timeout=20) |
|
|
if resp: |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
page_text = soup.get_text(separator=" ", strip=True).lower() |
|
|
|
|
|
|
|
|
if "fuel" in page_text and ("price" in page_text or "lkr" in page_text): |
|
|
|
|
|
petrol_match = re.search( |
|
|
r"petrol\s*(?:92|95)?\s*(?:octane)?\s*[:\-]?\s*(?:rs\.?|lkr)?\s*(\d{2,3}(?:\.\d{2})?)", |
|
|
page_text, |
|
|
) |
|
|
diesel_match = re.search( |
|
|
r"diesel\s*[:\-]?\s*(?:rs\.?|lkr)?\s*(\d{2,3}(?:\.\d{2})?)", |
|
|
page_text, |
|
|
) |
|
|
|
|
|
if petrol_match: |
|
|
try: |
|
|
result["prices"]["petrol_92"]["price"] = float( |
|
|
petrol_match.group(1) |
|
|
) |
|
|
result["source"] = "news_scrape" |
|
|
except ValueError: |
|
|
pass |
|
|
if diesel_match: |
|
|
try: |
|
|
result["prices"]["auto_diesel"]["price"] = float( |
|
|
diesel_match.group(1) |
|
|
) |
|
|
except ValueError: |
|
|
pass |
|
|
break |
|
|
|
|
|
logger.info( |
|
|
f"[FUEL] Fetched prices - Petrol 92: {result['prices']['petrol_92']['price']}" |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"[FUEL] Scraping error: {e}") |
|
|
result["error"] = str(e) |
|
|
|
|
|
|
|
|
_fuel_cache = result |
|
|
_fuel_cache_time = utc_now() |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
def tool_cbsl_indicators() -> Dict[str, Any]: |
|
|
""" |
|
|
Get key economic indicators from Central Bank of Sri Lanka. |
|
|
|
|
|
Scrapes live data from cbsl.gov.lk including: |
|
|
- Exchange rates (USD/LKR TT Buy/Sell) |
|
|
- CCPI Inflation |
|
|
- Overnight Policy Rate |
|
|
- Forex reserves |
|
|
|
|
|
Returns: |
|
|
Dict with economic indicators and trend data |
|
|
""" |
|
|
global _cbsl_cache, _cbsl_cache_time |
|
|
|
|
|
|
|
|
if _cbsl_cache_time: |
|
|
cache_age = (utc_now() - _cbsl_cache_time).total_seconds() / 60 |
|
|
if cache_age < SA_CACHE_DURATION_MINUTES and _cbsl_cache: |
|
|
logger.info(f"[CBSL] Using cached data ({cache_age:.1f} min old)") |
|
|
return _cbsl_cache |
|
|
|
|
|
logger.info("[CBSL] Fetching economic indicators from cbsl.gov.lk...") |
|
|
|
|
|
|
|
|
result = { |
|
|
"indicators": { |
|
|
"inflation": { |
|
|
"ccpi_yoy": 2.10, |
|
|
"ncpi_yoy": 2.5, |
|
|
"trend": "stable", |
|
|
"unit": "%", |
|
|
}, |
|
|
"policy_rates": { |
|
|
"sdfr": 7.25, |
|
|
"slfr": 8.25, |
|
|
"overnight_rate": 7.75, |
|
|
"last_change": "2024-12-01", |
|
|
"change_direction": "decreased", |
|
|
}, |
|
|
"exchange_rate": { |
|
|
"usd_lkr_buy": 305.32, |
|
|
"usd_lkr_sell": 312.91, |
|
|
"usd_lkr": 309.12, |
|
|
"eur_lkr": 325.50, |
|
|
"gbp_lkr": 390.25, |
|
|
"trend": "stable", |
|
|
}, |
|
|
"forex_reserves": { |
|
|
"value": 6.5, |
|
|
"unit": "Billion USD", |
|
|
"months_of_imports": 4.0, |
|
|
"trend": "improving", |
|
|
}, |
|
|
}, |
|
|
"source": "cbsl.gov.lk", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
"data_as_of": "2025-12", |
|
|
"scrape_status": "baseline", |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
resp = _safe_get("https://www.cbsl.gov.lk/", timeout=30) |
|
|
if resp: |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
page_text = soup.get_text(separator=" ", strip=True) |
|
|
|
|
|
scraped_any = False |
|
|
|
|
|
|
|
|
tt_buy_match = re.search( |
|
|
r"TT\s*Buy[:\s]*(\d{2,3}(?:\.\d{2,4})?)", page_text, re.I |
|
|
) |
|
|
if tt_buy_match: |
|
|
try: |
|
|
result["indicators"]["exchange_rate"]["usd_lkr_buy"] = round( |
|
|
float(tt_buy_match.group(1)), 2 |
|
|
) |
|
|
scraped_any = True |
|
|
except ValueError: |
|
|
pass |
|
|
|
|
|
|
|
|
tt_sell_match = re.search( |
|
|
r"TT\s*Sell[:\s]*(\d{2,3}(?:\.\d{2,4})?)", page_text, re.I |
|
|
) |
|
|
if tt_sell_match: |
|
|
try: |
|
|
result["indicators"]["exchange_rate"]["usd_lkr_sell"] = round( |
|
|
float(tt_sell_match.group(1)), 2 |
|
|
) |
|
|
scraped_any = True |
|
|
except ValueError: |
|
|
pass |
|
|
|
|
|
|
|
|
if tt_buy_match and tt_sell_match: |
|
|
buy = result["indicators"]["exchange_rate"]["usd_lkr_buy"] |
|
|
sell = result["indicators"]["exchange_rate"]["usd_lkr_sell"] |
|
|
result["indicators"]["exchange_rate"]["usd_lkr"] = round( |
|
|
(buy + sell) / 2, 2 |
|
|
) |
|
|
|
|
|
|
|
|
inflation_patterns = [ |
|
|
r"CCPI\s*Inflation[:\s]*(\d{1,2}(?:\.\d{1,2})?)\s*%", |
|
|
r"Inflation[:\s]*(\d{1,2}(?:\.\d{1,2})?)\s*%", |
|
|
r"(\d{1,2}(?:\.\d{1,2})?)\s*%\s*(?:CCPI|Inflation)", |
|
|
] |
|
|
for pattern in inflation_patterns: |
|
|
inflation_match = re.search(pattern, page_text, re.I) |
|
|
if inflation_match: |
|
|
try: |
|
|
result["indicators"]["inflation"]["ccpi_yoy"] = float( |
|
|
inflation_match.group(1) |
|
|
) |
|
|
scraped_any = True |
|
|
break |
|
|
except ValueError: |
|
|
pass |
|
|
|
|
|
|
|
|
policy_patterns = [ |
|
|
r"Overnight\s*Policy\s*Rate[:\s]*(\d{1,2}(?:\.\d{1,2})?)\s*%", |
|
|
r"Policy\s*Rate[:\s]*(\d{1,2}(?:\.\d{1,2})?)\s*%", |
|
|
r"(\d{1,2}(?:\.\d{1,2})?)\s*%\s*(?:Policy\s*Rate)", |
|
|
] |
|
|
for pattern in policy_patterns: |
|
|
policy_match = re.search(pattern, page_text, re.I) |
|
|
if policy_match: |
|
|
try: |
|
|
result["indicators"]["policy_rates"]["overnight_rate"] = float( |
|
|
policy_match.group(1) |
|
|
) |
|
|
scraped_any = True |
|
|
break |
|
|
except ValueError: |
|
|
pass |
|
|
|
|
|
if scraped_any: |
|
|
result["scrape_status"] = "live" |
|
|
result["data_as_of"] = utc_now().strftime("%Y-%m") |
|
|
logger.info( |
|
|
f"[CBSL] ✓ Scraped live data - " |
|
|
f"USD/LKR Buy: {result['indicators']['exchange_rate']['usd_lkr_buy']}, " |
|
|
f"Sell: {result['indicators']['exchange_rate']['usd_lkr_sell']}, " |
|
|
f"Inflation: {result['indicators']['inflation']['ccpi_yoy']}%" |
|
|
) |
|
|
else: |
|
|
logger.info("[CBSL] Using baseline data - no live values matched") |
|
|
else: |
|
|
logger.warning("[CBSL] Could not reach cbsl.gov.lk, using baseline data") |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"[CBSL] Scraping error: {e}") |
|
|
result["error"] = str(e) |
|
|
|
|
|
|
|
|
_cbsl_cache = result |
|
|
_cbsl_cache_time = utc_now() |
|
|
|
|
|
return result |
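
# Illustrative consumption (a sketch): scrape_status distinguishes live values
# from the hard-coded baseline figures.
#     cbsl = tool_cbsl_indicators()
#     fx = cbsl["indicators"]["exchange_rate"]
#     print(cbsl["scrape_status"], fx["usd_lkr_buy"], fx["usd_lkr_sell"])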
|
|
|
|
|
|
|
|
def tool_health_alerts() -> Dict[str, Any]: |
|
|
""" |
|
|
Get health alerts and disease outbreak information for Sri Lanka. |
|
|
|
|
|
Includes dengue case counts, epidemic alerts, and health advisories. |
|
|
Filters out navigation text (circulars, menus) for cleaner alerts. |
|
|
|
|
|
Returns: |
|
|
Dict with health alerts, disease data, and notifications |
|
|
""" |
|
|
global _health_cache, _health_cache_time |
|
|
|
|
|
|
|
|
if _health_cache_time: |
|
|
cache_age = (utc_now() - _health_cache_time).total_seconds() / 60 |
|
|
if cache_age < SA_CACHE_DURATION_MINUTES and _health_cache: |
|
|
logger.info(f"[HEALTH] Using cached data ({cache_age:.1f} min old)") |
|
|
return _health_cache |
|
|
|
|
|
logger.info("[HEALTH] Fetching health alerts...") |
|
|
|
|
|
|
|
|
result = { |
|
|
"alerts": [], |
|
|
"dengue": { |
|
|
"weekly_cases": 850, |
|
|
"trend": "stable", |
|
|
"high_risk_districts": ["Colombo", "Gampaha", "Kalutara"], |
|
|
"outbreak_status": "endemic", |
|
|
}, |
|
|
"other_diseases": [], |
|
|
"advisories": [], |
|
|
"source": "health.gov.lk", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
resp = _safe_get("https://www.health.gov.lk/", timeout=30) |
|
|
if resp: |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
|
|
|
|
|
|
for trash in soup.find_all( |
|
|
["nav", "header", "footer", "script", "style", "noscript", "iframe"] |
|
|
): |
|
|
trash.decompose() |
|
|
|
|
|
|
|
|
for menu in soup.select(".menu, .navigation, #main-menu, .top-bar"): |
|
|
menu.decompose() |
|
|
|
|
|
|
|
|
explicit_alerts = [] |
|
|
|
|
|
|
|
|
for marquee in soup.find_all("marquee"): |
|
|
text = marquee.get_text(strip=True) |
|
|
if text and len(text) > 20 and "welcome" not in text.lower(): |
|
|
explicit_alerts.append(text) |
|
|
|
|
|
|
|
|
for alert_div in soup.select(".alert, .notice, .warning, .news-ticker"): |
|
|
text = alert_div.get_text(strip=True) |
|
|
if text and len(text) > 20: |
|
|
explicit_alerts.append(text) |
|
|
|
|
|
|
|
|
for alert_text in explicit_alerts[:3]: |
|
|
|
|
|
if "circular" not in alert_text.lower(): |
|
|
result["alerts"].append( |
|
|
{ |
|
|
"type": "health_notice", |
|
|
"text": alert_text[:200], |
|
|
"severity": "medium", |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
if not result["alerts"]: |
|
|
|
|
|
main_content = ( |
|
|
soup.select_one("main, #content, .container, body") or soup.body |
|
|
) |
|
|
page_text = main_content.get_text(separator=" ", strip=True).lower() |
|
|
|
|
|
|
|
|
outbreak_keywords = [ |
|
|
"dengue outbreak", |
|
|
"epidemic alert", |
|
|
"health emergency", |
|
|
"spread of disease", |
|
|
"influenza warning", |
|
|
] |
|
|
|
|
|
for kw in outbreak_keywords: |
|
|
if kw in page_text: |
|
|
idx = page_text.find(kw) |
|
|
|
|
|
context = page_text[max(0, idx - 20) : idx + 150] |
|
|
|
|
|
context = " ".join(context.split()) |
|
|
|
|
|
if len(context) > 20 and "circular" not in context: |
|
|
result["alerts"].append( |
|
|
{ |
|
|
"type": "health_notice", |
|
|
"text": f"...{context}...", |
|
|
"severity": "medium", |
|
|
} |
|
|
) |
|
|
break |
|
|
|
|
|
|
|
|
dengue_match = re.search(r"dengue[:\s]*(\d{1,5})\s*(?:cases?)?", page_text) |
|
|
if dengue_match: |
|
|
try: |
|
|
result["dengue"]["weekly_cases"] = int(dengue_match.group(1)) |
|
|
logger.info( |
|
|
f"[HEALTH] Found Dengue cases: {result['dengue']['weekly_cases']}" |
|
|
) |
|
|
except ValueError: |
|
|
pass |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"[HEALTH] Scraping error: {e}") |
|
|
|
|
|
|
|
|
|
|
|
if not result["alerts"]: |
|
|
current_month = utc_now().month |
|
|
if current_month in [5, 6, 10, 11, 12]: |
|
|
result["advisories"].append( |
|
|
{ |
|
|
"type": "seasonal", |
|
|
"text": "Mosquito Control: Remove stagnant water to prevent Dengue breeding.", |
|
|
"severity": "medium", |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
_health_cache = result |
|
|
_health_cache_time = utc_now() |
|
|
|
|
|
return result |
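
# Illustrative consumption (a sketch):
#     health = tool_health_alerts()
#     print(health["dengue"]["weekly_cases"], [a["text"] for a in health["alerts"]])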
|
|
|
|
|
|
|
|
def tool_commodity_prices() -> Dict[str, Any]: |
|
|
""" |
|
|
Get prices for essential commodities in Sri Lanka. |
|
|
|
|
|
Fetches live prices from UN World Food Programme (WFP) Humanitarian Data Exchange. |
|
|
Includes rice, sugar, lentils, eggs, chicken, coconut oil, onions, potatoes, and more. |
|
|
|
|
|
Returns: |
|
|
Dict with commodity prices, units, and source information |
|
|
""" |
|
|
global _commodity_cache, _commodity_cache_time |
|
|
|
|
|
|
|
|
if _commodity_cache_time: |
|
|
cache_age = (utc_now() - _commodity_cache_time).total_seconds() / 60 |
|
|
if cache_age < 60 and _commodity_cache: |
|
|
logger.info(f"[COMMODITY] Using cached data ({cache_age:.1f} min old)") |
|
|
return _commodity_cache |
|
|
|
|
|
logger.info("[COMMODITY] Fetching live commodity prices from WFP HDX...") |
|
|
|
|
|
|
|
|
WFP_HDX_URL = "https://data.humdata.org/dataset/0298c598-d312-4771-b564-f4ac4d831f05/resource/3638f0d6-9969-48cf-a919-1d879d037ec6/download/wfp_food_prices_lka.csv" |
|
|
|
|
|
|
|
|
COMMODITY_MAPPING = { |
|
|
"Rice (red nadu)": ("White Rice (Nadu)", "grains"), |
|
|
"Rice (white)": ("White Rice (Samba)", "grains"), |
|
|
"Rice (red)": ("Red Rice", "grains"), |
|
|
"Wheat flour": ("Wheat Flour", "grains"), |
|
|
"Sugar": ("Sugar (White)", "essentials"), |
|
|
"Lentils": ("Dhal (Lentils)", "pulses"), |
|
|
"Oil (coconut)": ("Coconut Oil", "cooking"), |
|
|
"Coconut": ("Coconut (Fresh)", "cooking"), |
|
|
"Eggs": ("Eggs (per unit)", "protein"), |
|
|
"Meat (chicken, fresh)": ("Chicken", "protein"), |
|
|
"Meat (chicken, broiler)": ("Chicken (Broiler)", "protein"), |
|
|
"Onions (imported)": ("Big Onion", "vegetables"), |
|
|
"Onions (red)": ("Red Onion", "vegetables"), |
|
|
"Potatoes (imported)": ("Potatoes", "vegetables"), |
|
|
"Potatoes (local)": ("Potatoes (Local)", "vegetables"), |
|
|
"Tomatoes": ("Tomatoes", "vegetables"), |
|
|
"Cabbage": ("Cabbage", "vegetables"), |
|
|
"Carrots": ("Carrots", "vegetables"), |
|
|
"Fuel (diesel)": ("Diesel", "fuel"), |
|
|
"Fuel (petrol-gasoline)": ("Petrol 92 Octane", "fuel"), |
|
|
} |
|
|
|
|
|
commodities = [] |
|
|
data_date = None |
|
|
source_status = "error" |
|
|
|
|
|
try: |
|
|
resp = _safe_get(WFP_HDX_URL, timeout=60) |
|
|
if resp and resp.status_code == 200: |
|
|
import csv |
|
|
import io |
|
|
from collections import defaultdict |
|
|
|
|
|
reader = csv.DictReader(io.StringIO(resp.text)) |
|
|
rows = list(reader) |
|
|
|
|
|
if rows: |
|
|
|
|
|
latest_date = max( |
|
|
row.get("date", "") for row in rows if row.get("date") |
|
|
) |
|
|
data_date = latest_date |
|
|
|
|
|
|
|
|
latest_prices: Dict[str, List[float]] = defaultdict(list) |
|
|
for row in rows: |
|
|
if row.get("date") == latest_date and row.get("price"): |
|
|
commodity = row.get("commodity", "") |
|
|
try: |
|
|
price = float(row["price"]) |
|
|
latest_prices[commodity].append(price) |
|
|
except (ValueError, KeyError): |
|
|
pass |
|
|
|
|
|
|
|
|
for wfp_name, (display_name, category) in COMMODITY_MAPPING.items(): |
|
|
if wfp_name in latest_prices and latest_prices[wfp_name]: |
|
|
avg_price = sum(latest_prices[wfp_name]) / len( |
|
|
latest_prices[wfp_name] |
|
|
) |
|
|
unit = "LKR/kg" |
|
|
if "Eggs" in display_name: |
|
|
unit = "LKR/each" |
|
|
elif "Coconut (Fresh)" in display_name: |
|
|
unit = "LKR/each" |
|
|
elif "Oil" in display_name: |
|
|
unit = "LKR/L" |
|
|
elif "Diesel" in display_name or "Petrol" in display_name: |
|
|
unit = "LKR/L" |
|
|
|
|
|
commodities.append( |
|
|
{ |
|
|
"name": display_name, |
|
|
"price": round(avg_price, 2), |
|
|
"unit": unit, |
|
|
"category": category, |
|
|
"live": True, |
|
|
"wfp_commodity": wfp_name, |
|
|
"markets_sampled": len(latest_prices[wfp_name]), |
|
|
} |
|
|
) |
|
|
|
|
|
source_status = "live" |
|
|
logger.info( |
|
|
f"[COMMODITY] ✓ Fetched {len(commodities)} live prices from WFP (data date: {latest_date})" |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"[COMMODITY] WFP API error: {e}") |
|
|
source_status = "error" |
|
|
|
|
|
|
|
|
if not commodities: |
|
|
logger.info("[COMMODITY] Using baseline data - WFP API unavailable") |
|
|
source_status = "baseline" |
|
|
commodities = [ |
|
|
{ |
|
|
"name": "White Rice (Nadu)", |
|
|
"price": 220, |
|
|
"unit": "LKR/kg", |
|
|
"category": "grains", |
|
|
"live": False, |
|
|
}, |
|
|
{ |
|
|
"name": "White Rice (Samba)", |
|
|
"price": 250, |
|
|
"unit": "LKR/kg", |
|
|
"category": "grains", |
|
|
"live": False, |
|
|
}, |
|
|
{ |
|
|
"name": "Red Rice", |
|
|
"price": 240, |
|
|
"unit": "LKR/kg", |
|
|
"category": "grains", |
|
|
"live": False, |
|
|
}, |
|
|
{ |
|
|
"name": "Sugar (White)", |
|
|
"price": 240, |
|
|
"unit": "LKR/kg", |
|
|
"category": "essentials", |
|
|
"live": False, |
|
|
}, |
|
|
{ |
|
|
"name": "Dhal (Lentils)", |
|
|
"price": 380, |
|
|
"unit": "LKR/kg", |
|
|
"category": "pulses", |
|
|
"live": False, |
|
|
}, |
|
|
{ |
|
|
"name": "Coconut Oil", |
|
|
"price": 680, |
|
|
"unit": "LKR/L", |
|
|
"category": "cooking", |
|
|
"live": False, |
|
|
}, |
|
|
{ |
|
|
"name": "Eggs (per unit)", |
|
|
"price": 48, |
|
|
"unit": "LKR/each", |
|
|
"category": "protein", |
|
|
"live": False, |
|
|
}, |
|
|
{ |
|
|
"name": "Chicken", |
|
|
"price": 1350, |
|
|
"unit": "LKR/kg", |
|
|
"category": "protein", |
|
|
"live": False, |
|
|
}, |
|
|
{ |
|
|
"name": "Big Onion", |
|
|
"price": 280, |
|
|
"unit": "LKR/kg", |
|
|
"category": "vegetables", |
|
|
"live": False, |
|
|
}, |
|
|
{ |
|
|
"name": "Potatoes", |
|
|
"price": 350, |
|
|
"unit": "LKR/kg", |
|
|
"category": "vegetables", |
|
|
"live": False, |
|
|
}, |
|
|
] |
|
|
data_date = utc_now().strftime("%Y-%m-%d") |
|
|
|
|
|
|
|
|
category_order = { |
|
|
"grains": 1, |
|
|
"essentials": 2, |
|
|
"pulses": 3, |
|
|
"cooking": 4, |
|
|
"protein": 5, |
|
|
"vegetables": 6, |
|
|
"fuel": 7, |
|
|
} |
|
|
commodities.sort( |
|
|
key=lambda x: (category_order.get(x.get("category", ""), 99), x.get("name", "")) |
|
|
) |
|
|
|
|
|
|
|
|
live_count = sum(1 for c in commodities if c.get("live", False)) |
|
|
result = { |
|
|
"commodities": commodities, |
|
|
"source": "UN World Food Programme (WFP) Humanitarian Data Exchange", |
|
|
"source_url": WFP_HDX_URL.replace("/download/wfp_food_prices_lka.csv", ""), |
|
|
"data_date": data_date, |
|
|
"scrape_status": source_status, |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
"summary": { |
|
|
"total_items": len(commodities), |
|
|
"items_live": live_count, |
|
|
"items_baseline": len(commodities) - live_count, |
|
|
}, |
|
|
} |
|
|
|
|
|
|
|
|
_commodity_cache = result |
|
|
_commodity_cache_time = utc_now() |
|
|
|
|
|
return result |
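
# Illustrative consumption (a sketch): keep only live WFP-sourced rows.
#     data = tool_commodity_prices()
#     live = [c for c in data["commodities"] if c.get("live")]
#     print(data["scrape_status"], len(live), "live items as of", data["data_date"])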
|
|
|
|
|
|
|
|
def tool_water_supply_alerts() -> Dict[str, Any]: |
|
|
""" |
|
|
Get water supply disruption alerts from NWSDB. |
|
|
|
|
|
Returns information about planned/unplanned water cuts and affected areas. |
|
|
|
|
|
Returns: |
|
|
Dict with active disruptions, affected areas, and restoration times |
|
|
""" |
|
|
global _water_cache, _water_cache_time |
|
|
|
|
|
|
|
|
if _water_cache_time: |
|
|
cache_age = (utc_now() - _water_cache_time).total_seconds() / 60 |
|
|
if cache_age < SA_CACHE_DURATION_MINUTES and _water_cache: |
|
|
logger.info(f"[WATER] Using cached data ({cache_age:.1f} min old)") |
|
|
return _water_cache |
|
|
|
|
|
logger.info("[WATER] Fetching water supply alerts...") |
|
|
|
|
|
result = { |
|
|
"status": "normal", |
|
|
"active_disruptions": [], |
|
|
"scheduled_maintenance": [], |
|
|
"source": "waterboard.lk / NWSDB", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
"overall_supply": "stable", |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
resp = _safe_get("https://www.waterboard.lk/", timeout=30) |
|
|
if resp: |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
|
|
|
|
|
|
for trash in soup.find_all( |
|
|
[ |
|
|
"nav", |
|
|
"header", |
|
|
"footer", |
|
|
"script", |
|
|
"style", |
|
|
"noscript", |
|
|
"iframe", |
|
|
"form", |
|
|
] |
|
|
): |
|
|
trash.decompose() |
|
|
|
|
|
|
|
|
for menu in soup.select( |
|
|
".menu, .navigation, #main-menu, .top-bar, .service-block" |
|
|
): |
|
|
menu.decompose() |
|
|
|
|
|
|
|
|
alerts_found = [] |
|
|
|
|
|
|
|
|
for marquee in soup.find_all("marquee"): |
|
|
text = marquee.get_text(separator=" ", strip=True) |
|
|
if len(text) > 10: |
|
|
alerts_found.append({"text": text, "source": "ticker"}) |
|
|
|
|
|
|
|
|
for alert in soup.select(".alert, .notice, .warning, .news-ticker"): |
|
|
text = alert.get_text(separator=" ", strip=True) |
|
|
if len(text) > 10: |
|
|
alerts_found.append({"text": text, "source": "alert_box"}) |
|
|
|
|
|
|
|
|
if not alerts_found: |
|
|
main_content = ( |
|
|
soup.select_one("main, #content, .container, body") or soup.body |
|
|
) |
|
|
if main_content: |
|
|
|
|
|
for p in main_content.find_all(["p", "div", "span"]): |
|
|
text = p.get_text(strip=True) |
|
|
if ( |
|
|
len(text) < 20 or len(text) > 300 |
|
|
): |
|
|
continue |
|
|
|
|
|
text_lower = text.lower() |
|
|
|
|
|
|
|
|
has_water = any( |
|
|
w in text_lower |
|
|
for w in [ |
|
|
"water supply", |
|
|
"water cut", |
|
|
"nwsdb", |
|
|
"water board", |
|
|
] |
|
|
) |
|
|
has_issue = any( |
|
|
w in text_lower |
|
|
for w in [ |
|
|
"interruption", |
|
|
"disruption", |
|
|
"suspended", |
|
|
"stopped", |
|
|
"low pressure", |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
is_garbage = any( |
|
|
w in text_lower |
|
|
for w in [ |
|
|
"benefits", |
|
|
"payment", |
|
|
"service without", |
|
|
"bill", |
|
|
"vision", |
|
|
"mission", |
|
|
] |
|
|
) |
|
|
|
|
|
if has_water and has_issue and not is_garbage: |
|
|
alerts_found.append( |
|
|
{"text": text, "source": "content_match"} |
|
|
) |
|
|
|
|
|
|
|
|
for item in alerts_found: |
|
|
text = item["text"] |
|
|
text_lower = text.lower() |
|
|
|
|
|
|
|
|
if any( |
|
|
w in text_lower |
|
|
for w in ["benefits", "payment", "check out", "click here"] |
|
|
): |
|
|
continue |
|
|
|
|
|
result["status"] = "disruptions_reported" |
|
|
|
|
|
|
|
|
area = "Multiple areas" |
|
|
|
|
|
area_match = re.search( |
|
|
r"(colombo|gampaha|kandy|galle|matara|jaffna|kurunegala|ratnapura|kalutara|negombo)", |
|
|
text_lower, |
|
|
re.I, |
|
|
) |
|
|
if area_match: |
|
|
area = area_match.group(1).title() |
|
|
|
|
|
|
|
|
if not any(d["details"] == text for d in result["active_disruptions"]): |
|
|
result["active_disruptions"].append( |
|
|
{ |
|
|
"area": area, |
|
|
"type": "Water Disruption", |
|
|
"details": text[:200] + ("..." if len(text) > 200 else ""), |
|
|
"severity": "medium", |
|
|
} |
|
|
) |
|
|
|
|
|
logger.info( |
|
|
f"[WATER] Fetched - Disruptions: {len(result['active_disruptions'])}" |
|
|
) |
|
|
|
|
|
|
|
|
if not result["active_disruptions"]: |
|
|
result["status"] = "normal" |
|
|
result["overall_supply"] = "Normal water supply across most areas" |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"[WATER] Scraping error: {e}") |
|
|
|
|
|
result["error"] = str(e) |
|
|
|
|
|
|
|
|
_water_cache = result |
|
|
_water_cache_time = utc_now() |
|
|
|
|
|
return result |
|
|
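# A minimal consumption sketch (illustrative only, not part of the tool
# surface), assuming the tool is called as a plain function here; the field
# names match the dict built above.
def _example_print_water_alerts() -> None:
    """Sketch: print each active NWSDB disruption from the tool result."""
    data = tool_water_supply_alerts()
    if data["status"] == "disruptions_reported":
        for d in data["active_disruptions"]:
            print(f"{d['area']} ({d['severity']}): {d['details']}")
    else:
        print(data.get("overall_supply", "normal"))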
|
|
def tool_dmc_alerts() -> Dict[str, Any]:
    """
    Scrape severe-weather warnings and advisories from the Department of
    Meteorology homepage (meteo.gov.lk) by keyword-matching page text.

    Returns:
        Dict with the source URL, up to 10 alert strings, and a fetch timestamp.
    """
|
|
|
|
|
url = "http://www.meteo.gov.lk/index.php?lang=en" |
|
|
resp = _safe_get(url) |
|
|
if not resp: |
|
|
return { |
|
|
"source": url, |
|
|
"alerts": ["Failed to fetch alerts from DMC."], |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
alerts: List[str] = [] |
|
|
keywords = [ |
|
|
"warning", |
|
|
"advisory", |
|
|
"alert", |
|
|
"heavy rain", |
|
|
"strong wind", |
|
|
"thunderstorm", |
|
|
"flood", |
|
|
"landslide", |
|
|
"cyclone", |
|
|
"severe", |
|
|
] |
|
|
for text in soup.find_all(string=True): |
|
|
if len(text.strip()) > 20 and any(k in text.lower() for k in keywords): |
|
|
clean = re.sub(r"\s+", " ", text.strip()) |
|
|
if clean not in alerts: |
|
|
alerts.append(clean) |
|
|
if not alerts: |
|
|
alerts = ["No active severe weather alerts detected."] |
|
|
return { |
|
|
"source": url, |
|
|
"alerts": alerts[:10], |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
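# Illustrative usage of the alert scraper above, assuming it is called as a
# plain function; the helper name and log tag are invented for the example.
def _example_log_dmc_alerts() -> None:
    """Sketch: log every severe-weather alert line returned above."""
    for line in tool_dmc_alerts()["alerts"]:
        logger.info(f"[EXAMPLE] DMC alert: {line}")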
|
|
|
|
|
|
def tool_weather_nowcast(location: str = "Colombo") -> Dict[str, Any]: |
|
|
""" |
|
|
Comprehensive Weather Scraper (Robust Mode): |
|
|
1. Homepage (General Text). |
|
|
2. City/District Forecast (Direct URL). |
|
|
3. Critical Advisory PDFs. |
|
|
    Handles slow loading by capturing whatever content is available when a
    timeout occurs. Note: the ``location`` argument is currently unused; the
    combined report always covers all districts.
|
|
""" |
|
|
base_url = "https://meteo.gov.lk/" |
|
|
city_forecast_url = "https://meteo.gov.lk/index.php?option=com_content&view=article&id=102&Itemid=360&lang=en" |
|
|
|
|
|
combined_report = [] |
|
|
html_home = "" |
|
|
html_city = "" |
|
|
|
|
|
if PLAYWRIGHT_AVAILABLE: |
|
|
try: |
|
|
with sync_playwright() as p: |
|
|
browser = p.chromium.launch(headless=True) |
|
|
|
|
|
context = browser.new_context( |
|
|
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" |
|
|
) |
|
|
page = context.new_page() |
|
|
page.set_default_timeout(60000) |
|
|
|
|
|
|
|
|
try: |
|
|
page.goto(base_url, wait_until="domcontentloaded") |
|
|
|
|
|
try: |
|
|
page.wait_for_selector("div.itemFullText", timeout=15000) |
|
|
                    except PlaywrightTimeoutError:
|
|
pass |
|
|
html_home = page.content() |
|
|
except Exception as e: |
|
|
|
|
|
logger.warning( |
|
|
f"[WEATHER] Homepage timeout (capturing partial): {e}" |
|
|
) |
|
|
html_home = page.content() |
|
|
|
|
|
|
|
|
try: |
|
|
page.goto(city_forecast_url, wait_until="domcontentloaded") |
|
|
try: |
|
|
page.wait_for_selector("table", timeout=15000) |
|
|
                    except PlaywrightTimeoutError:
|
|
pass |
|
|
html_city = page.content() |
|
|
except Exception as e: |
|
|
logger.warning( |
|
|
f"[WEATHER] City Forecast timeout (capturing partial): {e}" |
|
|
) |
|
|
html_city = page.content() |
|
|
|
|
|
browser.close() |
|
|
except Exception as e: |
|
|
logger.warning(f"[WEATHER] Playwright critical fail: {e}") |
|
|
|
|
|
|
|
|
if not html_home or len(html_home) < 500: |
|
|
resp = _safe_get(base_url) |
|
|
html_home = resp.text if resp else "" |
|
|
|
|
|
if not html_city or len(html_city) < 500: |
|
|
resp = _safe_get(city_forecast_url) |
|
|
html_city = resp.text if resp else "" |
|
|
|
|
|
if not html_home and not html_city: |
|
|
return {"error": "Failed to load Meteo.gov.lk"} |
|
|
|
|
|
|
|
|
soup_home = BeautifulSoup(html_home, "html.parser") |
|
|
english_forecast = "" |
|
|
|
|
|
header = soup_home.find(string=re.compile(r"WEATHER FORECAST FOR", re.I)) |
|
|
if header: |
|
|
container = header.find_parent("div") or header.find_parent("article") |
|
|
if container: |
|
|
text = container.get_text(separator="\n", strip=True) |
|
|
start = text.upper().find("WEATHER FORECAST FOR") |
|
|
if start != -1: |
|
|
english_forecast = text[start:][:2500] |
|
|
|
|
|
if not english_forecast: |
|
|
main = soup_home.find("div", class_="itemFullText") or soup_home.find( |
|
|
"div", itemprop="articleBody" |
|
|
) |
|
|
english_forecast = ( |
|
|
main.get_text(separator="\n", strip=True)[:2500] |
|
|
if main |
|
|
else "General forecast text not found." |
|
|
) |
|
|
|
|
|
combined_report.append("--- ISLAND-WIDE GENERAL FORECAST ---") |
|
|
combined_report.append(english_forecast) |
|
|
|
|
|
|
|
|
if html_city: |
|
|
soup_city = BeautifulSoup(html_city, "html.parser") |
|
|
table = soup_city.find("table") |
|
|
if table: |
|
|
combined_report.append("\n--- DISTRICT/CITY FORECASTS ---") |
|
|
rows = table.find_all("tr") |
|
|
|
|
|
|
|
|
if rows: |
|
|
header_row = rows[0] |
|
|
headers = [ |
|
|
th.get_text(strip=True) for th in header_row.find_all(["th", "td"]) |
|
|
] |
|
|
if not "".join(headers).strip() and len(rows) > 1: |
|
|
headers = [ |
|
|
th.get_text(strip=True) for th in rows[1].find_all(["th", "td"]) |
|
|
] |
|
|
|
|
|
clean_header = " | ".join(headers[:4]) |
|
|
combined_report.append(clean_header) |
|
|
combined_report.append("-" * len(clean_header)) |
|
|
|
|
|
|
|
|
for row in rows: |
|
|
cols = [td.get_text(strip=True) for td in row.find_all("td")] |
|
|
if not cols or len(cols) < 2: |
|
|
continue |
|
|
if "City" in cols[0] or "Temperature" in cols[0]: |
|
|
continue |
|
|
|
|
|
row_text = " | ".join(cols[:4]) |
|
|
combined_report.append(row_text) |
|
|
|
|
|
|
|
|
pdf_links = soup_home.find_all("a", href=True) |
|
|
found_pdfs = [] |
|
|
for a in pdf_links: |
|
|
link_text = a.get_text(strip=True) |
|
|
href = a["href"] |
|
|
if "pdf" in href.lower() and any( |
|
|
k in link_text.lower() for k in ["advisory", "warning"] |
|
|
): |
|
|
abs_url = _make_absolute(href, base_url) |
|
|
if abs_url not in [p["url"] for p in found_pdfs]: |
|
|
prio = 1 if "english" in link_text.lower() else 2 |
|
|
found_pdfs.append({"title": link_text, "url": abs_url, "prio": prio}) |
|
|
|
|
|
found_pdfs.sort(key=lambda x: x["prio"]) |
|
|
|
|
|
for pdf in found_pdfs[:2]: |
|
|
text = _extract_text_from_pdf_url(pdf["url"]) |
|
|
if "Sinhala/Tamil" not in text and len(text) > 50: |
|
|
combined_report.append(f"\n--- CRITICAL ALERT: {pdf['title']} ---\n{text}") |
|
|
|
|
|
|
|
|
final_text = "\n\n".join(combined_report) |
|
|
cleanup = ["DEPARTMENT OF METEOROLOGY", "Loading...", "Listen To The Weather"] |
|
|
for c in cleanup: |
|
|
final_text = final_text.replace(c, "") |
|
|
|
|
|
return { |
|
|
"location": "All Districts", |
|
|
"forecast": final_text, |
|
|
"source": base_url, |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
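# Sketch of how the combined report might be consumed, assuming a plain
# function call; the helper below is hypothetical and not called anywhere
# in this module.
def _example_print_nowcast() -> None:
    """Sketch: print the island-wide plus district forecast text."""
    report = tool_weather_nowcast()
    if "error" in report:
        logger.warning(f"[EXAMPLE] Nowcast failed: {report['error']}")
        return
    print(f"Fetched {report['fetched_at']} from {report['source']}")
    print(report["forecast"])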
|
|
LOCAL_NEWS_SITES = [ |
|
|
{ |
|
|
"url": "https://www.dailymirror.lk/", |
|
|
"name": "Daily Mirror", |
|
|
"article_selector": "article, .news-block, .article, .card", |
|
|
}, |
|
|
{ |
|
|
"url": "https://www.ft.lk/", |
|
|
"name": "Daily FT", |
|
|
"article_selector": "article, .article-list-item, .card", |
|
|
}, |
|
|
{ |
|
|
"url": "https://www.newsfirst.lk/", |
|
|
"name": "News First", |
|
|
"article_selector": ".post, article, .news-block", |
|
|
}, |
|
|
] |
|
|
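# Adding an outlet is a config change: append a dict with the same three keys.
# The entry below is purely illustrative; the URL and selector are invented.
def _example_register_news_site() -> None:
    """Sketch: register an extra outlet at runtime (hypothetical URL/selector)."""
    LOCAL_NEWS_SITES.append(
        {
            "url": "https://www.example-news.lk/",  # placeholder, not a real outlet
            "name": "Example News",
            "article_selector": "article, .story-card",  # illustrative selector
        }
    )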
|
|
|
|
|
|
def scrape_local_news_impl( |
|
|
keywords: Optional[List[str]] = None, |
|
|
max_articles: int = 30, |
|
|
) -> List[Dict[str, Any]]: |
|
|
results: List[Dict[str, Any]] = [] |
|
|
for site in LOCAL_NEWS_SITES: |
|
|
try: |
|
|
resp = _safe_get(site["url"]) |
|
|
if not resp: |
|
|
logger.warning(f"[NEWS] Failed to fetch {site['url']}") |
|
|
continue |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
articles = soup.select(site.get("article_selector", "article")) |
|
|
for article in articles: |
|
|
title_elem = ( |
|
|
article.find("h1") |
|
|
or article.find("h2") |
|
|
or article.find("h3") |
|
|
or article.find( |
|
|
class_=re.compile(r"(title|headline|heading)", re.I) |
|
|
) |
|
|
) |
|
|
title = title_elem.get_text(strip=True) if title_elem else "" |
|
|
if not title or len(title) < 8: |
|
|
a = article.find("a", href=True) |
|
|
title = title or (a.get_text(strip=True) if a else "") |
|
|
if not title or len(title) < 8: |
|
|
continue |
|
|
if not _contains_keyword(title, keywords): |
|
|
continue |
|
|
link_elem = article.find("a", href=True) |
|
|
href = link_elem["href"] if link_elem else site["url"] |
|
|
href = _make_absolute(href, site["url"]) |
|
|
snippet_elem = article.find("p") or article.find( |
|
|
class_=re.compile(r"(excerpt|summary|description)", re.I) |
|
|
) |
|
|
snippet = ( |
|
|
snippet_elem.get_text(strip=True)[:300] if snippet_elem else "" |
|
|
) |
|
|
results.append( |
|
|
{ |
|
|
"source": site["name"], |
|
|
"source_url": site["url"], |
|
|
"headline": title, |
|
|
"snippet": snippet, |
|
|
"url": href, |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
) |
|
|
if len(results) >= max_articles: |
|
|
return results |
|
|
except Exception as e: |
|
|
logger.error(f"[NEWS] Error scraping {site['name']}: {e}") |
|
|
continue |
|
|
return results |
|
|
|
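# Illustrative call; the keywords below are examples, not defaults.
def _example_fetch_energy_headlines() -> None:
    """Sketch: print up to 10 headlines mentioning fuel or power."""
    for item in scrape_local_news_impl(keywords=["fuel", "power"], max_articles=10):
        print(f"{item['source']}: {item['headline']} -> {item['url']}")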
|
|
def scrape_reddit_impl( |
|
|
keywords: List[str], |
|
|
limit: int = 20, |
|
|
subreddit: Optional[str] = None, |
|
|
) -> List[Dict[str, Any]]:
    """
    Query Reddit's public JSON search endpoint (site-wide or within one
    subreddit) for recent posts and keyword-filter title plus selftext.
    """
|
|
base = ( |
|
|
f"https://www.reddit.com/r/{subreddit}/search.json" |
|
|
if subreddit |
|
|
else "https://www.reddit.com/search.json" |
|
|
) |
|
|
query = " ".join(keywords) if keywords else "Sri Lanka" |
|
|
params = { |
|
|
"q": query, |
|
|
"sort": "new", |
|
|
"limit": str(limit), |
|
|
"restrict_sr": "on" if subreddit else "off", |
|
|
} |
|
|
headers = { |
|
|
"User-Agent": DEFAULT_HEADERS["User-Agent"], |
|
|
"Accept": "application/json", |
|
|
} |
|
|
try: |
|
|
resp = requests.get( |
|
|
base, headers=headers, params=params, timeout=DEFAULT_TIMEOUT |
|
|
) |
|
|
if resp.status_code != 200: |
|
|
logger.warning(f"[REDDIT] HTTP {resp.status_code} for {base}") |
|
|
return [ |
|
|
{"error": f"Reddit returned status {resp.status_code}", "query": query} |
|
|
] |
|
|
data = resp.json() |
|
|
posts_raw = data.get("data", {}).get("children", []) |
|
|
posts: List[Dict[str, Any]] = [] |
|
|
for p in posts_raw: |
|
|
d = p.get("data", {}) |
|
|
title = d.get("title") or "" |
|
|
selftext = d.get("selftext") or "" |
|
|
text = f"{title}\n{selftext}" |
|
|
if not _contains_keyword(text, keywords): |
|
|
continue |
|
|
posts.append( |
|
|
{ |
|
|
"id": d.get("id"), |
|
|
"title": title, |
|
|
"selftext": selftext[:500], |
|
|
"subreddit": d.get("subreddit"), |
|
|
"author": d.get("author"), |
|
|
"score": d.get("score", 0), |
|
|
"url": "https://www.reddit.com" + d.get("permalink", ""), |
|
|
"created_utc": d.get("created_utc"), |
|
|
"num_comments": d.get("num_comments", 0), |
|
|
} |
|
|
) |
|
|
return ( |
|
|
posts |
|
|
if posts |
|
|
else [{"note": f"No Reddit posts found for: {query}", "query": query}] |
|
|
) |
|
|
except Exception as e: |
|
|
logger.error(f"[REDDIT] Error: {e}") |
|
|
return [{"error": str(e), "query": query}] |
|
|
|
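# Illustrative query; the subreddit name is an assumption for the example.
def _example_reddit_power_cuts() -> None:
    """Sketch: list recent r/srilanka posts about power cuts."""
    for post in scrape_reddit_impl(["power cut"], limit=10, subreddit="srilanka"):
        if "title" in post:
            print(f"[{post.get('subreddit')}] {post['title']}")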
|
|
def _scrape_cse_website_data(symbol: str) -> Optional[Dict[str, Any]]: |
|
|
""" |
|
|
Scrape stock data directly from CSE website. |
|
|
This is more reliable than yfinance for Sri Lankan stocks. |
|
|
""" |
|
|
try: |
|
|
cse_url = "https://www.cse.lk/" |
|
|
resp = _safe_get(cse_url, timeout=30) |
|
|
if not resp: |
|
|
return None |
|
|
|
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
text = soup.get_text(separator="\n", strip=True) |
|
|
|
|
|
result_data = {} |
|
|
|
|
|
|
|
|
|
|
|
aspi_patterns = [ |
|
|
r"ASPI[:\s]*([\d,]+\.?\d*)\s*(?:points?)?\s*[\(\[]?([+-]?[\d,]+\.?\d*)\s*(?:points?)?[\)\]]?\s*[\(\[]?([+-]?[\d,]*\.?\d*)%?[\)\]]?", |
|
|
r"All\s*Share\s*(?:Price\s*)?Index[:\s]*([\d,]+\.?\d*)", |
|
|
r"ASPI[^\d\n\r]*([\d,]+\.\d+)", |
|
|
] |
|
|
|
|
|
for pattern in aspi_patterns: |
|
|
m = re.search(pattern, text, re.I) |
|
|
if m: |
|
|
try: |
|
|
value = float(m.group(1).replace(",", "")) |
|
|
result_data["aspi"] = { |
|
|
"value": value, |
|
|
"change": ( |
|
|
float(m.group(2).replace(",", "")) |
|
|
if len(m.groups()) > 1 and m.group(2) |
|
|
else None |
|
|
), |
|
|
"change_pct": ( |
|
|
float(m.group(3).replace(",", "").replace("%", "")) |
|
|
if len(m.groups()) > 2 and m.group(3) |
|
|
else None |
|
|
), |
|
|
} |
|
|
break |
|
|
except (ValueError, IndexError): |
|
|
continue |
|
|
|
|
|
|
|
|
sp_patterns = [ |
|
|
r"S&?P\s*SL\s*20[:\s]*([\d,]+\.?\d*)", |
|
|
r"SL20[:\s]*([\d,]+\.?\d*)", |
|
|
] |
|
|
|
|
|
for pattern in sp_patterns: |
|
|
m = re.search(pattern, text, re.I) |
|
|
if m: |
|
|
try: |
|
|
result_data["sp_sl20"] = float(m.group(1).replace(",", "")) |
|
|
break |
|
|
except ValueError: |
|
|
continue |
|
|
|
|
|
|
|
|
if result_data: |
|
|
return result_data |
|
|
|
|
|
|
|
|
m = re.search( |
|
|
r"(ASPI|All Share Price Index)[^\d\n\r]*([\d,]+\.\d+)", text, re.I |
|
|
) |
|
|
if m: |
|
|
return {"aspi": {"value": float(m.group(2).replace(",", ""))}} |
|
|
|
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"[CSE] Direct CSE scrape failed: {e}") |
|
|
return None |
|
|
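# Quick illustration of the loosest ASPI pattern above against a synthetic
# snippet (the input string is invented, not real cse.lk markup):
#
#   >>> re.search(r"ASPI[^\d\n\r]*([\d,]+\.\d+)", "ASPI 11,234.56 points", re.I).group(1)
#   '11,234.56'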
|
|
|
|
|
|
def scrape_cse_stock_impl( |
|
|
symbol: str = "ASPI", |
|
|
period: str = "1d", |
|
|
interval: str = "1h", |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch CSE stock data with multiple fallback strategies: |
|
|
1. First try direct CSE website scraping (most reliable for Sri Lankan stocks) |
|
|
2. Fall back to yfinance if direct scraping fails |
|
|
|
|
|
Note: yfinance often fails for CSE symbols as Yahoo Finance has limited |
|
|
coverage of the Colombo Stock Exchange. |
|
|
""" |
|
|
symbol_upper = symbol.upper() |
|
|
is_index = symbol_upper in ("ASPI", "ASPI.N0000", "^N0000", "ALL SHARE") |
|
|
|
|
|
|
|
|
|
|
|
if is_index: |
|
|
logger.info(f"[CSE] Attempting direct CSE website scrape for {symbol}...") |
|
|
cse_data = _scrape_cse_website_data(symbol) |
|
|
|
|
|
if cse_data and "aspi" in cse_data: |
|
|
aspi_info = cse_data["aspi"] |
|
|
summary = { |
|
|
"current_price": aspi_info.get("value", 0), |
|
|
"change": aspi_info.get("change"), |
|
|
"change_pct": aspi_info.get("change_pct"), |
|
|
} |
|
|
|
|
|
|
|
|
if "sp_sl20" in cse_data: |
|
|
summary["sp_sl20"] = cse_data["sp_sl20"] |
|
|
|
|
|
logger.info( |
|
|
f"[CSE] Successfully scraped ASPI from CSE website: {summary['current_price']}" |
|
|
) |
|
|
return { |
|
|
"symbol": symbol, |
|
|
"resolved_symbol": "CSE-direct", |
|
|
"period": period, |
|
|
"interval": interval, |
|
|
"summary": summary, |
|
|
"records": [], |
|
|
"source": "cse.lk (direct scrape)", |
|
|
"note": "Real-time data from Colombo Stock Exchange website", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
symbols_to_try = [symbol] |
|
|
if is_index: |
|
|
symbols_to_try = ["^N0000", "ASPI.N0000", "ASPI"] |
|
|
elif not symbol.endswith(".N0000") and not symbol.startswith("^"): |
|
|
|
|
|
symbols_to_try = [f"{symbol}.N0000", symbol] |
|
|
|
|
|
logger.info(f"[CSE] Trying yfinance for symbols: {symbols_to_try}") |
|
|
|
|
|
for sym in symbols_to_try: |
|
|
try: |
|
|
ticker = yf.Ticker(sym) |
|
|
hist = ticker.history(period=period, interval=interval) |
|
|
|
|
|
if hist is None or hist.empty: |
|
|
logger.debug(f"[CSE] yfinance returned empty data for {sym}") |
|
|
continue |
|
|
|
|
|
hist = hist.reset_index() |
|
|
records = hist.to_dict(orient="records") |
|
|
|
|
|
for record in records: |
|
|
for key, value in list(record.items()): |
|
|
if hasattr(value, "isoformat"): |
|
|
record[key] = value.isoformat() |
|
|
|
|
|
latest = records[-1] if records else {} |
|
|
summary = { |
|
|
"current_price": latest.get("Close", latest.get("close", 0)), |
|
|
"open": latest.get("Open", latest.get("open", 0)), |
|
|
"high": latest.get("High", latest.get("high", 0)), |
|
|
"low": latest.get("Low", latest.get("low", 0)), |
|
|
"volume": latest.get("Volume", latest.get("volume", 0)), |
|
|
} |
|
|
|
|
|
logger.info(f"[CSE] yfinance success for {sym}: {summary['current_price']}") |
|
|
return { |
|
|
"symbol": symbol, |
|
|
"resolved_symbol": sym, |
|
|
"period": period, |
|
|
"interval": interval, |
|
|
"summary": summary, |
|
|
"records": records[-10:], |
|
|
"source": "yahoo_finance", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
|
|
|
except Exception as e_inner: |
|
|
logger.debug(f"[CSE] yfinance attempt failed for {sym}: {e_inner}") |
|
|
continue |
|
|
|
|
|
|
|
|
logger.info("[CSE] All yfinance attempts failed, trying CSE website fallback...") |
|
|
cse_data = _scrape_cse_website_data(symbol) |
|
|
|
|
|
if cse_data and "aspi" in cse_data: |
|
|
return { |
|
|
"symbol": symbol, |
|
|
"resolved_symbol": "CSE-fallback", |
|
|
"period": period, |
|
|
"interval": interval, |
|
|
"summary": {"current_price": cse_data["aspi"].get("value", 0)}, |
|
|
"records": [], |
|
|
"source": "cse.lk (fallback scrape)", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
|
|
|
|
|
|
logger.warning(f"[CSE] All data sources failed for {symbol}") |
|
|
return { |
|
|
"symbol": symbol, |
|
|
"error": f"Could not fetch data for {symbol}. Yahoo Finance has limited CSE coverage.", |
|
|
"attempted_symbols": symbols_to_try, |
|
|
"suggestion": "Try accessing cse.lk directly for real-time CSE data", |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
|
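# Illustrative consumption of the fallback chain above; the helper is
# hypothetical and unused elsewhere in this module.
def _example_aspi_snapshot() -> None:
    """Sketch: fetch the ASPI and report which source answered."""
    data = scrape_cse_stock_impl("ASPI")
    if "error" in data:
        logger.warning(f"[EXAMPLE] CSE fetch failed: {data['error']}")
        return
    print(f"ASPI {data['summary']['current_price']} via {data['source']}")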
|
|
def scrape_government_gazette_impl( |
|
|
keywords: Optional[List[str]] = None, |
|
|
max_items: int = 15, |
|
|
) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Scrapes gazette.lk for latest government gazettes. |
|
|
ENHANCED: Now downloads PDFs and extracts text content from them. |
|
|
|
|
|
Args: |
|
|
keywords: Optional list of keywords to filter gazettes (currently ignored) |
|
|
max_items: Maximum number of gazette entries to process |
|
|
|
|
|
Returns: |
|
|
List of gazette entries with PDF content extracted |
|
|
""" |
|
|
base_url = "https://www.gazette.lk/government-gazette" |
|
|
results: List[Dict[str, Any]] = [] |
|
|
|
|
|
logger.info(f"[GAZETTE] Fetching latest gazettes from {base_url}") |
|
|
resp = _safe_get(base_url) |
|
|
if not resp: |
|
|
return [ |
|
|
{ |
|
|
"title": "Failed to access gazette.lk", |
|
|
"url": base_url, |
|
|
"error": "Network request failed", |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
] |
|
|
|
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
|
|
|
|
|
|
articles = soup.find_all("article") |
|
|
if not articles: |
|
|
articles = soup.select(".post, .type-post, .entry") |
|
|
|
|
|
logger.info(f"[GAZETTE] Found {len(articles)} potential gazette entries") |
|
|
|
|
|
for article in articles: |
|
|
if len(results) >= max_items: |
|
|
break |
|
|
|
|
|
|
|
|
title_elem = ( |
|
|
article.find(class_="entry-title") |
|
|
or article.find("h2") |
|
|
or article.find("h3") |
|
|
) |
|
|
if not title_elem: |
|
|
continue |
|
|
|
|
|
link_elem = title_elem.find("a", href=True) |
|
|
if not link_elem: |
|
|
continue |
|
|
|
|
|
title = link_elem.get_text(strip=True) |
|
|
post_url = link_elem["href"] |
|
|
post_url_abs = _make_absolute(post_url, base_url) |
|
|
|
|
|
|
|
|
if "government gazette" not in title.lower(): |
|
|
continue |
|
|
|
|
|
|
|
|
date_match = re.search(r"(\d{4}\s+\w+\s+\d{1,2})", title) |
|
|
date_str = date_match.group(1) if date_match else "Unknown date" |
|
|
|
|
|
logger.info(f"[GAZETTE] Processing: {title[:50]}...") |
|
|
|
|
|
|
|
|
pdf_links = [] |
|
|
pdf_content = [] |
|
|
|
|
|
try: |
|
|
detail_resp = _safe_get(post_url_abs) |
|
|
if detail_resp: |
|
|
detail_soup = BeautifulSoup(detail_resp.text, "html.parser") |
|
|
|
|
|
|
|
|
|
|
|
pdfemb_links = detail_soup.find_all("a", class_="pdfemb-viewer") |
|
|
for link in pdfemb_links: |
|
|
href = link.get("href", "") |
|
|
if href and ("/dl/Gazette/" in href or ".pdf" in href.lower()): |
|
|
|
|
|
language = "english" |
|
|
href_lower = href.lower() |
|
|
if href.endswith("S.pdf") or "sinhala" in href_lower: |
|
|
language = "sinhala" |
|
|
elif href.endswith("T.pdf") or "tamil" in href_lower: |
|
|
language = "tamil" |
|
|
|
|
|
pdf_url = _make_absolute(href, post_url_abs) |
|
|
pdf_links.append( |
|
|
{ |
|
|
"language": language, |
|
|
"url": pdf_url, |
|
|
"text": link.get_text(strip=True) |
|
|
or f"Gazette PDF ({language})", |
|
|
} |
|
|
) |
|
|
logger.info(f"[GAZETTE] Found pdfemb-viewer link: {pdf_url}") |
|
|
|
|
|
|
|
|
if not pdf_links: |
|
|
for link in detail_soup.find_all("a", href=True): |
|
|
href = link["href"] |
|
|
link_text = link.get_text(strip=True).lower() |
|
|
|
|
|
|
|
|
is_gazette_pdf = "/dl/Gazette/" in href |
|
|
is_pdf_file = href.lower().endswith(".pdf") |
|
|
|
|
|
if is_gazette_pdf or is_pdf_file: |
|
|
pdf_url = _make_absolute(href, post_url_abs) |
|
|
|
|
|
|
|
|
language = "english" |
|
|
if "sinhala" in link_text or href.endswith("S.pdf"): |
|
|
language = "sinhala" |
|
|
elif "tamil" in link_text or href.endswith("T.pdf"): |
|
|
language = "tamil" |
|
|
elif href.endswith("E.pdf") or "english" in link_text: |
|
|
language = "english" |
|
|
|
|
|
|
|
|
if not any(p["url"] == pdf_url for p in pdf_links): |
|
|
pdf_links.append( |
|
|
{ |
|
|
"language": language, |
|
|
"url": pdf_url, |
|
|
"text": link.get_text(strip=True) |
|
|
or f"PDF ({language})", |
|
|
} |
|
|
) |
|
|
|
|
|
logger.info( |
|
|
f"[GAZETTE] Found {len(pdf_links)} PDF links on detail page" |
|
|
) |
|
|
|
|
|
|
|
|
english_pdfs = [p for p in pdf_links if p["language"] == "english"] |
|
|
if not english_pdfs: |
|
|
english_pdfs = pdf_links[:1] |
|
|
|
|
|
for pdf_info in english_pdfs[:2]: |
|
|
try: |
|
|
logger.info( |
|
|
f"[GAZETTE] Downloading PDF: {pdf_info['url'][:60]}..." |
|
|
) |
|
|
extracted_text = _extract_text_from_pdf_url(pdf_info["url"]) |
|
|
|
|
|
                    pdf_content.append(
                        {
                            "language": pdf_info["language"],
                            "content": extracted_text,
                            "source_url": pdf_info["url"],
                        }
                    )
                    if extracted_text and not extracted_text.startswith("["):
                        logger.info(
                            f"[GAZETTE] Extracted {len(extracted_text)} chars from PDF"
                        )
|
|
except Exception as e: |
|
|
logger.warning(f"[GAZETTE] PDF extraction error: {e}") |
|
|
pdf_content.append( |
|
|
{ |
|
|
"language": pdf_info.get("language", "unknown"), |
|
|
"content": f"[Error extracting PDF: {str(e)}]", |
|
|
"source_url": pdf_info.get("url", ""), |
|
|
} |
|
|
) |
|
|
except Exception as e: |
|
|
logger.warning(f"[GAZETTE] Error fetching detail page: {e}") |
|
|
|
|
|
|
|
|
result_entry = { |
|
|
"title": title, |
|
|
"date": date_str, |
|
|
"url": post_url_abs, |
|
|
"pdf_links": pdf_links, |
|
|
"extracted_content": pdf_content, |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
|
|
|
|
|
|
if pdf_content: |
|
|
first_content = pdf_content[0].get("content", "") |
|
|
if first_content and not first_content.startswith("["): |
|
|
result_entry["summary"] = first_content[:500] |
|
|
|
|
|
results.append(result_entry) |
|
|
logger.info(f"[GAZETTE] Added gazette with {len(pdf_content)} PDF extractions") |
|
|
|
|
|
if not results: |
|
|
return [ |
|
|
{ |
|
|
"title": "No gazette entries found", |
|
|
"url": base_url, |
|
|
"note": "The website structure may have changed", |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
] |
|
|
|
|
|
logger.info( |
|
|
f"[GAZETTE] Successfully scraped {len(results)} gazette entries with PDF content" |
|
|
) |
|
|
return results |
|
|
|
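# Illustrative usage (hypothetical helper): surface the newest gazette summary.
def _example_latest_gazette() -> None:
    """Sketch: print the first gazette entry and any extracted PDF summary."""
    first = scrape_government_gazette_impl(max_items=1)[0]
    print(first.get("title"), first.get("date", ""))
    print(first.get("summary", "(no PDF text extracted)"))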
|
|
def scrape_parliament_minutes_impl( |
|
|
keywords: Optional[List[str]] = None, |
|
|
max_items: int = 20, |
|
|
) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Scrape Sri Lankan Parliament Hansards from parliament.lk. |
|
|
|
|
|
ENHANCED: Now properly extracts Hansard PDF links with dates and metadata. |
|
|
The website stores PDFs at /uploads/businessdocs/ with date-encoded filenames. |
|
|
|
|
|
Args: |
|
|
keywords: Optional keywords to filter results |
|
|
max_items: Maximum number of items to return |
|
|
|
|
|
Returns: |
|
|
List of Hansard entries with PDF links and dates |
|
|
""" |
|
|
url = "https://www.parliament.lk/en/business-of-parliament/hansards" |
|
|
|
|
|
logger.info(f"[PARLIAMENT] Fetching Hansards from {url}") |
|
|
resp = _safe_get(url) |
|
|
|
|
|
if not resp: |
|
|
return [ |
|
|
{ |
|
|
"title": "Parliament website unavailable", |
|
|
"url": url, |
|
|
"note": "Could not access parliament.lk. Site may be down.", |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
] |
|
|
|
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
results: List[Dict[str, Any]] = [] |
|
|
|
|
|
|
|
|
pdf_links = soup.find_all( |
|
|
"a", href=lambda x: x and ".pdf" in x.lower() and "businessdocs" in x.lower() |
|
|
) |
|
|
|
|
|
logger.info(f"[PARLIAMENT] Found {len(pdf_links)} Hansard PDF links") |
|
|
|
|
|
for link in pdf_links: |
|
|
href = link.get("href", "") |
|
|
link_text = link.get_text(strip=True) |
|
|
|
|
|
|
|
|
date_match = re.search(r"(\d{4}-\d{2}-\d{2})", href) |
|
|
date_str = date_match.group(1) if date_match else None |
|
|
|
|
|
|
|
|
language = "english" |
|
|
href_lower = href.lower() |
|
|
if "sinhala" in href_lower: |
|
|
language = "sinhala" |
|
|
elif "tamil" in href_lower: |
|
|
language = "tamil" |
|
|
|
|
|
|
|
|
doc_id_match = re.search(r"/(\d+)_", href) |
|
|
doc_id = doc_id_match.group(1) if doc_id_match else None |
|
|
|
|
|
|
|
|
if date_str: |
|
|
title = f"Hansard - {date_str} ({language.capitalize()})" |
|
|
else: |
|
|
title = f"Hansard ({language.capitalize()})" |
|
|
|
|
|
|
|
|
parent = link.find_parent(["tr", "li", "div", "article"]) |
|
|
if parent: |
|
|
parent_text = parent.get_text(separator=" ", strip=True) |
|
|
|
|
|
session_match = re.search( |
|
|
r"(Session|Sitting|Day)\s*[:\-]?\s*(\d+)", parent_text, re.I |
|
|
) |
|
|
if session_match: |
|
|
title += f" - {session_match.group(0)}" |
|
|
|
|
|
|
|
|
full_text = f"{title} {href} {link_text}" |
|
|
if keywords and not _contains_keyword(full_text, keywords): |
|
|
continue |
|
|
|
|
|
|
|
|
pdf_url = _make_absolute(href, url) |
|
|
|
|
|
entry = { |
|
|
"title": title, |
|
|
"url": pdf_url, |
|
|
"date": date_str, |
|
|
"language": language, |
|
|
"document_id": doc_id, |
|
|
"link_text": link_text, |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
|
|
|
|
|
|
if not any(r.get("url") == pdf_url for r in results): |
|
|
results.append(entry) |
|
|
|
|
|
if len(results) >= max_items: |
|
|
break |
|
|
|
|
|
|
|
|
if not results: |
|
|
logger.info("[PARLIAMENT] No PDF links found, trying general link search...") |
|
|
for a in soup.find_all("a", href=True): |
|
|
title = a.get_text(strip=True) |
|
|
href = a["href"] |
|
|
|
|
|
if not title or len(title) < 6: |
|
|
continue |
|
|
|
|
|
|
|
|
combined = f"{title} {href}".lower() |
|
|
if not re.search( |
|
|
r"(hansard|minutes|debate|transcript|proceedings)", combined |
|
|
): |
|
|
continue |
|
|
|
|
|
|
|
|
if keywords and not _contains_keyword(title, keywords): |
|
|
continue |
|
|
|
|
|
href_abs = _make_absolute(href, url) |
|
|
|
|
|
|
|
|
if any(r.get("url") == href_abs for r in results): |
|
|
continue |
|
|
|
|
|
results.append( |
|
|
{ |
|
|
"title": title, |
|
|
"url": href_abs, |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
) |
|
|
|
|
|
if len(results) >= max_items: |
|
|
break |
|
|
|
|
|
if not results: |
|
|
return [ |
|
|
{ |
|
|
"title": "No parliament Hansards found", |
|
|
"url": url, |
|
|
"keywords": keywords, |
|
|
"note": "The website structure may have changed or no matching documents found.", |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
] |
|
|
|
|
|
logger.info(f"[PARLIAMENT] Successfully scraped {len(results)} Hansard entries") |
|
|
return results |
|
|
|
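# Illustration of the filename conventions the parser above assumes, using an
# invented path (real businessdocs URLs may differ):
#
#   >>> href = "/uploads/businessdocs/12345_2024-03-21-hansard.pdf"
#   >>> re.search(r"(\d{4}-\d{2}-\d{2})", href).group(1)
#   '2024-03-21'
#   >>> re.search(r"/(\d+)_", href).group(1)
#   '12345'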
|
|
def scrape_train_schedule_impl( |
|
|
from_station: Optional[str] = None, |
|
|
to_station: Optional[str] = None, |
|
|
keyword: Optional[str] = None, |
|
|
max_items: int = 30, |
|
|
) -> List[Dict[str, Any]]:
    """
    Scrape the Railway Department e-services schedule page and filter rows
    by origin station, destination station, or a free-text keyword.
    """
|
|
url = "https://eservices.railway.gov.lk/schedule/homeAction.action?lang=en" |
|
|
resp = _safe_get(url) |
|
|
if not resp: |
|
|
return [ |
|
|
{ |
|
|
"train": "Railway website unavailable", |
|
|
"note": "Could not access railway.gov.lk", |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
] |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
tables = soup.find_all("table") |
|
|
results: List[Dict[str, Any]] = [] |
|
|
for table in tables: |
|
|
rows = table.find_all("tr") |
|
|
for row in rows[1:]: |
|
|
cols = [td.get_text(strip=True) for td in row.find_all("td")] |
|
|
if len(cols) < 2: |
|
|
continue |
|
|
train_info = { |
|
|
"train": cols[0] if len(cols) > 0 else "", |
|
|
"departure": cols[1] if len(cols) > 1 else "", |
|
|
"arrival": cols[2] if len(cols) > 2 else "", |
|
|
"route": " → ".join(cols[3:]) if len(cols) > 3 else "", |
|
|
} |
|
|
combined = " ".join(cols) |
|
|
if from_station and from_station.lower() not in combined.lower(): |
|
|
continue |
|
|
if to_station and to_station.lower() not in combined.lower(): |
|
|
continue |
|
|
if keyword and keyword.lower() not in combined.lower(): |
|
|
continue |
|
|
            results.append(train_info)
            if len(results) >= max_items:
                # Return immediately so later tables cannot push the result
                # count past max_items.
                return results
|
|
if not results: |
|
|
return [ |
|
|
{ |
|
|
"train": "No train schedules found", |
|
|
"note": "Railway schedule unavailable or no matches", |
|
|
"timestamp": utc_now().isoformat(), |
|
|
} |
|
|
] |
|
|
return results |
|
|
|
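# Illustrative filter; the station names are examples, not defaults.
def _example_coastal_trains() -> None:
    """Sketch: list up to 5 Colombo-to-Galle services."""
    trains = scrape_train_schedule_impl(
        from_station="Colombo", to_station="Galle", max_items=5
    )
    for t in trains:
        print(f"{t['train']}: dep {t.get('departure', '?')} arr {t.get('arrival', '?')}")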
|
|
def _scrape_twitter_trending_with_playwright( |
|
|
storage_state_path: Optional[str] = None, headless: bool = True |
|
|
) -> List[Dict[str, Any]]: |
|
|
ensure_playwright() |
|
|
trending = [] |
|
|
with sync_playwright() as p: |
|
|
browser = p.chromium.launch(headless=headless) |
|
|
context_args = {} |
|
|
if storage_state_path and os.path.exists(storage_state_path): |
|
|
context_args["storage_state"] = storage_state_path |
|
|
|
|
|
context = browser.new_context(**context_args) |
|
|
page = context.new_page() |
|
|
try: |
|
|
page.goto( |
|
|
"https://twitter.com/i/trends", wait_until="networkidle", timeout=30000 |
|
|
) |
|
|
if "login" in page.url or page.content().strip() == "": |
|
|
page.goto( |
|
|
"https://twitter.com/explore/tabs/trending", |
|
|
wait_until="networkidle", |
|
|
timeout=30000, |
|
|
) |
|
|
html = page.content() |
|
|
soup = BeautifulSoup(html, "html.parser") |
|
|
items = soup.select( |
|
|
"div[role='article'] a, div[data-testid='trend'], div.trend-card, span.trend-name" |
|
|
) |
|
|
seen = set() |
|
|
for it in items: |
|
|
text = it.get_text(separator=" ", strip=True) |
|
|
href = it.get("href") or "" |
|
|
if not text or len(text) < 2: |
|
|
continue |
|
|
if text in seen: |
|
|
continue |
|
|
seen.add(text) |
|
|
trending.append( |
|
|
{ |
|
|
"trend": text, |
|
|
"url": ( |
|
|
_make_absolute(href, "https://twitter.com") |
|
|
if href |
|
|
else None |
|
|
), |
|
|
} |
|
|
) |
|
|
|
|
|
if not trending: |
|
|
for tag in soup.find_all(string=re.compile(r"#\w+")): |
|
|
t = tag.strip() |
|
|
if t not in seen: |
|
|
trending.append({"trend": t, "url": None}) |
|
|
seen.add(t) |
|
|
return trending |
|
|
except Exception as e: |
|
|
logger.error(f"[TWITTER] Playwright trending error: {e}") |
|
|
return [] |
|
|
finally: |
|
|
try: |
|
|
context.close() |
|
|
except Exception: |
|
|
pass |
|
|
browser.close() |
|
|
|
|
|
|
|
|
def _scrape_twitter_trending_with_nitter( |
|
|
instance: str = "https://nitter.net", |
|
|
) -> List[Dict[str, Any]]: |
|
|
trends = [] |
|
|
try: |
|
|
search_url = f"{instance}/search?f=tweets&q=Sri%20Lanka%20trend" |
|
|
resp = _safe_get(search_url) |
|
|
if not resp: |
|
|
return [] |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
for a in soup.select("a:not([href^='/pic/'])"): |
|
|
text = a.get_text(separator=" ", strip=True) |
|
|
href = a.get("href", "") |
|
|
if not text: |
|
|
continue |
|
|
if len(text) < 3: |
|
|
continue |
|
|
trends.append({"trend": text, "url": _make_absolute(href, instance)}) |
|
|
return trends[:20] |
|
|
except Exception as e: |
|
|
logger.debug(f"[TWITTER] Nitter fallback failed: {e}") |
|
|
return [] |
|
|
|
|
|
|
|
|
def scrape_twitter_trending_srilanka( |
|
|
use_playwright: bool = True, storage_state_site: Optional[str] = None |
|
|
) -> Dict[str, Any]:
    """
    Fetch Sri Lanka Twitter trends: try an authenticated Playwright session
    first, then fall back through a list of public Nitter instances.
    """
|
|
if use_playwright and PLAYWRIGHT_AVAILABLE: |
|
|
storage_state = None |
|
|
if storage_state_site: |
|
|
storage_state = load_playwright_storage_state_path(storage_state_site) |
|
|
try: |
|
|
trends = _scrape_twitter_trending_with_playwright( |
|
|
storage_state_path=storage_state |
|
|
) |
|
|
if trends: |
|
|
return { |
|
|
"source": "twitter_playwright", |
|
|
"trends": trends, |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
except Exception as e: |
|
|
logger.debug(f"[TWITTER] Playwright attempt failed: {e}") |
|
|
|
|
|
nitter_instances = [ |
|
|
"https://nitter.net", |
|
|
"https://nitter.snopyta.org", |
|
|
"https://nitter.1d4.us", |
|
|
] |
|
|
for inst in nitter_instances: |
|
|
try: |
|
|
trends = _scrape_twitter_trending_with_nitter(inst) |
|
|
if trends: |
|
|
return { |
|
|
"source": inst, |
|
|
"trends": trends, |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
} |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
return { |
|
|
"source": "none", |
|
|
"trends": [], |
|
|
"note": "Could not fetch Twitter trends. Try supplying Playwright session or check network.", |
|
|
} |
|
|
|
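# Illustrative consumption of the trend fetcher; forcing the Nitter path here
# just to show the fallback (the helper is hypothetical).
def _example_trending_report() -> None:
    """Sketch: print up to 10 trend names with their optional URLs."""
    data = scrape_twitter_trending_srilanka(use_playwright=False)
    for trend in data["trends"][:10]:
        print(trend["trend"], trend.get("url") or "(no url)")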
|
|
def scrape_authenticated_page_via_playwright( |
|
|
site_name: str, |
|
|
url: str, |
|
|
login_flow: Optional[dict] = None, |
|
|
headless: bool = True, |
|
|
storage_dir: str = ".sessions", |
|
|
wait_until: str = "networkidle", |
|
|
) -> Dict[str, Any]:
    """
    Fetch a page behind a login: reuse the saved Playwright storage state for
    ``site_name`` when present, otherwise run ``login_flow`` to create one.
    """
|
|
if not PLAYWRIGHT_AVAILABLE: |
|
|
return { |
|
|
"error": "Playwright not available. Install playwright to use authenticated scrapers." |
|
|
} |
|
|
|
|
|
session_path = load_playwright_storage_state_path(site_name, storage_dir) |
|
|
|
|
|
if not session_path: |
|
|
if not login_flow: |
|
|
return { |
|
|
"error": f"No existing session found for {site_name} and no login_flow provided to create one." |
|
|
} |
|
|
try: |
|
|
session_path = create_or_restore_playwright_session( |
|
|
site_name, |
|
|
login_flow=login_flow, |
|
|
headless=headless, |
|
|
storage_dir=storage_dir, |
|
|
wait_until=wait_until, |
|
|
) |
|
|
except Exception as e: |
|
|
return {"error": f"Failed to create Playwright session: {e}"} |
|
|
|
|
|
html = playwright_fetch_html_using_session( |
|
|
url, session_path, headless=headless, wait_until=wait_until |
|
|
) |
|
|
if not html: |
|
|
return { |
|
|
"error": "Failed to fetch page via Playwright session.", |
|
|
"storage_state": session_path, |
|
|
} |
|
|
return {"html": html, "source": url, "storage_state": session_path} |
|
|
|
|
|
|
|
|
def _simple_parse_posts_from_html( |
|
|
html: str, base_url: str, max_items: int = 10 |
|
|
) -> List[Dict[str, Any]]:
    """Best-effort extraction of post-like items (title, snippet, url) from raw HTML."""
|
|
soup = BeautifulSoup(html, "html.parser") |
|
|
items: List[Dict[str, Any]] = [] |
|
|
candidates = soup.select( |
|
|
"article, div.post, div.feed-item, li.stream-item, div._4ikz" |
|
|
) |
|
|
if not candidates: |
|
|
candidates = soup.find_all(["article", "div"], limit=200) |
|
|
seen = set() |
|
|
for c in candidates: |
|
|
title_tag = c.find("h1") or c.find("h2") or c.find("h3") or c.find("a") |
|
|
if not title_tag: |
|
|
continue |
|
|
title = title_tag.get_text(strip=True) |
|
|
if not title or title in seen or len(title) < 4: |
|
|
continue |
|
|
seen.add(title) |
|
|
a = c.find("a", href=True) |
|
|
url = _make_absolute(a["href"], base_url) if a else base_url |
|
|
text = c.get_text(separator=" ", strip=True)[:500] |
|
|
items.append({"title": title, "snippet": text, "url": url}) |
|
|
if len(items) >= max_items: |
|
|
break |
|
|
return items |
|
|
|
|
|
def clean_linkedin_text(text):
    """Strip LinkedIn UI noise: see-more toggles, relative timestamps, reaction counts."""
|
|
if not text: |
|
|
return "" |
|
|
|
|
|
|
|
|
text = re.sub(r"…\s*see more", "", text, flags=re.IGNORECASE) |
|
|
text = re.sub(r"See translation", "", text, flags=re.IGNORECASE) |
|
|
text = re.sub(r"\b\d+[dwmo]\s*•\s*(Edited)?\s*•?", "", text) |
|
|
text = re.sub(r".+posted this", "", text) |
|
|
text = re.sub(r"\d+[\.,]?\d*\s*reactions", "", text) |
|
|
text = "\n".join([line.strip() for line in text.splitlines() if line.strip()]) |
|
|
|
|
|
return text.strip() |
|
|
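# Example of the cleanup above (the input string is invented for illustration):
#
#   >>> clean_linkedin_text("Jane Doe posted this 3d • Edited • Hello world …see more")
#   'Hello world'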
|
|
|
|
|
|
@tool |
|
|
def scrape_linkedin(keywords: Optional[List[str]] = None, max_items: int = 10): |
|
|
""" |
|
|
LinkedIn search using Playwright session. |
|
|
Requires environment variables: LINKEDIN_USER, LINKEDIN_PASSWORD (if creating session). |
|
|
""" |
|
|
ensure_playwright() |
|
|
|
|
|
|
|
|
site = "linkedin" |
|
|
session_path = load_playwright_storage_state_path( |
|
|
site, out_dir="src/utils/.sessions" |
|
|
) |
|
|
if not session_path: |
|
|
session_path = load_playwright_storage_state_path(site, out_dir=".sessions") |
|
|
|
|
|
|
|
|
if not session_path: |
|
|
login_flow = { |
|
|
"login_url": "https://www.linkedin.com/login", |
|
|
"steps": [ |
|
|
{ |
|
|
"type": "fill", |
|
|
"selector": 'input[name="session_key"]', |
|
|
"value_env": "LINKEDIN_USER", |
|
|
}, |
|
|
{ |
|
|
"type": "fill", |
|
|
"selector": 'input[name="session_password"]', |
|
|
"value_env": "LINKEDIN_PASSWORD", |
|
|
}, |
|
|
{"type": "click", "selector": 'button[type="submit"]'}, |
|
|
{"type": "wait", "selector": "nav", "timeout": 20000}, |
|
|
], |
|
|
} |
|
|
try: |
|
|
session_path = create_or_restore_playwright_session( |
|
|
site, login_flow=login_flow, headless=True |
|
|
) |
|
|
except Exception as e: |
|
|
return json.dumps( |
|
|
{"error": f"No session found and failed to create one: {e}"} |
|
|
) |
|
|
|
|
|
keyword = " ".join(keywords) if keywords else "Sri Lanka" |
|
|
results = [] |
|
|
|
|
|
try: |
|
|
with sync_playwright() as p: |
|
|
desktop_ua = ( |
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
|
|
"AppleWebKit/537.36 (KHTML, like Gecko) " |
|
|
"Chrome/120.0.0.0 Safari/537.36" |
|
|
) |
|
|
|
|
|
browser = p.chromium.launch( |
|
|
headless=True, |
|
|
args=[ |
|
|
"--disable-blink-features=AutomationControlled", |
|
|
"--start-maximized", |
|
|
], |
|
|
) |
|
|
|
|
|
context = browser.new_context( |
|
|
storage_state=session_path, user_agent=desktop_ua, no_viewport=True |
|
|
) |
|
|
|
|
|
page = context.new_page() |
|
|
page.add_init_script( |
|
|
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" |
|
|
) |
|
|
|
|
|
url = f"https://www.linkedin.com/search/results/content/?keywords={keyword.replace(' ', '%20')}&origin=GLOBAL_SEARCH_HEADER" |
|
|
|
|
|
try: |
|
|
logger.info(f"[LINKEDIN] Navigating to {url}") |
|
|
page.goto(url, timeout=60000, wait_until="domcontentloaded") |
|
|
except Exception as e: |
|
|
logger.warning( |
|
|
f"[LINKEDIN] Page load timed out (or other error), attempting to proceed: {e}" |
|
|
) |
|
|
|
|
|
page.wait_for_timeout(random.randint(4000, 7000)) |
|
|
|
|
|
try: |
|
|
if ( |
|
|
page.locator("a[href*='login']").is_visible() |
|
|
or "auth_wall" in page.url |
|
|
): |
|
|
logger.error( |
|
|
"[LINKEDIN] Session invalid. Redirected to login/auth wall." |
|
|
) |
|
|
return json.dumps( |
|
|
{"error": "Session invalid. Please refresh session."} |
|
|
) |
|
|
            except Exception:
|
|
pass |
|
|
|
|
|
seen = set() |
|
|
no_new_data_count = 0 |
|
|
previous_height = 0 |
|
|
|
|
|
POST_CONTAINER_SELECTOR = "div.feed-shared-update-v2, li.artdeco-card" |
|
|
TEXT_SELECTOR = ( |
|
|
"div.update-components-text span.break-words, span.break-words" |
|
|
) |
|
|
SEE_MORE_SELECTOR = ( |
|
|
"button.feed-shared-inline-show-more-text__see-more-less-toggle" |
|
|
) |
|
|
POSTER_SELECTOR = "span.update-components-actor__name span[dir='ltr']" |
|
|
|
|
|
while len(results) < max_items: |
|
|
try: |
|
|
see_more_buttons = page.locator(SEE_MORE_SELECTOR).all() |
|
|
for btn in see_more_buttons: |
|
|
if btn.is_visible(): |
|
|
try: |
|
|
btn.click(timeout=500) |
|
|
                            except Exception:
|
|
pass |
|
|
                except Exception:
|
|
pass |
|
|
|
|
|
if len(results) == 0: |
|
|
try: |
|
|
page.locator(POST_CONTAINER_SELECTOR).first.wait_for( |
|
|
timeout=5000 |
|
|
) |
|
|
                    except Exception:
|
|
logger.warning("[LINKEDIN] No posts found on page yet.") |
|
|
|
|
|
posts = page.locator(POST_CONTAINER_SELECTOR).all() |
|
|
|
|
|
for post in posts: |
|
|
if len(results) >= max_items: |
|
|
break |
|
|
try: |
|
|
post.scroll_into_view_if_needed() |
|
|
raw_text = "" |
|
|
text_el = post.locator(TEXT_SELECTOR).first |
|
|
if text_el.is_visible(): |
|
|
raw_text = text_el.inner_text() |
|
|
else: |
|
|
raw_text = post.locator( |
|
|
"div.feed-shared-update-v2__description-wrapper" |
|
|
).first.inner_text() |
|
|
|
|
|
cleaned_text = clean_linkedin_text(raw_text) |
|
|
poster_name = "(Unknown)" |
|
|
poster_el = post.locator(POSTER_SELECTOR).first |
|
|
if poster_el.is_visible(): |
|
|
poster_name = poster_el.inner_text().strip() |
|
|
else: |
|
|
poster_el = post.locator( |
|
|
"span.update-components-actor__title span[dir='ltr']" |
|
|
).first |
|
|
if poster_el.is_visible(): |
|
|
poster_name = poster_el.inner_text().strip() |
|
|
|
|
|
key = f"{poster_name[:20]}::{cleaned_text[:30]}" |
|
|
if cleaned_text and len(cleaned_text) > 20 and key not in seen: |
|
|
seen.add(key) |
|
|
results.append( |
|
|
{ |
|
|
"source": "LinkedIn", |
|
|
"poster": poster_name, |
|
|
"text": cleaned_text, |
|
|
"url": "https://www.linkedin.com", |
|
|
} |
|
|
) |
|
|
logger.info(f"[LINKEDIN] Found post by {poster_name}") |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
page.evaluate("window.scrollTo(0, document.body.scrollHeight)") |
|
|
page.wait_for_timeout(random.randint(2000, 4000)) |
|
|
|
|
|
new_height = page.evaluate("document.body.scrollHeight") |
|
|
if new_height == previous_height: |
|
|
no_new_data_count += 1 |
|
|
if no_new_data_count > 3: |
|
|
logger.info("[LINKEDIN] End of feed or stuck.") |
|
|
break |
|
|
else: |
|
|
no_new_data_count = 0 |
|
|
previous_height = new_height |
|
|
|
|
|
browser.close() |
|
|
return json.dumps( |
|
|
{"site": "LinkedIn", "results": results, "storage_state": session_path}, |
|
|
default=str, |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
return json.dumps({"error": str(e)}) |
|
|
|
|
|
def clean_twitter_text(text): |
|
|
"""Clean and normalize tweet text""" |
|
|
if not text: |
|
|
return "" |
|
|
|
|
|
|
|
|
text = re.sub(r"Show more", "", text, flags=re.IGNORECASE) |
|
|
text = re.sub(r"https://t\.co/\w+", "", text) |
|
|
text = re.sub(r"pic\.twitter\.com/\w+", "", text) |
|
|
text = re.sub(r"\s+", " ", text) |
|
|
text = "\n".join([line.strip() for line in text.splitlines() if line.strip()]) |
|
|
|
|
|
return text.strip() |
|
|
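# Example of the normalisation above (input invented for illustration):
#
#   >>> clean_twitter_text("Big news https://t.co/abc123 pic.twitter.com/xyz Show more")
#   'Big news'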
|
|
|
|
|
|
def extract_twitter_timestamp(tweet_element): |
|
|
"""Extract timestamp from tweet element""" |
|
|
try: |
|
|
timestamp_selectors = [ |
|
|
"time", |
|
|
"[datetime]", |
|
|
"a[href*='/status/'] time", |
|
|
"div[data-testid='User-Name'] a[href*='/status/']", |
|
|
] |
|
|
|
|
|
for selector in timestamp_selectors: |
|
|
if tweet_element.locator(selector).count() > 0: |
|
|
time_element = tweet_element.locator(selector).first |
|
|
datetime_attr = time_element.get_attribute("datetime") |
|
|
if datetime_attr: |
|
|
return datetime_attr |
|
|
time_text = time_element.inner_text() |
|
|
if time_text: |
|
|
return time_text |
|
|
    except Exception:
|
|
pass |
|
|
return "Unknown" |
|
|
|
|
|
|
|
|
@tool |
|
|
def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20): |
|
|
""" |
|
|
Twitter scraper - extracts actual tweet text, author, and metadata using Playwright session. |
|
|
Requires a valid Twitter session file (twitter_storage_state.json or tw_state.json). |
|
|
""" |
|
|
ensure_playwright() |
|
|
|
|
|
|
|
|
site = "twitter" |
|
|
session_path = load_playwright_storage_state_path( |
|
|
site, out_dir="src/utils/.sessions" |
|
|
) |
|
|
if not session_path: |
|
|
session_path = load_playwright_storage_state_path(site, out_dir=".sessions") |
|
|
|
|
|
|
|
|
if not session_path: |
|
|
alt_paths = [ |
|
|
os.path.join(os.getcwd(), "src", "utils", ".sessions", "tw_state.json"), |
|
|
os.path.join(os.getcwd(), ".sessions", "tw_state.json"), |
|
|
os.path.join(os.getcwd(), "tw_state.json"), |
|
|
] |
|
|
for path in alt_paths: |
|
|
if os.path.exists(path): |
|
|
session_path = path |
|
|
logger.info(f"[TWITTER] Found session at {path}") |
|
|
break |
|
|
|
|
|
if not session_path: |
|
|
return json.dumps( |
|
|
{ |
|
|
"error": "No Twitter session found", |
|
|
"solution": "Run the Twitter session manager to create a session", |
|
|
}, |
|
|
default=str, |
|
|
) |
|
|
|
|
|
results = [] |
|
|
|
|
|
try: |
|
|
with sync_playwright() as p: |
|
|
browser = p.chromium.launch( |
|
|
headless=True, |
|
|
args=[ |
|
|
"--disable-blink-features=AutomationControlled", |
|
|
"--no-sandbox", |
|
|
"--disable-dev-shm-usage", |
|
|
], |
|
|
) |
|
|
|
|
|
context = browser.new_context( |
|
|
storage_state=session_path, |
|
|
viewport={"width": 1280, "height": 720}, |
|
|
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", |
|
|
) |
|
|
|
|
|
context.add_init_script( |
|
|
""" |
|
|
Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); |
|
|
window.chrome = {runtime: {}}; |
|
|
""" |
|
|
) |
|
|
|
|
|
page = context.new_page() |
|
|
|
|
|
|
|
|
search_urls = [ |
|
|
f"https://x.com/search?q={quote_plus(query)}&src=typed_query&f=live", |
|
|
f"https://x.com/search?q={quote_plus(query)}&src=typed_query", |
|
|
f"https://x.com/search?q={quote_plus(query)}", |
|
|
] |
|
|
|
|
|
success = False |
|
|
for url in search_urls: |
|
|
try: |
|
|
logger.info(f"[TWITTER] Trying {url}") |
|
|
page.goto(url, timeout=60000, wait_until="domcontentloaded") |
|
|
time.sleep(5) |
|
|
|
|
|
|
|
|
popup_selectors = [ |
|
|
"[data-testid='app-bar-close']", |
|
|
"[aria-label='Close']", |
|
|
"button:has-text('Not now')", |
|
|
] |
|
|
for selector in popup_selectors: |
|
|
try: |
|
|
if ( |
|
|
page.locator(selector).count() > 0 |
|
|
and page.locator(selector).first.is_visible() |
|
|
): |
|
|
page.locator(selector).first.click() |
|
|
time.sleep(1) |
|
|
                        except Exception:
|
|
pass |
|
|
|
|
|
|
|
|
try: |
|
|
page.wait_for_selector( |
|
|
"article[data-testid='tweet']", timeout=15000 |
|
|
) |
|
|
logger.info("[TWITTER] Tweets found!") |
|
|
success = True |
|
|
break |
|
|
                    except Exception:
|
|
logger.warning("[TWITTER] No tweets found, trying next URL...") |
|
|
continue |
|
|
except Exception as e: |
|
|
logger.error(f"[TWITTER] Navigation failed: {e}") |
|
|
continue |
|
|
|
|
|
if not success or "login" in page.url: |
|
|
logger.error("[TWITTER] Could not load tweets or session expired") |
|
|
return json.dumps( |
|
|
{"error": "Session invalid or tweets not found"}, default=str |
|
|
) |
|
|
|
|
|
|
|
|
seen = set() |
|
|
scroll_attempts = 0 |
|
|
max_scroll_attempts = 15 |
|
|
|
|
|
TWEET_SELECTOR = "article[data-testid='tweet']" |
|
|
TEXT_SELECTOR = "div[data-testid='tweetText']" |
|
|
USER_SELECTOR = "div[data-testid='User-Name']" |
|
|
|
|
|
while len(results) < max_items and scroll_attempts < max_scroll_attempts: |
|
|
scroll_attempts += 1 |
|
|
|
|
|
|
|
|
try: |
|
|
show_more_buttons = page.locator( |
|
|
"[data-testid='tweet-text-show-more-link']" |
|
|
).all() |
|
|
for button in show_more_buttons: |
|
|
if button.is_visible(): |
|
|
try: |
|
|
button.click() |
|
|
time.sleep(0.3) |
|
|
                            except Exception:
|
|
pass |
|
|
                except Exception:
|
|
pass |
|
|
|
|
|
|
|
|
tweets = page.locator(TWEET_SELECTOR).all() |
|
|
new_tweets_found = 0 |
|
|
|
|
|
for tweet in tweets: |
|
|
if len(results) >= max_items: |
|
|
break |
|
|
|
|
|
try: |
|
|
tweet.scroll_into_view_if_needed() |
|
|
time.sleep(0.1) |
|
|
|
|
|
|
|
|
if ( |
|
|
tweet.locator("span:has-text('Promoted')").count() > 0 |
|
|
or tweet.locator("span:has-text('Ad')").count() > 0 |
|
|
): |
|
|
continue |
|
|
|
|
|
|
|
|
text_content = "" |
|
|
text_element = tweet.locator(TEXT_SELECTOR).first |
|
|
if text_element.count() > 0: |
|
|
text_content = text_element.inner_text() |
|
|
|
|
|
cleaned_text = clean_twitter_text(text_content) |
|
|
|
|
|
|
|
|
user_info = "Unknown" |
|
|
user_element = tweet.locator(USER_SELECTOR).first |
|
|
if user_element.count() > 0: |
|
|
user_text = user_element.inner_text() |
|
|
user_info = user_text.split("\n")[0].strip() |
|
|
|
|
|
|
|
|
timestamp = extract_twitter_timestamp(tweet) |
|
|
|
|
|
|
|
|
text_key = cleaned_text[:50] if cleaned_text else "" |
|
|
unique_key = f"{user_info}_{text_key}" |
|
|
|
|
|
if ( |
|
|
cleaned_text |
|
|
and len(cleaned_text) > 20 |
|
|
and unique_key not in seen |
|
|
and not any( |
|
|
word in cleaned_text.lower() |
|
|
for word in ["promoted", "advertisement"] |
|
|
) |
|
|
): |
|
|
|
|
|
seen.add(unique_key) |
|
|
results.append( |
|
|
{ |
|
|
"source": "Twitter", |
|
|
"poster": user_info, |
|
|
"text": cleaned_text, |
|
|
"timestamp": timestamp, |
|
|
"url": "https://x.com", |
|
|
} |
|
|
) |
|
|
new_tweets_found += 1 |
|
|
logger.info( |
|
|
f"[TWITTER] Collected tweet {len(results)}/{max_items}" |
|
|
) |
|
|
|
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
|
|
|
if len(results) < max_items: |
|
|
page.evaluate( |
|
|
"window.scrollTo(0, document.documentElement.scrollHeight)" |
|
|
) |
|
|
time.sleep(random.uniform(2, 3)) |
|
|
|
|
|
                if new_tweets_found == 0:
                    # Counts double with the top-of-loop increment, so roughly
                    # seven consecutive empty scrolls end the loop.
                    scroll_attempts += 1
                else:
                    scroll_attempts = 0
|
|
|
|
|
browser.close() |
|
|
|
|
|
return json.dumps( |
|
|
{ |
|
|
"source": "Twitter", |
|
|
"query": query, |
|
|
"results": results, |
|
|
"total_found": len(results), |
|
|
"fetched_at": utc_now().isoformat(), |
|
|
}, |
|
|
default=str, |
|
|
indent=2, |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[TWITTER] {e}") |
|
|
return json.dumps({"error": str(e)}, default=str) |
|
|
def clean_fb_text(text): |
|
|
"""Clean Facebook noisy text""" |
|
|
if not text: |
|
|
return "" |
|
|
|
|
|
text = re.sub(r"\b(?:[a-zA-Z]\s+){4,}\b", "", text) |
|
|
text = re.sub(r"(Facebook\s*){2,}", "", text) |
|
|
text = re.sub(r"Like\s*Comment\s*Share", "", text) |
|
|
text = re.sub(r"All reactions:\s*\d+\s*", "", text) |
|
|
text = re.sub(r"\n\d+\n", "\n", text) |
|
|
text = "\n".join([line.strip() for line in text.splitlines() if line.strip()]) |
|
|
|
|
|
return text.strip() |
|
|
|
|
|
|
|
|
def extract_media_id_instagram(page): |
|
|
"""Extract Instagram media ID""" |
|
|
html = page.content() |
|
|
match = re.search(r'"media_id":"(\d+)"', html) |
|
|
if match: |
|
|
return match.group(1) |
|
|
match = re.search(r'"id":"(\d+_\d+)"', html) |
|
|
if match: |
|
|
return match.group(1) |
|
|
return None |
|
|
|
|
|
|
|
|
def fetch_caption_via_private_api(page, media_id): |
|
|
"""Instagram Private API Caption fetch""" |
|
|
if not media_id: |
|
|
return None |
|
|
|
|
|
api_url = f"https://i.instagram.com/api/v1/media/{media_id}/info/" |
|
|
|
|
|
try: |
|
|
response = page.request.get( |
|
|
api_url, |
|
|
headers={ |
|
|
"User-Agent": ( |
|
|
"Instagram 290.0.0.0.66 (iPhone14,5; iOS 17_0; en_US) " |
|
|
"AppleWebKit/605.1.15" |
|
|
), |
|
|
"X-IG-App-ID": "936619743392459", |
|
|
}, |
|
|
timeout=20000, |
|
|
) |
|
|
if response.status != 200: |
|
|
return None |
|
|
|
|
|
data = response.json() |
|
|
if "items" in data and data["items"]: |
|
|
return data["items"][0].get("caption", {}).get("text") |
|
|
    except Exception:
|
|
pass |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
@tool |
|
|
def scrape_instagram(keywords: Optional[List[str]] = None, max_items: int = 15): |
|
|
""" |
|
|
Instagram scraper using Playwright session. |
|
|
Scrapes posts from hashtag search and extracts captions. |
|
|
""" |
|
|
ensure_playwright() |
|
|
|
|
|
|
|
|
site = "instagram" |
|
|
session_path = load_playwright_storage_state_path( |
|
|
site, out_dir="src/utils/.sessions" |
|
|
) |
|
|
if not session_path: |
|
|
session_path = load_playwright_storage_state_path(site, out_dir=".sessions") |
|
|
|
|
|
|
|
|
if not session_path: |
|
|
alt_paths = [ |
|
|
os.path.join(os.getcwd(), "src", "utils", ".sessions", "ig_state.json"), |
|
|
os.path.join(os.getcwd(), ".sessions", "ig_state.json"), |
|
|
os.path.join(os.getcwd(), "ig_state.json"), |
|
|
] |
|
|
for path in alt_paths: |
|
|
if os.path.exists(path): |
|
|
session_path = path |
|
|
logger.info(f"[INSTAGRAM] Found session at {path}") |
|
|
break |
|
|
|
|
|
if not session_path: |
|
|
return json.dumps( |
|
|
{ |
|
|
"error": "No Instagram session found", |
|
|
"solution": "Run the Instagram session manager to create a session", |
|
|
}, |
|
|
default=str, |
|
|
) |
|
|
|
|
|
keyword = " ".join(keywords) if keywords else "srilanka" |
|
|
keyword = keyword.replace(" ", "") |
|
|
results = [] |
|
|
|
|
|
try: |
|
|
with sync_playwright() as p: |
|
|
instagram_mobile_ua = ( |
|
|
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) " |
|
|
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1" |
|
|
) |
|
|
|
|
|
browser = p.chromium.launch(headless=True) |
|
|
|
|
|
context = browser.new_context( |
|
|
storage_state=session_path, |
|
|
user_agent=instagram_mobile_ua, |
|
|
viewport={"width": 430, "height": 932}, |
|
|
) |
|
|
|
|
|
page = context.new_page() |
|
|
url = f"https://www.instagram.com/explore/tags/{keyword}/" |
|
|
|
|
|
logger.info(f"[INSTAGRAM] Navigating to {url}") |
|
|
page.goto(url, timeout=120000) |
|
|
page.wait_for_timeout(4000) |
|
|
|
|
|
|
|
|
for _ in range(12): |
|
|
page.mouse.wheel(0, 2500) |
|
|
page.wait_for_timeout(1500) |
|
|
|
|
|
|
|
|
anchors = page.locator("a[href*='/p/'], a[href*='/reel/']").all() |
|
|
links = [] |
|
|
|
|
|
for a in anchors: |
|
|
href = a.get_attribute("href") |
|
|
if href: |
|
|
full = "https://www.instagram.com" + href |
|
|
links.append(full) |
|
|
if len(links) >= max_items: |
|
|
break |
|
|
|
|
|
logger.info(f"[INSTAGRAM] Found {len(links)} posts") |
|
|
|
|
|
|
|
|
for link in links: |
|
|
logger.info(f"[INSTAGRAM] Scraping {link}") |
|
|
page.goto(link, timeout=120000) |
|
|
page.wait_for_timeout(2000) |
|
|
|
|
|
media_id = extract_media_id_instagram(page) |
|
|
caption = fetch_caption_via_private_api(page, media_id) |
|
|
|
|
|
|
|
|
if not caption: |
|
|
try: |
|
|
caption = ( |
|
|
page.locator("article h1, article span") |
|
|
.first.inner_text() |
|
|
.strip() |
|
|
) |
|
|
                    except Exception:
|
|
caption = None |
|
|
|
|
|
if caption: |
|
|
results.append( |
|
|
{ |
|
|
"source": "Instagram", |
|
|
"text": caption, |
|
|
"url": link, |
|
|
"poster": "(Instagram User)", |
|
|
} |
|
|
) |
|
|
logger.info( |
|
|
f"[INSTAGRAM] Collected caption {len(results)}/{max_items}" |
|
|
) |
|
|
|
|
|
browser.close() |
|
|
|
|
|
return json.dumps( |
|
|
{ |
|
|
"site": "Instagram", |
|
|
"results": results, |
|
|
"storage_state": session_path, |
|
|
}, |
|
|
default=str, |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[INSTAGRAM] {e}") |
|
|
return json.dumps({"error": str(e)}, default=str) |
|
|
|
|
|
|
|
|
@tool |
|
|
def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): |
|
|
""" |
|
|
Facebook scraper using Playwright session (Desktop). |
|
|
Extracts posts from keyword search with poster names and text. |
|
|
""" |
|
|
ensure_playwright() |
|
|
|
|
|
|
|
|
site = "facebook" |
|
|
session_path = load_playwright_storage_state_path( |
|
|
site, out_dir="src/utils/.sessions" |
|
|
) |
|
|
if not session_path: |
|
|
session_path = load_playwright_storage_state_path(site, out_dir=".sessions") |
|
|
|
|
|
|
|
|
if not session_path: |
|
|
alt_paths = [ |
|
|
os.path.join(os.getcwd(), "src", "utils", ".sessions", "fb_state.json"), |
|
|
os.path.join(os.getcwd(), ".sessions", "fb_state.json"), |
|
|
os.path.join(os.getcwd(), "fb_state.json"), |
|
|
] |
|
|
for path in alt_paths: |
|
|
if os.path.exists(path): |
|
|
session_path = path |
|
|
logger.info(f"[FACEBOOK] Found session at {path}") |
|
|
break |
|
|
|
|
|
if not session_path: |
|
|
return json.dumps( |
|
|
{ |
|
|
"error": "No Facebook session found", |
|
|
"solution": "Run the Facebook session manager to create a session", |
|
|
}, |
|
|
default=str, |
|
|
) |
|
|
|
|
|
keyword = " ".join(keywords) if keywords else "Sri Lanka" |
|
|
results = [] |
|
|
|
|
|
try: |
|
|
with sync_playwright() as p: |
|
|
facebook_desktop_ua = ( |
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " |
|
|
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" |
|
|
) |
|
|
|
|
|
browser = p.chromium.launch(headless=True) |
|
|
|
|
|
context = browser.new_context( |
|
|
storage_state=session_path, |
|
|
user_agent=facebook_desktop_ua, |
|
|
viewport={"width": 1400, "height": 900}, |
|
|
) |
|
|
|
|
|
page = context.new_page() |
|
|
|
|
|
search_url = f"https://www.facebook.com/search/posts?q={quote(keyword)}" |
|
|
|
|
|
logger.info(f"[FACEBOOK] Navigating to {search_url}") |
|
|
page.goto(search_url, timeout=120000) |
|
|
time.sleep(5) |
|
|
|
|
|
seen = set() |
|
|
stuck = 0 |
|
|
last_scroll = 0 |
|
|
|
|
|
MESSAGE_SELECTOR = "div[data-ad-preview='message']" |
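# Post bodies in the search feed are assumed to live in this container
# (Facebook's markup changes frequently); poster names are then resolved
# relative to it by trying the fallback selectors below in order.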
|
|
|
|
|
|
|
|
POSTER_SELECTORS = [ |
|
|
"h3 strong a span", |
|
|
"h3 strong span", |
|
|
"h3 a span", |
|
|
"strong a span", |
|
|
"a[role='link'] span:not([class*='timestamp'])", |
|
|
"span.fwb a", |
|
|
"span.fwb", |
|
|
"a[aria-hidden='false'] span", |
|
|
"a[role='link'] span", |
|
|
] |
|
|
|
|
|
def extract_poster(post): |
|
|
"""Extract poster name from Facebook post""" |
|
|
parent = post.locator( |
|
|
"xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]" |
|
|
) |
|
|
|
|
|
for selector in POSTER_SELECTORS: |
|
|
try: |
|
|
el = parent.locator(selector).first |
|
|
if el and el.count() > 0: |
|
|
name = el.inner_text().strip() |
|
|
if name and name != "Facebook" and len(name) > 1: |
|
|
return name |
|
|
except Exception:
    pass
|
|
|
|
|
return "(Unknown)" |
|
|
|
|
|
|
|
|
def expand_all_see_more(): |
|
|
"""Click all 'See more' buttons on the visible page""" |
|
|
see_more_selectors = [ |
|
|
|
|
|
"div[role='button'] span:text-is('See more')", |
|
|
"div[role='button']:has-text('See more')", |
|
|
"span:text-is('See more')", |
|
|
"span:text-is('... See more')", |
|
|
"span:text-is('...See more')", |
|
|
|
|
|
"[role='button']:has-text('See more')", |
|
|
"div.x1i10hfl:has-text('See more')", |
|
|
|
|
|
"text='See more'", |
|
|
"text='... See more'", |
|
|
] |
|
|
|
|
|
clicked = 0 |
|
|
for selector in see_more_selectors: |
|
|
try: |
|
|
buttons = page.locator(selector).all() |
|
|
for btn in buttons: |
|
|
try: |
|
|
if btn.is_visible(): |
|
|
btn.scroll_into_view_if_needed() |
|
|
time.sleep(0.2) |
|
|
btn.click(force=True) |
|
|
clicked += 1 |
|
|
time.sleep(0.3) |
|
|
except Exception:
    pass
|
|
except Exception:
    pass
|
|
|
|
|
if clicked > 0: |
|
|
logger.info(f"[FACEBOOK] Expanded {clicked} 'See more' buttons") |
|
|
return clicked |
|
|
|
|
|
while len(results) < max_items: |
|
|
|
|
|
expand_all_see_more() |
|
|
time.sleep(0.5) |
|
|
|
|
|
posts = page.locator(MESSAGE_SELECTOR).all() |
|
|
|
|
|
for post in posts: |
|
|
try: |
|
|
|
|
|
try: |
|
|
post.scroll_into_view_if_needed() |
|
|
time.sleep(0.3) |
|
|
|
|
|
|
|
|
parent = post.locator( |
|
|
"xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]" |
|
|
) |
|
|
|
|
|
post_see_more_selectors = [ |
|
|
"div[role='button'] span:text-is('See more')", |
|
|
"span:text-is('See more')", |
|
|
"div[role='button']:has-text('See more')", |
|
|
] |
|
|
|
|
|
for selector in post_see_more_selectors: |
|
|
try: |
|
|
btns = parent.locator(selector) |
|
|
if btns.count() > 0 and btns.first.is_visible(): |
|
|
btns.first.click(force=True) |
|
|
time.sleep(0.5) |
|
|
break |
|
|
except Exception:
    pass
|
|
except Exception:
    pass
|
|
|
|
|
raw = post.inner_text().strip() |
|
|
cleaned = clean_fb_text(raw) |
|
|
|
|
|
poster = extract_poster(post) |
|
|
|
|
|
if cleaned and len(cleaned) > 30: |
|
|
key = poster + "::" + cleaned |
|
|
if key not in seen: |
|
|
seen.add(key) |
|
|
results.append( |
|
|
{ |
|
|
"source": "Facebook", |
|
|
"poster": poster, |
|
|
"text": cleaned, |
|
|
"url": "https://www.facebook.com", |
|
|
} |
|
|
) |
|
|
logger.info( |
|
|
f"[FACEBOOK] Collected post {len(results)}/{max_items}" |
|
|
) |
|
|
|
|
|
if len(results) >= max_items: |
|
|
break |
|
|
|
|
|
except Exception:
    pass
|
|
|
|
|
|
|
|
page.evaluate("window.scrollBy(0, 2300)") |
|
|
time.sleep(1.2) |
|
|
|
|
|
new_scroll = page.evaluate("window.scrollY") |
|
|
stuck = stuck + 1 if new_scroll == last_scroll else 0 |
|
|
last_scroll = new_scroll |
|
|
|
|
|
if stuck >= 3: |
|
|
logger.info("[FACEBOOK] Reached end of results") |
|
|
break |
|
|
|
|
|
browser.close() |
|
|
|
|
|
return json.dumps( |
|
|
{ |
|
|
"site": "Facebook", |
|
|
"results": results[:max_items], |
|
|
"storage_state": session_path, |
|
|
}, |
|
|
default=str, |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[FACEBOOK] {e}") |
|
|
return json.dumps({"error": str(e)}, default=str) |
|
|
|
|
|
|
|
|
@tool |
|
|
def scrape_government_gazette( |
|
|
keywords: Optional[List[str]] = None, max_items: int = 15 |
|
|
): |
|
|
""" |
|
|
Search and scrape Sri Lankan government gazette entries from gazette.lk. |
|
|
This tool visits each gazette page to extract full descriptions and download links (PDFs). |
|
|
""" |
|
|
data = scrape_government_gazette_impl(keywords=keywords, max_items=max_items) |
|
|
return json.dumps(data, default=str) |
|
|
|
|
|
|
|
|
def clean_linkedin_text(text: str) -> str:
    """Strip LinkedIn UI noise (see-more links, timestamps, reaction counts) from post text."""
|
|
if not text: |
|
|
return "" |
|
|
|
|
|
|
|
|
text = re.sub(r"…\s*see more", "", text, flags=re.IGNORECASE) |
|
|
text = re.sub(r"See translation", "", text, flags=re.IGNORECASE) |
|
|
text = re.sub(r"\b\d+[dwmo]\s*•\s*(Edited)?\s*•?", "", text) |
|
|
text = re.sub(r".+posted this", "", text) |
|
|
text = re.sub(r"\d+[\.,]?\d*\s*reactions", "", text) |
|
|
text = "\n".join([line.strip() for line in text.splitlines() if line.strip()]) |
|
|
|
|
|
return text.strip() |
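# Illustrative example (assumed input) of the noise this removes:
#   clean_linkedin_text("Great quarter for exports …see more\n3w • Edited •\n1,204 reactions")
#   -> "Great quarter for exports"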
|
|
|
|
|
|
|
|
@tool |
|
|
def scrape_parliament_minutes( |
|
|
keywords: Optional[List[str]] = None, max_items: int = 20 |
|
|
): |
|
|
""" |
|
|
Search and scrape Sri Lankan Parliament Hansards and minutes matching keywords. |
|
|
""" |
|
|
data = scrape_parliament_minutes_impl(keywords=keywords, max_items=max_items) |
|
|
return json.dumps(data, default=str) |
|
|
|
|
|
|
|
|
@tool |
|
|
def scrape_train_schedule( |
|
|
from_station: Optional[str] = None, |
|
|
to_station: Optional[str] = None, |
|
|
keyword: Optional[str] = None, |
|
|
max_items: int = 30, |
|
|
): |
|
|
""" |
|
|
Scrape Sri Lanka Railways train schedule based on stations or keywords. |
|
|
""" |
|
|
data = scrape_train_schedule_impl( |
|
|
from_station=from_station, |
|
|
to_station=to_station, |
|
|
keyword=keyword, |
|
|
max_items=max_items, |
|
|
) |
|
|
return json.dumps(data, default=str) |
|
|
|
|
|
|
|
|
@tool |
|
|
def scrape_cse_stock_data( |
|
|
symbol: str = "ASPI", period: str = "1d", interval: str = "1h" |
|
|
): |
|
|
""" |
|
|
Scrape Colombo Stock Exchange (CSE) data for a given symbol (e.g., ASPI). |
|
|
Tries yfinance first, then falls back to direct site scraping. |
|
|
""" |
|
|
data = scrape_cse_stock_impl(symbol=symbol, period=period, interval=interval) |
|
|
return json.dumps(data, default=str) |
|
|
|
|
|
|
|
|
@tool |
|
|
def scrape_local_news(keywords: Optional[List[str]] = None, max_articles: int = 30): |
|
|
""" |
|
|
Scrape major Sri Lankan local news websites (Daily Mirror, Daily FT, etc.) for articles matching keywords. |
|
|
""" |
|
|
data = scrape_local_news_impl(keywords=keywords, max_articles=max_articles) |
|
|
return json.dumps(data, default=str) |
|
|
|
|
|
|
|
|
@tool |
|
|
def think_tool(reflection: str) -> str: |
|
|
""" |
|
|
Log a thought or reflection from the agent. Useful for debugging or tracing the agent's reasoning. |
|
|
""" |
|
|
return f"Reflection recorded: {reflection}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clean_fb_text(text: str) -> str:
    """Strip Facebook UI noise (repeated banners, action rows, reaction counts) from post text."""
|
|
if not text: |
|
|
return "" |
|
|
|
|
|
text = re.sub(r"\b(?:[a-zA-Z]\s+){4,}\b", "", text) |
|
|
text = re.sub(r"(Facebook\s*){2,}", "", text) |
|
|
text = re.sub(r"Like\s*Comment\s*Share", "", text) |
|
|
text = re.sub(r"All reactions:\s*\d+\s*", "", text) |
|
|
text = re.sub(r"\n\d+\n", "\n", text) |
|
|
text = "\n".join([line.strip() for line in text.splitlines() if line.strip()]) |
|
|
|
|
|
return text.strip() |
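# Illustrative example (assumed input) of the noise this removes:
#   clean_fb_text("Flood warning issued\nAll reactions: 214 Like Comment Share")
#   -> "Flood warning issued"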
|
|
|
|
|
|
|
|
def extract_media_id_instagram(page): |
|
|
"""Extract Instagram media ID""" |
|
|
html = page.content() |
|
|
match = re.search(r'"media_id":"(\d+)"', html) |
|
|
if match: |
|
|
return match.group(1) |
|
|
match = re.search(r'"id":"(\d+_\d+)"', html) |
|
|
if match: |
|
|
return match.group(1) |
|
|
return None |
|
|
|
|
|
|
|
|
def fetch_caption_via_private_api(page, media_id): |
|
|
"""Instagram Private API Caption fetch""" |
|
|
if not media_id: |
|
|
return None |
|
|
|
|
|
api_url = f"https://i.instagram.com/api/v1/media/{media_id}/info/" |
|
|
|
|
|
try: |
|
|
response = page.request.get( |
|
|
api_url, |
|
|
headers={ |
|
|
"User-Agent": ( |
|
|
"Instagram 290.0.0.0.66 (iPhone14,5; iOS 17_0; en_US) " |
|
|
"AppleWebKit/605.1.15" |
|
|
), |
|
|
"X-IG-App-ID": "936619743392459", |
|
|
}, |
|
|
timeout=20000, |
|
|
) |
|
|
if response.status != 200: |
|
|
return None |
|
|
|
|
|
data = response.json() |
|
|
if "items" in data and data["items"]: |
|
|
return data["items"][0].get("caption", {}).get("text") |
|
|
except Exception:
    pass
|
|
|
|
|
return None |
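# Note: this is Instagram's undocumented private media-info endpoint; the
# parsing above only assumes a response shaped roughly like
#   {"items": [{"caption": {"text": "..."}}]}
# and falls back to None for anything else.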
|
|
|
|
|
|
|
|
@tool |
|
|
def scrape_reddit( |
|
|
keywords: List[str], limit: int = 20, subreddit: Optional[str] = None |
|
|
): |
|
|
""" |
|
|
Scrape Reddit for posts matching specific keywords. |
|
|
Optionally restrict to a specific subreddit. |
|
|
""" |
|
|
data = scrape_reddit_impl(keywords=keywords, limit=limit, subreddit=subreddit) |
|
|
return json.dumps(data, default=str) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
TOOL_MAPPING = { |
|
|
"scrape_linkedin": scrape_linkedin, |
|
|
"scrape_instagram": scrape_instagram, |
|
|
"scrape_facebook": scrape_facebook, |
|
|
"scrape_reddit": scrape_reddit, |
|
|
"scrape_twitter": scrape_twitter, |
|
|
"scrape_government_gazette": scrape_government_gazette, |
|
|
"scrape_parliament_minutes": scrape_parliament_minutes, |
|
|
"scrape_train_schedule": scrape_train_schedule, |
|
|
"scrape_cse_stock_data": scrape_cse_stock_data, |
|
|
"scrape_local_news": scrape_local_news, |
|
|
"think_tool": think_tool, |
|
|
} |
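# Example (sketch): an agent can look up a tool by name and invoke it through
# the standard LangChain tool interface; every tool returns a JSON string.
#   tool = TOOL_MAPPING["scrape_local_news"]
#   data = json.loads(tool.invoke({"keywords": ["flood"], "max_articles": 5}))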
|
|
|
|
|
|
|
|
try: |
|
|
from src.utils.profile_scrapers import ( |
|
|
scrape_twitter_profile, |
|
|
scrape_facebook_profile, |
|
|
scrape_instagram_profile, |
|
|
scrape_linkedin_profile, |
|
|
scrape_product_reviews, |
|
|
) |
|
|
|
|
|
TOOL_MAPPING["scrape_twitter_profile"] = scrape_twitter_profile |
|
|
TOOL_MAPPING["scrape_facebook_profile"] = scrape_facebook_profile |
|
|
TOOL_MAPPING["scrape_instagram_profile"] = scrape_instagram_profile |
|
|
TOOL_MAPPING["scrape_linkedin_profile"] = scrape_linkedin_profile |
|
|
TOOL_MAPPING["scrape_product_reviews"] = scrape_product_reviews |
|
|
print("[OK] Profile scrapers loaded for Intelligence Agent") |
|
|
except ImportError as e: |
|
|
print(f"[WARN] Profile scrapers not available: {e}") |
|
|
|
|
|
|
|
|
ALL_TOOLS = list(TOOL_MAPPING.values()) |
|
|
|
|
|
__all__ = [ |
|
|
"get_today_str", |
|
|
"tool_dmc_alerts", |
|
|
"tool_weather_nowcast", |
|
|
"TOOL_MAPPING", |
|
|
"ALL_TOOLS", |
|
|
"create_or_restore_playwright_session", |
|
|
"playwright_fetch_html_using_session", |
|
|
] |
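# Minimal smoke test (sketch): confirms the tool registry builds and exercises
# think_tool, the only tool with no network or session dependencies.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(f"Registered tools ({len(ALL_TOOLS)}): {sorted(TOOL_MAPPING)}")
    print(think_tool.invoke({"reflection": "module imported cleanly"}))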
|
|
|