""" |
src/nodes/meteorologicalAgentNode.py

MODULAR - Meteorological Agent Node with Subgraph Architecture

Three modules: Official Sources, Social Media Collection, Feed Generation

Updated: Uses Tool Factory pattern for parallel execution safety.
Each agent instance gets its own private set of tools.

ENHANCED: Now includes RiverNet flood monitoring integration.
"""

import json
import uuid
from typing import Dict, Any
from datetime import datetime

from src.states.meteorologicalAgentState import MeteorologicalAgentState
from src.utils.tool_factory import create_tool_set
from src.utils.utils import tool_dmc_alerts, tool_weather_nowcast, tool_rivernet_status
from src.llms.groqllm import GroqLLM
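
# --- Hedged usage sketch ------------------------------------------------------
# The class below exposes LangGraph-style node methods (each takes the agent
# state and returns a partial state update). Assuming the surrounding project
# wires them into a LangGraph StateGraph, the subgraph might look roughly like
# the commented sketch below. The node names and the sequential edge order are
# illustrative assumptions, not part of this module's contract.
#
#   from langgraph.graph import StateGraph, START, END
#
#   builder = StateGraph(MeteorologicalAgentState)
#   agent = MeteorologicalAgentNode()
#   builder.add_node("official_sources", agent.collect_official_sources)
#   builder.add_node("national_social", agent.collect_national_social_media)
#   builder.add_node("district_social", agent.collect_district_social_media)
#   builder.add_node("climate_alerts", agent.collect_climate_alerts)
#   builder.add_node("categorize", agent.categorize_by_geography)
#   builder.add_node("summarize", agent.generate_llm_summary)
#   builder.add_node("format_output", agent.format_final_output)
#   builder.add_node("aggregate_store", agent.aggregate_and_store_feeds)
#   builder.add_edge(START, "official_sources")
#   builder.add_edge("official_sources", "national_social")
#   builder.add_edge("national_social", "district_social")
#   builder.add_edge("district_social", "climate_alerts")
#   builder.add_edge("climate_alerts", "categorize")
#   builder.add_edge("categorize", "summarize")
#   builder.add_edge("summarize", "format_output")
#   builder.add_edge("format_output", "aggregate_store")
#   builder.add_edge("aggregate_store", END)
#   meteorological_subgraph = builder.compile()
# ------------------------------------------------------------------------------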


class MeteorologicalAgentNode:
    """
    Modular Meteorological Agent - Three independent collection modules.

    Module 1: Official Weather Sources (DMC Alerts, Weather Nowcast, RiverNet)
    Module 2: Social Media (National, District, Climate)
    Module 3: Feed Generation (Categorize, Summarize, Format)

    Thread Safety:
        Each MeteorologicalAgentNode instance creates its own private ToolSet,
        enabling safe parallel execution with other agents.
    """

    def __init__(self, llm=None):
        """Initialize with a Groq LLM and a private tool set."""
        self.tools = create_tool_set()

        if llm is None:
            groq = GroqLLM()
            self.llm = groq.get_llm()
        else:
            self.llm = llm

        # All 25 administrative districts of Sri Lanka.
        self.districts = [
            "colombo",
            "gampaha",
            "kalutara",
            "kandy",
            "matale",
            "nuwara eliya",
            "galle",
            "matara",
            "hambantota",
            "jaffna",
            "kilinochchi",
            "mannar",
            "mullaitivu",
            "vavuniya",
            "puttalam",
            "kurunegala",
            "anuradhapura",
            "polonnaruwa",
            "badulla",
            "monaragala",
            "ratnapura",
            "kegalle",
            "ampara",
            "batticaloa",
            "trincomalee",
        ]

        # Subset of districts targeted for per-district social media collection.
        self.key_districts = ["colombo", "kandy", "galle", "jaffna", "trincomalee"]

        # Cities queried for official weather nowcasts.
        self.key_cities = [
            "Colombo",
            "Kandy",
            "Galle",
            "Jaffna",
            "Trincomalee",
            "Anuradhapura",
        ]

    def collect_official_sources(
        self, state: MeteorologicalAgentState
    ) -> Dict[str, Any]:
        """
        Module 1: Collect official weather sources.
        - DMC Alerts (Disaster Management Centre)
        - Weather Nowcast for key cities
        - RiverNet flood monitoring data (NEW)
        """
        print("[MODULE 1] Collecting Official Weather Sources")

        official_results = []
        river_data = None

        # DMC (Disaster Management Centre) alerts
        try:
            dmc_data = tool_dmc_alerts()
            official_results.append(
                {
                    "source_tool": "dmc_alerts",
                    "raw_content": json.dumps(dmc_data),
                    "category": "official",
                    "subcategory": "dmc_alerts",
                    "timestamp": datetime.utcnow().isoformat(),
                }
            )
            print(" ✓ Collected DMC Alerts")
        except Exception as e:
            print(f" ⚠️ DMC Alerts error: {e}")

        # RiverNet flood monitoring
        try:
            river_data = tool_rivernet_status()
            official_results.append(
                {
                    "source_tool": "rivernet",
                    "raw_content": json.dumps(river_data),
                    "category": "official",
                    "subcategory": "flood_monitoring",
                    "timestamp": datetime.utcnow().isoformat(),
                }
            )

            summary = river_data.get("summary", {})
            overall_status = summary.get("overall_status", "unknown")
            river_count = summary.get("total_monitored", 0)
            print(
                f" ✓ RiverNet: {river_count} rivers monitored, status: {overall_status}"
            )

            # Each active RiverNet alert is recorded as its own result entry.
            for alert in river_data.get("alerts", []):
                official_results.append(
                    {
                        "source_tool": "rivernet_alert",
                        "raw_content": json.dumps(alert),
                        "category": "official",
                        "subcategory": "flood_alert",
                        "severity": alert.get("severity", "medium"),
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )

        except Exception as e:
            print(f" ⚠️ RiverNet error: {e}")

        # Weather nowcast for each key city
        for city in self.key_cities:
            try:
                weather_data = tool_weather_nowcast(location=city)
                official_results.append(
                    {
                        "source_tool": "weather_nowcast",
                        "raw_content": json.dumps(weather_data),
                        "category": "official",
                        "subcategory": "weather_forecast",
                        "city": city,
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print(f" ✓ Weather Nowcast for {city}")
            except Exception as e:
                print(f" ⚠️ Weather Nowcast {city} error: {e}")

        return {
            "worker_results": official_results,
            "latest_worker_results": official_results,
            "river_data": river_data,
        }
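
    # Assumed shape of the tool_rivernet_status() payload, inferred from the
    # reads in collect_official_sources() above and format_final_output() below.
    # This is a documentation sketch only; the authoritative schema is defined
    # by src.utils.utils.tool_rivernet_status, and the example values are
    # illustrative.
    #
    #   {
    #       "summary": {
    #           "overall_status": "normal" | "rising" | "warning" | "danger",
    #           "total_monitored": 12,
    #           "has_alerts": False,
    #       },
    #       "alerts": [{"severity": "medium", ...}, ...],
    #       "rivers": [
    #           {
    #               "name": "Kelani",
    #               "region": "Western",
    #               "status": "normal",
    #               "water_level": {"value": 1.2, "unit": "m"},
    #           },
    #           ...
    #       ],
    #   }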

    def collect_national_social_media(
        self, state: MeteorologicalAgentState
    ) -> Dict[str, Any]:
        """
        Module 2A: Collect national-level weather social media.
        """
        print("[MODULE 2A] Collecting National Weather Social Media")

        social_results = []

        # Twitter
        try:
            twitter_tool = self.tools.get("scrape_twitter")
            if twitter_tool:
                twitter_data = twitter_tool.invoke(
                    {"query": "sri lanka weather forecast rain", "max_items": 15}
                )
                social_results.append(
                    {
                        "source_tool": "scrape_twitter",
                        "raw_content": str(twitter_data),
                        "category": "national",
                        "platform": "twitter",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print(" ✓ Twitter National Weather")
        except Exception as e:
            print(f" ⚠️ Twitter error: {e}")

        # Facebook
        try:
            facebook_tool = self.tools.get("scrape_facebook")
            if facebook_tool:
                facebook_data = facebook_tool.invoke(
                    {
                        "keywords": ["sri lanka weather", "sri lanka rain"],
                        "max_items": 10,
                    }
                )
                social_results.append(
                    {
                        "source_tool": "scrape_facebook",
                        "raw_content": str(facebook_data),
                        "category": "national",
                        "platform": "facebook",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print(" ✓ Facebook National Weather")
        except Exception as e:
            print(f" ⚠️ Facebook error: {e}")

        # LinkedIn
        try:
            linkedin_tool = self.tools.get("scrape_linkedin")
            if linkedin_tool:
                linkedin_data = linkedin_tool.invoke(
                    {
                        "keywords": ["sri lanka weather", "sri lanka climate"],
                        "max_items": 5,
                    }
                )
                social_results.append(
                    {
                        "source_tool": "scrape_linkedin",
                        "raw_content": str(linkedin_data),
                        "category": "national",
                        "platform": "linkedin",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print(" ✓ LinkedIn Weather/Climate")
        except Exception as e:
            print(f" ⚠️ LinkedIn error: {e}")

        # Instagram
        try:
            instagram_tool = self.tools.get("scrape_instagram")
            if instagram_tool:
                instagram_data = instagram_tool.invoke(
                    {"keywords": ["srilankaweather"], "max_items": 5}
                )
                social_results.append(
                    {
                        "source_tool": "scrape_instagram",
                        "raw_content": str(instagram_data),
                        "category": "national",
                        "platform": "instagram",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print(" ✓ Instagram Weather")
        except Exception as e:
            print(f" ⚠️ Instagram error: {e}")

        # Reddit
        try:
            reddit_tool = self.tools.get("scrape_reddit")
            if reddit_tool:
                reddit_data = reddit_tool.invoke(
                    {
                        "keywords": ["sri lanka weather", "sri lanka rain"],
                        "limit": 10,
                        "subreddit": "srilanka",
                    }
                )
                social_results.append(
                    {
                        "source_tool": "scrape_reddit",
                        "raw_content": str(reddit_data),
                        "category": "national",
                        "platform": "reddit",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print(" ✓ Reddit Weather")
        except Exception as e:
            print(f" ⚠️ Reddit error: {e}")

        return {
            "worker_results": social_results,
            "social_media_results": social_results,
        }

    def collect_district_social_media(
        self, state: MeteorologicalAgentState
    ) -> Dict[str, Any]:
        """
        Module 2B: Collect district-level weather social media.
        """
        print(
            f"[MODULE 2B] Collecting District Weather Social Media ({len(self.key_districts)} districts)"
        )

        district_results = []

        for district in self.key_districts:

            try:
                twitter_tool = self.tools.get("scrape_twitter")
                if twitter_tool:
                    twitter_data = twitter_tool.invoke(
                        {"query": f"{district} sri lanka weather", "max_items": 5}
                    )
                    district_results.append(
                        {
                            "source_tool": "scrape_twitter",
                            "raw_content": str(twitter_data),
                            "category": "district",
                            "district": district,
                            "platform": "twitter",
                            "timestamp": datetime.utcnow().isoformat(),
                        }
                    )
                    print(f" ✓ Twitter {district.title()}")
            except Exception as e:
                print(f" ⚠️ Twitter {district} error: {e}")

            try:
                facebook_tool = self.tools.get("scrape_facebook")
                if facebook_tool:
                    facebook_data = facebook_tool.invoke(
                        {"keywords": [f"{district} weather"], "max_items": 5}
                    )
                    district_results.append(
                        {
                            "source_tool": "scrape_facebook",
                            "raw_content": str(facebook_data),
                            "category": "district",
                            "district": district,
                            "platform": "facebook",
                            "timestamp": datetime.utcnow().isoformat(),
                        }
                    )
                    print(f" ✓ Facebook {district.title()}")
            except Exception as e:
                print(f" ⚠️ Facebook {district} error: {e}")

        return {
            "worker_results": district_results,
            "social_media_results": district_results,
        }

    def collect_climate_alerts(self, state: MeteorologicalAgentState) -> Dict[str, Any]:
        """
        Module 2C: Collect climate and disaster-related posts.
        """
        print("[MODULE 2C] Collecting Climate & Disaster Alerts")

        climate_results = []

        try:
            twitter_tool = self.tools.get("scrape_twitter")
            if twitter_tool:
                twitter_data = twitter_tool.invoke(
                    {
                        "query": "sri lanka flood drought cyclone disaster",
                        "max_items": 10,
                    }
                )
                climate_results.append(
                    {
                        "source_tool": "scrape_twitter",
                        "raw_content": str(twitter_data),
                        "category": "climate",
                        "platform": "twitter",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print(" ✓ Twitter Climate Alerts")
        except Exception as e:
            print(f" ⚠️ Twitter climate error: {e}")

        return {
            "worker_results": climate_results,
            "social_media_results": climate_results,
        }

    def categorize_by_geography(
        self, state: MeteorologicalAgentState
    ) -> Dict[str, Any]:
        """
        Module 3A: Categorize all collected results by geography and alert type.
        """
        print("[MODULE 3A] Categorizing Weather Results")

        all_results = state.get("worker_results", []) or []

        official_data = []
        national_data = []
        alert_data = []
        district_data = {district: [] for district in self.districts}

        for r in all_results:
            category = r.get("category", "unknown")
            district = r.get("district")
            content = r.get("raw_content", "")

            # NOTE: raw_content is parsed as JSON. Entries stored via str() that
            # are not valid JSON (e.g. a Python repr of tool output) fail to
            # parse and are skipped by the except clause below.
            try:
                data = json.loads(content)
                if isinstance(data, dict) and "error" in data:
                    continue

                # Handle double-encoded JSON strings.
                if isinstance(data, str):
                    data = json.loads(data)

                posts = []
                if isinstance(data, list):
                    posts = data
                elif isinstance(data, dict):
                    posts = data.get("results", []) or data.get("data", [])
                    if not posts:
                        posts = [data]

                # Route posts into the appropriate geographic/alert bucket.
                if category == "official":
                    official_data.extend(posts[:10])

                    if r.get("subcategory") == "dmc_alerts":
                        alert_data.extend(posts[:20])
                elif category == "climate":
                    alert_data.extend(posts[:10])
                elif category == "district" and district:
                    district_data[district].extend(posts[:5])
                elif category == "national":
                    national_data.extend(posts[:10])

            except Exception:
                continue

        structured_feeds = {
            "sri lanka weather": national_data + official_data,
            "alerts": alert_data,
            **{district: posts for district, posts in district_data.items() if posts},
        }

        print(
            f" ✓ Categorized: {len(official_data)} official, {len(national_data)} national, {len(alert_data)} alerts"
        )
        print(
            f" ✓ Districts with data: {len([d for d in district_data if district_data[d]])}"
        )

        return {
            "structured_output": structured_feeds,
            "district_feeds": district_data,
            "national_feed": national_data + official_data,
            "alert_feed": alert_data,
        }
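
    # Example of the structured_output mapping produced above (illustrative
    # values; every key other than "sri lanka weather" and "alerts" is the name
    # of a district that actually had posts):
    #
    #   {
    #       "sri lanka weather": [...national + official posts...],
    #       "alerts": [...DMC + climate posts...],
    #       "colombo": [...district posts...],
    #       "kandy": [...district posts...],
    #   }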

    def generate_llm_summary(self, state: MeteorologicalAgentState) -> Dict[str, Any]:
        """
        Module 3B: Use Groq LLM to generate executive summary.
        """
        print("[MODULE 3B] Generating LLM Summary")

        structured_feeds = state.get("structured_output", {})

        try:
            summary_prompt = f"""Analyze the following meteorological intelligence data for Sri Lanka and create a concise executive summary.

Data Summary:
- National/Official Weather: {len(structured_feeds.get('sri lanka weather', []))} items
- Weather Alerts: {len(structured_feeds.get('alerts', []))} items
- District Coverage: {len([k for k in structured_feeds.keys() if k not in ['sri lanka weather', 'alerts']])} districts

Sample Data:
{json.dumps(structured_feeds, indent=2)[:2000]}

Generate a brief (3-5 sentences) executive summary highlighting the most important weather developments and alerts."""

            llm_response = self.llm.invoke(summary_prompt)
            llm_summary = (
                llm_response.content
                if hasattr(llm_response, "content")
                else str(llm_response)
            )

            print(" ✓ LLM Summary Generated")

        except Exception as e:
            print(f" ⚠️ LLM Error: {e}")
            llm_summary = "AI summary currently unavailable."

        return {"llm_summary": llm_summary}

    def format_final_output(self, state: MeteorologicalAgentState) -> Dict[str, Any]:
        """
        Module 3C: Format final feed output.
        ENHANCED: Now includes RiverNet flood monitoring data.
        """
        print("[MODULE 3C] Formatting Final Output")

        llm_summary = state.get("llm_summary", "No summary available")
        structured_feeds = state.get("structured_output", {})
        district_feeds = state.get("district_feeds", {})
        river_data = state.get("river_data", {})

        official_count = len(
            [
                r
                for r in state.get("worker_results", [])
                if r.get("category") == "official"
            ]
        )
        national_count = len(
            [
                r
                for r in state.get("worker_results", [])
                if r.get("category") == "national"
            ]
        )
        alert_count = len(
            [
                r
                for r in state.get("worker_results", [])
                if r.get("category") == "climate"
            ]
        )
        active_districts = len([d for d in district_feeds if district_feeds.get(d)])

        # River monitoring summary
        river_summary = river_data.get("summary", {}) if river_data else {}
        rivers_monitored = river_summary.get("total_monitored", 0)
        river_status = river_summary.get("overall_status", "unknown")
        has_flood_alerts = river_summary.get("has_alerts", False)

        change_detected = state.get("change_detected", False) or has_flood_alerts
        change_line = "⚠️ NEW ALERTS DETECTED\n" if change_detected else ""

        # Build the river monitoring section of the bulletin
        river_section = ""
        if river_data and river_data.get("rivers"):
            river_lines = ["🌊 RIVER MONITORING (RiverNet.lk)"]
            for river in river_data.get("rivers", [])[:6]:
                name = river.get("name", "Unknown")
                status = river.get("status", "unknown")
                region = river.get("region", "")
                status_emoji = {
                    "danger": "🔴",
                    "warning": "🟠",
                    "rising": "🟡",
                    "normal": "🟢",
                    "unknown": "⚪",
                    "error": "❌",
                }.get(status, "⚪")
                river_lines.append(
                    f" {status_emoji} {name} ({region}): {status.upper()}"
                )
            river_section = "\n".join(river_lines) + "\n"

        bulletin = f"""🇱🇰 COMPREHENSIVE METEOROLOGICAL INTELLIGENCE FEED
{datetime.utcnow().strftime("%d %b %Y • %H:%M UTC")}

{change_line}
📊 EXECUTIVE SUMMARY (AI-Generated)
{llm_summary}

{river_section}
📈 DATA COLLECTION STATS
• Official Sources: {official_count} items
• National Social Media: {national_count} items
• Climate Alerts: {alert_count} items
• Active Districts: {active_districts}
• Rivers Monitored: {rivers_monitored} (Status: {river_status.upper()})

🔍 COVERAGE
Districts monitored: {', '.join([d.title() for d in self.key_districts])}
Cities: {', '.join(self.key_cities)}

🌐 STRUCTURED DATA AVAILABLE
• "sri lanka weather": Combined national & official intelligence
• "alerts": Critical weather and disaster alerts
• "rivers": Real-time river level monitoring
• District-level: {', '.join([d.title() for d in district_feeds if district_feeds.get(d)])}

Source: Multi-platform aggregation (DMC, MetDept, RiverNet, Twitter, Facebook, LinkedIn, Instagram, Reddit)
"""

        # Build structured insights for downstream consumers
        domain_insights = []
        timestamp = datetime.utcnow().isoformat()

        # River-level insights: one per river that is rising or in warning/danger
        if river_data and river_data.get("rivers"):
            for river in river_data.get("rivers", []):
                status = river.get("status", "unknown")
                if status in ["danger", "warning", "rising"]:
                    severity = (
                        "high"
                        if status == "danger"
                        else ("medium" if status == "warning" else "low")
                    )
                    river_name = river.get("name", "Unknown River")
                    region = river.get("region", "")
                    water_level = river.get("water_level", {})
                    level_str = (
                        f" at {water_level.get('value', 'N/A')}{water_level.get('unit', 'm')}"
                        if water_level
                        else ""
                    )

                    domain_insights.append(
                        {
                            "source_event_id": str(uuid.uuid4()),
                            "domain": "meteorological",
                            "category": "flood_monitoring",
                            "summary": f"🌊 {river_name} ({region}): {status.upper()}{level_str}",
                            "severity": severity,
                            "impact_type": "risk",
                            "source": "rivernet.lk",
                            "river_name": river_name,
                            "river_status": status,
                            "water_level": water_level,
                            "timestamp": timestamp,
                        }
                    )

        # Overall flood monitoring alert
        if river_summary.get("has_alerts"):
            domain_insights.append(
                {
                    "source_event_id": str(uuid.uuid4()),
                    "domain": "meteorological",
                    "category": "flood_alert",
                    "summary": f"⚠️ FLOOD MONITORING ALERT: {rivers_monitored} rivers monitored, overall status: {river_status.upper()}",
                    "severity": "high" if river_status == "danger" else "medium",
                    "impact_type": "risk",
                    "source": "rivernet.lk",
                    "river_data": river_data,
                    "timestamp": timestamp,
                }
            )

        # Weather / disaster alert insights
        alert_data = structured_feeds.get("alerts", [])
        for alert in alert_data[:10]:
            alert_text = alert.get("text", "") or alert.get("title", "")
            if not alert_text:
                continue
            detected_district = "Sri Lanka"
            for district in self.districts:
                if district.lower() in alert_text.lower():
                    detected_district = district.title()
                    break
            domain_insights.append(
                {
                    "source_event_id": str(uuid.uuid4()),
                    "domain": "meteorological",
                    "summary": f"{detected_district}: {alert_text[:200]}",
                    "severity": "high" if change_detected else "medium",
                    "impact_type": "risk",
                    "timestamp": timestamp,
                }
            )

        # District-level insights
        for district, posts in district_feeds.items():
            if not posts:
                continue
            for post in posts[:3]:
                post_text = post.get("text", "") or post.get("title", "")
                if not post_text or len(post_text) < 10:
                    continue
                severity = "low"
                if any(
                    kw in post_text.lower()
                    for kw in [
                        "flood",
                        "cyclone",
                        "storm",
                        "warning",
                        "alert",
                        "danger",
                    ]
                ):
                    severity = "high"
                elif any(kw in post_text.lower() for kw in ["rain", "wind", "thunder"]):
                    severity = "medium"
                domain_insights.append(
                    {
                        "source_event_id": str(uuid.uuid4()),
                        "domain": "meteorological",
                        "summary": f"{district.title()}: {post_text[:200]}",
                        "severity": severity,
                        "impact_type": "risk" if severity != "low" else "opportunity",
                        "timestamp": timestamp,
                    }
                )

        # National-level insights
        national_data = structured_feeds.get("sri lanka weather", [])
        for post in national_data[:5]:
            post_text = post.get("text", "") or post.get("title", "")
            if not post_text or len(post_text) < 10:
                continue
            domain_insights.append(
                {
                    "source_event_id": str(uuid.uuid4()),
                    "domain": "meteorological",
                    "summary": f"Sri Lanka Weather: {post_text[:200]}",
                    "severity": "medium",
                    "impact_type": "risk",
                    "timestamp": timestamp,
                }
            )

        # Overall summary insight carrying the full structured payload
        domain_insights.append(
            {
                "source_event_id": str(uuid.uuid4()),
                "structured_data": structured_feeds,
                "river_data": river_data,
                "domain": "meteorological",
                "summary": f"Sri Lanka Meteorological Summary: {llm_summary[:300]}",
                "severity": "high" if change_detected else "medium",
                "impact_type": "risk",
            }
        )

        print(
            f" ✓ Created {len(domain_insights)} domain insights (including river monitoring)"
        )

        return {
            "final_feed": bulletin,
            "feed_history": [bulletin],
            "domain_insights": domain_insights,
            "river_data": river_data,
        }
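
    # The aggregation module below relies on src.utils.db_manager.extract_post_data
    # returning a flat dict. The keys assumed here are inferred from the CSV row
    # construction in aggregate_and_store_feeds (a documentation sketch, not the
    # authoritative schema):
    #
    #   {
    #       "post_id": str, "timestamp": str, "platform": str, "category": str,
    #       "district": str, "poster": str, "post_url": str, "title": str,
    #       "text": str, "content_hash": str, "source_tool": str,
    #       "engagement": {"score": int, "likes": int, "shares": int, "comments": int},
    #   }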

    def aggregate_and_store_feeds(
        self, state: MeteorologicalAgentState
    ) -> Dict[str, Any]:
        """
        Module 4: Aggregate, deduplicate, and store feeds.
        - Check uniqueness using Neo4j (URL + content hash)
        - Store unique posts in Neo4j
        - Store unique posts in ChromaDB for RAG
        - Append to CSV dataset for ML training
        """
        print("[MODULE 4] Aggregating and Storing Feeds")

        from src.utils.db_manager import (
            Neo4jManager,
            ChromaDBManager,
            extract_post_data,
        )
        import csv
        import os

        neo4j_manager = Neo4jManager()
        chroma_manager = ChromaDBManager()

        all_worker_results = state.get("worker_results", [])

        total_posts = 0
        unique_posts = 0
        duplicate_posts = 0
        stored_neo4j = 0
        stored_chroma = 0
        stored_csv = 0

        # CSV dataset location (one file per month)
        dataset_dir = os.getenv("DATASET_PATH", "./datasets/weather_feeds")
        os.makedirs(dataset_dir, exist_ok=True)

        csv_filename = f"weather_feeds_{datetime.now().strftime('%Y%m')}.csv"
        csv_path = os.path.join(dataset_dir, csv_filename)

        csv_headers = [
            "post_id",
            "timestamp",
            "platform",
            "category",
            "district",
            "poster",
            "post_url",
            "title",
            "text",
            "content_hash",
            "engagement_score",
            "engagement_likes",
            "engagement_shares",
            "engagement_comments",
            "source_tool",
        ]

        file_exists = os.path.exists(csv_path)

        try:
            with open(csv_path, "a", newline="", encoding="utf-8") as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=csv_headers)

                if not file_exists:
                    writer.writeheader()
                    print(f" ✓ Created new CSV dataset: {csv_path}")
                else:
                    print(f" ✓ Appending to existing CSV: {csv_path}")

                for worker_result in all_worker_results:
                    category = worker_result.get("category", "unknown")
                    platform = worker_result.get("platform", "") or worker_result.get(
                        "subcategory", ""
                    )
                    source_tool = worker_result.get("source_tool", "")
                    district = worker_result.get("district", "")

                    raw_content = worker_result.get("raw_content", "")
                    if not raw_content:
                        continue

                    try:
                        # Parse raw_content into a list of post dicts
                        if isinstance(raw_content, str):
                            data = json.loads(raw_content)
                        else:
                            data = raw_content

                        posts = []
                        if isinstance(data, list):
                            posts = data
                        elif isinstance(data, dict):
                            posts = (
                                data.get("results")
                                or data.get("data")
                                or data.get("posts")
                                or data.get("items")
                                or []
                            )

                            # Single-document payloads (e.g. a forecast) count as one post
                            if not posts and (
                                data.get("title")
                                or data.get("text")
                                or data.get("forecast")
                            ):
                                posts = [data]

                        for raw_post in posts:
                            total_posts += 1

                            if isinstance(raw_post, dict) and "error" in raw_post:
                                continue

                            post_data = extract_post_data(
                                raw_post=raw_post,
                                category=category,
                                platform=platform or "unknown",
                                source_tool=source_tool,
                            )

                            if not post_data:
                                continue

                            if district:
                                post_data["district"] = district

                            # Deduplicate against Neo4j by URL and content hash
                            is_dup = neo4j_manager.is_duplicate(
                                post_url=post_data["post_url"],
                                content_hash=post_data["content_hash"],
                            )

                            if is_dup:
                                duplicate_posts += 1
                                continue

                            unique_posts += 1

                            if neo4j_manager.store_post(post_data):
                                stored_neo4j += 1

                            if chroma_manager.add_document(post_data):
                                stored_chroma += 1

                            try:
                                csv_row = {
                                    "post_id": post_data["post_id"],
                                    "timestamp": post_data["timestamp"],
                                    "platform": post_data["platform"],
                                    "category": post_data["category"],
                                    "district": post_data["district"],
                                    "poster": post_data["poster"],
                                    "post_url": post_data["post_url"],
                                    "title": post_data["title"],
                                    "text": post_data["text"],
                                    "content_hash": post_data["content_hash"],
                                    "engagement_score": post_data["engagement"].get("score", 0),
                                    "engagement_likes": post_data["engagement"].get("likes", 0),
                                    "engagement_shares": post_data["engagement"].get("shares", 0),
                                    "engagement_comments": post_data["engagement"].get("comments", 0),
                                    "source_tool": post_data["source_tool"],
                                }
                                writer.writerow(csv_row)
                                stored_csv += 1
                            except Exception as e:
                                print(f" ⚠️ CSV write error: {e}")

                    except Exception as e:
                        print(f" ⚠️ Error processing worker result: {e}")
                        continue

        except Exception as e:
            print(f" ⚠️ CSV file error: {e}")

        print("\n 📊 AGGREGATION STATISTICS")
        print(f" Total Posts Processed: {total_posts}")
        print(f" Unique Posts: {unique_posts}")
        print(f" Duplicate Posts: {duplicate_posts}")
        print(f" Stored in Neo4j: {stored_neo4j}")
        print(f" Stored in ChromaDB: {stored_chroma}")
        print(f" Stored in CSV: {stored_csv}")
        print(f" Dataset Path: {csv_path}")

        # Query database totals before closing the Neo4j connection
        neo4j_total = neo4j_manager.get_post_count() if neo4j_manager.driver else 0
        chroma_total = (
            chroma_manager.get_document_count() if chroma_manager.collection else 0
        )

        neo4j_manager.close()

        print("\n 💾 DATABASE TOTALS")
        print(f" Neo4j Total Posts: {neo4j_total}")
        print(f" ChromaDB Total Docs: {chroma_total}")

        return {
            "aggregator_stats": {
                "total_processed": total_posts,
                "unique_posts": unique_posts,
                "duplicate_posts": duplicate_posts,
                "stored_neo4j": stored_neo4j,
                "stored_chroma": stored_chroma,
                "stored_csv": stored_csv,
                "neo4j_total": neo4j_total,
                "chroma_total": chroma_total,
            },
            "dataset_path": csv_path,
        }
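

# Hedged smoke check: a placeholder llm is passed so GroqLLM credentials are not
# required; create_tool_set() is still invoked and is assumed to be safe to call
# without live scraper credentials. Running the collection modules needs live
# tool and database access and is intentionally not exercised here.
if __name__ == "__main__":
    node = MeteorologicalAgentNode(llm=object())
    print(
        f"MeteorologicalAgentNode ready: {len(node.districts)} districts, "
        f"{len(node.key_districts)} key districts, {len(node.key_cities)} key cities"
    )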