"""
src/nodes/socialAgentNode.py

MODULAR - Social Agent Node with Subgraph Architecture
Monitors trending topics, events, people, and social intelligence across geographic scopes.

Updated: Uses the Tool Factory pattern for parallel-execution safety.
Each agent instance gets its own private set of tools.

Updated: Now loads user-defined keywords and profiles from the intel config.
"""

import json
import os
import re
import uuid
from datetime import datetime
from typing import Any, Dict, List

from src.states.socialAgentState import SocialAgentState
from src.utils.tool_factory import create_tool_set
from src.llms.groqllm import GroqLLM


def load_intel_config() -> dict:
    """Load intel config from JSON file (same as main.py)."""
    config_path = os.path.join(
        os.path.dirname(__file__), "..", "..", "data", "intel_config.json"
    )
    default_config = {
        "user_profiles": {"twitter": [], "facebook": [], "linkedin": []},
        "user_keywords": [],
        "user_products": [],
    }
    try:
        if os.path.exists(config_path):
            with open(config_path, "r", encoding="utf-8") as f:
                return json.load(f)
    except Exception:
        pass
    return default_config

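# For reference: load_intel_config() expects data/intel_config.json to mirror the
# default_config shape above. The keys come from this module; the values below are
# purely illustrative placeholders, not real project configuration:
#
#   {
#       "user_profiles": {
#           "twitter": ["@example_handle"],
#           "facebook": ["Example Page"],
#           "linkedin": ["example-company"]
#       },
#       "user_keywords": ["example keyword"],
#       "user_products": ["Example Product"]
#   }
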

class SocialAgentNode:
    """
    Modular Social Agent - Geographic social intelligence collection.

    Module 1: Trending Topics (Sri Lanka-specific trends)
    Module 2: Social Media (Sri Lanka, Asia, World scopes + user-defined keywords/profiles)
    Module 3: Feed Generation (Categorize, Summarize, Format)
    Module 4: Aggregation & Storage (Neo4j, ChromaDB, CSV dataset)

    Thread Safety:
        Each SocialAgentNode instance creates its own private ToolSet,
        enabling safe parallel execution with other agents.
    """

    def __init__(self, llm=None):
        """Initialize with Groq LLM and a private tool set."""
        # Private tool set: no tool instances are shared with other agents.
        self.tools = create_tool_set()

        if llm is None:
            groq = GroqLLM()
            self.llm = groq.get_llm()
        else:
            self.llm = llm

        # User-defined monitoring targets from the frontend intel config.
        self.intel_config = load_intel_config()
        self.user_keywords = self.intel_config.get("user_keywords", [])
        self.user_profiles = self.intel_config.get("user_profiles", {})
        self.user_products = self.intel_config.get("user_products", [])

        print(
            f"[SocialAgent] Loaded {len(self.user_keywords)} user keywords, "
            f"{sum(len(v) for v in self.user_profiles.values())} profiles"
        )

        self.geographic_scopes = {
            "sri_lanka": ["sri lanka", "colombo", "srilanka"],
            "asia": [
                "india",
                "pakistan",
                "bangladesh",
                "maldives",
                "singapore",
                "malaysia",
                "thailand",
            ],
            "world": ["global", "international", "breaking news", "world events"],
        }

        self.trending_categories = [
            "events",
            "people",
            "viral",
            "breaking",
            "technology",
            "culture",
        ]

    def collect_sri_lanka_trends(self, state: SocialAgentState) -> Dict[str, Any]:
        """
        Module 1: Collect Sri Lankan trending topics
        """
        print("[MODULE 1] Collecting Sri Lankan Trending Topics")

        trending_results = []

        try:
            twitter_tool = self.tools.get("scrape_twitter")
            if twitter_tool:
                twitter_data = twitter_tool.invoke(
                    {"query": "sri lanka trending viral", "max_items": 20}
                )
                trending_results.append(
                    {
                        "source_tool": "scrape_twitter",
                        "raw_content": str(twitter_data),
                        "category": "trending",
                        "scope": "sri_lanka",
                        "platform": "twitter",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Twitter Sri Lanka Trends")
        except Exception as e:
            print(f"   ⚠️ Twitter error: {e}")

        try:
            reddit_tool = self.tools.get("scrape_reddit")
            if reddit_tool:
                reddit_data = reddit_tool.invoke(
                    {
                        "keywords": [
                            "sri lanka trending",
                            "sri lanka viral",
                            "sri lanka news",
                        ],
                        "limit": 20,
                        "subreddit": "srilanka",
                    }
                )
                trending_results.append(
                    {
                        "source_tool": "scrape_reddit",
                        "raw_content": str(reddit_data),
                        "category": "trending",
                        "scope": "sri_lanka",
                        "platform": "reddit",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Reddit Sri Lanka Trends")
        except Exception as e:
            print(f"   ⚠️ Reddit error: {e}")

        return {
            "worker_results": trending_results,
            "latest_worker_results": trending_results,
        }

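    # Data contract note (illustrative example, all values are placeholders): every
    # collector in this class appends entries of this shape to worker_results, and the
    # downstream modules re-parse the JSON stored in "raw_content":
    #   {
    #       "source_tool": "scrape_twitter",
    #       "raw_content": "[{\"text\": \"...\", \"url\": \"...\"}]",
    #       "category": "trending",
    #       "scope": "sri_lanka",
    #       "platform": "twitter",
    #       "timestamp": "2025-01-01T00:00:00",
    #   }
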
    def collect_sri_lanka_social_media(self, state: SocialAgentState) -> Dict[str, Any]:
        """
        Module 2A: Collect Sri Lankan social media across all platforms
        """
        print("[MODULE 2A] Collecting Sri Lankan Social Media")

        social_results = []

        try:
            twitter_tool = self.tools.get("scrape_twitter")
            if twitter_tool:
                twitter_data = twitter_tool.invoke(
                    {"query": "sri lanka events people celebrities", "max_items": 15}
                )
                social_results.append(
                    {
                        "source_tool": "scrape_twitter",
                        "raw_content": str(twitter_data),
                        "category": "social",
                        "scope": "sri_lanka",
                        "platform": "twitter",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Twitter Sri Lanka Social")
        except Exception as e:
            print(f"   ⚠️ Twitter error: {e}")

        try:
            facebook_tool = self.tools.get("scrape_facebook")
            if facebook_tool:
                facebook_data = facebook_tool.invoke(
                    {
                        "keywords": ["sri lanka events", "sri lanka trending"],
                        "max_items": 10,
                    }
                )
                social_results.append(
                    {
                        "source_tool": "scrape_facebook",
                        "raw_content": str(facebook_data),
                        "category": "social",
                        "scope": "sri_lanka",
                        "platform": "facebook",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Facebook Sri Lanka Social")
        except Exception as e:
            print(f"   ⚠️ Facebook error: {e}")

        try:
            linkedin_tool = self.tools.get("scrape_linkedin")
            if linkedin_tool:
                linkedin_data = linkedin_tool.invoke(
                    {
                        "keywords": ["sri lanka events", "sri lanka people"],
                        "max_items": 5,
                    }
                )
                social_results.append(
                    {
                        "source_tool": "scrape_linkedin",
                        "raw_content": str(linkedin_data),
                        "category": "social",
                        "scope": "sri_lanka",
                        "platform": "linkedin",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ LinkedIn Sri Lanka Professional")
        except Exception as e:
            print(f"   ⚠️ LinkedIn error: {e}")

        try:
            instagram_tool = self.tools.get("scrape_instagram")
            if instagram_tool:
                instagram_data = instagram_tool.invoke(
                    {"keywords": ["srilankaevents", "srilankatrending"], "max_items": 5}
                )
                social_results.append(
                    {
                        "source_tool": "scrape_instagram",
                        "raw_content": str(instagram_data),
                        "category": "social",
                        "scope": "sri_lanka",
                        "platform": "instagram",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Instagram Sri Lanka")
        except Exception as e:
            print(f"   ⚠️ Instagram error: {e}")

        return {
            "worker_results": social_results,
            "social_media_results": social_results,
        }

    def collect_asia_social_media(self, state: SocialAgentState) -> Dict[str, Any]:
        """
        Module 2B: Collect Asian regional social media
        """
        print("[MODULE 2B] Collecting Asian Regional Social Media")

        asia_results = []

        try:
            twitter_tool = self.tools.get("scrape_twitter")
            if twitter_tool:
                twitter_data = twitter_tool.invoke(
                    {
                        "query": "asia trending india pakistan bangladesh",
                        "max_items": 15,
                    }
                )
                asia_results.append(
                    {
                        "source_tool": "scrape_twitter",
                        "raw_content": str(twitter_data),
                        "category": "social",
                        "scope": "asia",
                        "platform": "twitter",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Twitter Asia Trends")
        except Exception as e:
            print(f"   ⚠️ Twitter error: {e}")

        try:
            facebook_tool = self.tools.get("scrape_facebook")
            if facebook_tool:
                facebook_data = facebook_tool.invoke(
                    {"keywords": ["asia trending", "india events"], "max_items": 10}
                )
                asia_results.append(
                    {
                        "source_tool": "scrape_facebook",
                        "raw_content": str(facebook_data),
                        "category": "social",
                        "scope": "asia",
                        "platform": "facebook",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Facebook Asia")
        except Exception as e:
            print(f"   ⚠️ Facebook error: {e}")

        try:
            reddit_tool = self.tools.get("scrape_reddit")
            if reddit_tool:
                reddit_data = reddit_tool.invoke(
                    {
                        "keywords": ["asia trending", "india", "pakistan"],
                        "limit": 10,
                        "subreddit": "asia",
                    }
                )
                asia_results.append(
                    {
                        "source_tool": "scrape_reddit",
                        "raw_content": str(reddit_data),
                        "category": "social",
                        "scope": "asia",
                        "platform": "reddit",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Reddit Asia")
        except Exception as e:
            print(f"   ⚠️ Reddit error: {e}")

        return {"worker_results": asia_results, "social_media_results": asia_results}

    def collect_world_social_media(self, state: SocialAgentState) -> Dict[str, Any]:
        """
        Module 2C: Collect world/global trending topics
        """
        print("[MODULE 2C] Collecting World Trending Topics")

        world_results = []

        try:
            twitter_tool = self.tools.get("scrape_twitter")
            if twitter_tool:
                twitter_data = twitter_tool.invoke(
                    {"query": "world trending global breaking news", "max_items": 15}
                )
                world_results.append(
                    {
                        "source_tool": "scrape_twitter",
                        "raw_content": str(twitter_data),
                        "category": "social",
                        "scope": "world",
                        "platform": "twitter",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Twitter World Trends")
        except Exception as e:
            print(f"   ⚠️ Twitter error: {e}")

        try:
            reddit_tool = self.tools.get("scrape_reddit")
            if reddit_tool:
                reddit_data = reddit_tool.invoke(
                    {
                        "keywords": ["breaking", "trending", "viral"],
                        "limit": 15,
                        "subreddit": "worldnews",
                    }
                )
                world_results.append(
                    {
                        "source_tool": "scrape_reddit",
                        "raw_content": str(reddit_data),
                        "category": "social",
                        "scope": "world",
                        "platform": "reddit",
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                )
                print("   ✓ Reddit World News")
        except Exception as e:
            print(f"   ⚠️ Reddit error: {e}")

        return {"worker_results": world_results, "social_media_results": world_results}

    def collect_user_defined_targets(self, state: SocialAgentState) -> Dict[str, Any]:
        """
        Module 2D: Collect data for USER-DEFINED keywords and profiles.
        These are configured via the frontend Intelligence Settings UI.
        """
        print("[MODULE 2D] Collecting User-Defined Targets")

        user_results = []

        # Reload the intel config so edits made in the frontend are picked up
        # without restarting the agent.
        self.intel_config = load_intel_config()
        self.user_keywords = self.intel_config.get("user_keywords", [])
        self.user_profiles = self.intel_config.get("user_profiles", {})
        self.user_products = self.intel_config.get("user_products", [])

        if (
            not self.user_keywords
            and not self.user_products
            and not any(self.user_profiles.values())
        ):
            print("   ⏭️ No user-defined targets configured")
            return {"worker_results": [], "user_target_results": []}

        if self.user_keywords:
            print(f"   📝 Scraping {len(self.user_keywords)} user keywords...")
            twitter_tool = self.tools.get("scrape_twitter")

            for keyword in self.user_keywords[:10]:
                try:
                    if twitter_tool:
                        twitter_data = twitter_tool.invoke(
                            {"query": keyword, "max_items": 5}
                        )
                        user_results.append(
                            {
                                "source_tool": "scrape_twitter",
                                "raw_content": str(twitter_data),
                                "category": "user_keyword",
                                "scope": "sri_lanka",
                                "platform": "twitter",
                                "keyword": keyword,
                                "timestamp": datetime.utcnow().isoformat(),
                            }
                        )
                        print(f"      ✓ Keyword: '{keyword}'")
                except Exception as e:
                    print(f"      ⚠️ Keyword '{keyword}' error: {e}")

        if self.user_products:
            print(f"   📦 Scraping {len(self.user_products)} user products...")
            twitter_tool = self.tools.get("scrape_twitter")

            for product in self.user_products[:5]:
                try:
                    if twitter_tool:
                        twitter_data = twitter_tool.invoke(
                            {
                                "query": f"{product} review OR {product} Sri Lanka",
                                "max_items": 3,
                            }
                        )
                        user_results.append(
                            {
                                "source_tool": "scrape_twitter",
                                "raw_content": str(twitter_data),
                                "category": "user_product",
                                "scope": "sri_lanka",
                                "platform": "twitter",
                                "product": product,
                                "timestamp": datetime.utcnow().isoformat(),
                            }
                        )
                        print(f"      ✓ Product: '{product}'")
                except Exception as e:
                    print(f"      ⚠️ Product '{product}' error: {e}")

        twitter_profiles = self.user_profiles.get("twitter", [])
        if twitter_profiles:
            print(f"   👤 Scraping {len(twitter_profiles)} Twitter profiles...")
            twitter_tool = self.tools.get("scrape_twitter")

            for profile in twitter_profiles[:10]:
                try:
                    handle = profile.replace("@", "").strip()
                    if twitter_tool:
                        twitter_data = twitter_tool.invoke(
                            {"query": f"from:{handle} OR @{handle}", "max_items": 5}
                        )
                        user_results.append(
                            {
                                "source_tool": "scrape_twitter",
                                "raw_content": str(twitter_data),
                                "category": "user_profile",
                                "scope": "sri_lanka",
                                "platform": "twitter",
                                "profile": f"@{handle}",
                                "timestamp": datetime.utcnow().isoformat(),
                            }
                        )
                        print(f"      ✓ Profile: @{handle}")
                except Exception as e:
                    print(f"      ⚠️ Profile {profile} error: {e}")

        print(f"   ✅ User targets: {len(user_results)} results collected")
        return {"worker_results": user_results, "user_target_results": user_results}

    def categorize_by_geography(self, state: SocialAgentState) -> Dict[str, Any]:
        """
        Module 3A: Categorize all collected results by geographic scope
        """
        print("[MODULE 3A] Categorizing Results by Geography")

        all_results = state.get("worker_results", []) or []

        sri_lanka_data = []
        asia_data = []
        world_data = []
        geographic_data = {"sri_lanka": [], "asia": [], "world": []}

        for r in all_results:
            scope = r.get("scope", "unknown")
            content = r.get("raw_content", "")

            try:
                data = json.loads(content)
                if isinstance(data, dict) and "error" in data:
                    continue

                if isinstance(data, str):
                    data = json.loads(data)

                posts = []
                if isinstance(data, list):
                    posts = data
                elif isinstance(data, dict):
                    posts = data.get("results", []) or data.get("data", [])
                    if not posts:
                        posts = [data]

                if scope == "sri_lanka":
                    sri_lanka_data.extend(posts[:10])
                    geographic_data["sri_lanka"].extend(posts[:10])
                elif scope == "asia":
                    asia_data.extend(posts[:10])
                    geographic_data["asia"].extend(posts[:10])
                elif scope == "world":
                    world_data.extend(posts[:10])
                    geographic_data["world"].extend(posts[:10])

            except Exception:
                continue

        structured_feeds = {
            "sri lanka": sri_lanka_data,
            "asia": asia_data,
            "world": world_data,
        }

        print(
            f"   ✓ Categorized: {len(sri_lanka_data)} Sri Lanka, "
            f"{len(asia_data)} Asia, {len(world_data)} World"
        )

        return {
            "structured_output": structured_feeds,
            "geographic_feeds": geographic_data,
            "sri_lanka_feed": sri_lanka_data,
            "asia_feed": asia_data,
            "world_feed": world_data,
        }

    def generate_llm_summary(self, state: SocialAgentState) -> Dict[str, Any]:
        """
        Module 3B: Use Groq LLM to generate an executive summary AND structured insights
        """
        print("[MODULE 3B] Generating LLM Summary + Structured Insights")

        structured_feeds = state.get("structured_output", {})
        llm_summary = "AI summary currently unavailable."
        llm_insights = []

        try:
            all_posts = []
            for region, posts in structured_feeds.items():
                for p in posts[:5]:
                    if not isinstance(p, dict):
                        continue
                    text = p.get("text", "") or p.get("title", "")
                    if text and len(text) > 20:
                        all_posts.append(f"[{region.upper()}] {text[:200]}")

            if not all_posts:
                return {"llm_summary": llm_summary, "llm_insights": []}

            posts_text = "\n".join(all_posts[:15])

            analysis_prompt = f"""Analyze these social media posts from Sri Lanka and the region. Generate:
1. A 3-sentence executive summary of key trends
2. Up to 5 unique intelligence insights

Posts:
{posts_text}

Respond in this exact JSON format:
{{
  "executive_summary": "Brief 3-sentence summary of key social trends and developments",
  "insights": [
    {{"summary": "Unique insight #1 (not copying post text)", "severity": "low/medium/high", "impact_type": "risk/opportunity"}},
    {{"summary": "Unique insight #2", "severity": "low/medium/high", "impact_type": "risk/opportunity"}}
  ]
}}

Rules:
- Generate NEW insights, don't just copy post text
- Identify patterns and emerging trends
- Classify severity based on potential impact
- Mark positive developments as "opportunity", concerning ones as "risk"

JSON only, no explanation:"""

            llm_response = self.llm.invoke(analysis_prompt)
            content = (
                llm_response.content
                if hasattr(llm_response, "content")
                else str(llm_response)
            )

            # Strip markdown code fences if the model wrapped its JSON in them.
            content = content.strip()
            if content.startswith("```"):
                content = re.sub(r"^```\w*\n?", "", content)
                content = re.sub(r"\n?```$", "", content)

            result = json.loads(content)
            llm_summary = result.get("executive_summary", llm_summary)
            llm_insights = result.get("insights", [])

            print(f"   ✓ LLM generated {len(llm_insights)} unique insights")

        except json.JSONDecodeError as e:
            print(f"   ⚠️ JSON parse error: {e}")
            # Fall back to a plain-text summary when the model does not return valid JSON.
            try:
                fallback_prompt = (
                    f"Summarize these social media trends in 3 sentences:\n{posts_text[:1500]}"
                )
                response = self.llm.invoke(fallback_prompt)
                llm_summary = (
                    response.content if hasattr(response, "content") else str(response)
                )
            except Exception as fallback_error:
                print(f"   ⚠️ LLM fallback also failed: {fallback_error}")
        except Exception as e:
            print(f"   ⚠️ LLM Error: {e}")

        return {"llm_summary": llm_summary, "llm_insights": llm_insights}

    def format_final_output(self, state: SocialAgentState) -> Dict[str, Any]:
        """
        Module 3C: Format final feed output with LLM-enhanced insights
        """
        print("[MODULE 3C] Formatting Final Output")

        llm_summary = state.get("llm_summary", "No summary available")
        llm_insights = state.get("llm_insights", [])
        structured_feeds = state.get("structured_output", {})

        trending_count = len(
            [
                r
                for r in state.get("worker_results", [])
                if r.get("category") == "trending"
            ]
        )
        social_count = len(
            [
                r
                for r in state.get("worker_results", [])
                if r.get("category") == "social"
            ]
        )

        sri_lanka_items = len(structured_feeds.get("sri lanka", []))
        asia_items = len(structured_feeds.get("asia", []))
        world_items = len(structured_feeds.get("world", []))

        bulletin = f"""🌏 COMPREHENSIVE SOCIAL INTELLIGENCE FEED
{datetime.utcnow().strftime("%d %b %Y • %H:%M UTC")}

📊 EXECUTIVE SUMMARY (AI-Generated)
{llm_summary}

📈 DATA COLLECTION STATS
• Trending Topics: {trending_count} items
• Social Media Posts: {social_count} items
• Geographic Coverage: Sri Lanka, Asia, World

🔍 GEOGRAPHIC BREAKDOWN
• Sri Lanka: {sri_lanka_items} trending items
• Asia: {asia_items} regional items
• World: {world_items} global items

🌐 COVERAGE CATEGORIES
• Events: Public gatherings, launches, announcements
• People: Influencers, celebrities, public figures
• Viral Content: Trending posts, hashtags, memes
• Breaking: Real-time developments

🎯 INTELLIGENCE FOCUS
Monitoring social sentiment, trending topics, events, and people across:
- Sri Lanka (local intelligence)
- Asia (regional context: India, Pakistan, Bangladesh, ASEAN)
- World (global trends affecting local sentiment)

Source: Multi-platform aggregation (Twitter, Facebook, LinkedIn, Instagram, Reddit)
"""

        domain_insights = []
        timestamp = datetime.utcnow().isoformat()

        # LLM-generated insights first.
        for insight in llm_insights:
            if isinstance(insight, dict) and insight.get("summary"):
                domain_insights.append(
                    {
                        "source_event_id": str(uuid.uuid4()),
                        "domain": "social",
                        "summary": f"🔍 {insight.get('summary', '')}",
                        "severity": insight.get("severity", "medium"),
                        "impact_type": insight.get("impact_type", "risk"),
                        "timestamp": timestamp,
                        "is_llm_generated": True,
                    }
                )

        print(f"   ✓ Added {len(llm_insights)} LLM-generated insights")

        # If the LLM produced fewer than 5 insights, backfill from raw Sri Lanka posts.
        if len(domain_insights) < 5:
            districts = [
                "colombo",
                "gampaha",
                "kalutara",
                "kandy",
                "matale",
                "nuwara eliya",
                "galle",
                "matara",
                "hambantota",
                "jaffna",
                "kilinochchi",
                "mannar",
                "mullaitivu",
                "vavuniya",
                "puttalam",
                "kurunegala",
                "anuradhapura",
                "polonnaruwa",
                "badulla",
                "monaragala",
                "ratnapura",
                "kegalle",
                "ampara",
                "batticaloa",
                "trincomalee",
            ]

            sri_lanka_data = structured_feeds.get("sri lanka", [])
            for post in sri_lanka_data[:5]:
                if not isinstance(post, dict):
                    continue
                post_text = post.get("text", "") or post.get("title", "")
                if not post_text or len(post_text) < 20:
                    continue

                # Tag the insight with the first district mentioned in the post.
                detected_district = "Sri Lanka"
                for district in districts:
                    if district.lower() in post_text.lower():
                        detected_district = district.title()
                        break

                # Keyword-based severity heuristic.
                severity = "low"
                if any(
                    kw in post_text.lower()
                    for kw in ["protest", "riot", "emergency", "violence", "crisis"]
                ):
                    severity = "high"
                elif any(
                    kw in post_text.lower()
                    for kw in ["trending", "viral", "breaking", "update"]
                ):
                    severity = "medium"

                domain_insights.append(
                    {
                        "source_event_id": str(uuid.uuid4()),
                        "domain": "social",
                        "summary": f"{detected_district}: {post_text[:200]}",
                        "severity": severity,
                        "impact_type": (
                            "risk" if severity in ["high", "medium"] else "opportunity"
                        ),
                        "timestamp": timestamp,
                        "is_llm_generated": False,
                    }
                )

        # Always append one summary-level insight carrying the structured feeds.
        domain_insights.append(
            {
                "source_event_id": str(uuid.uuid4()),
                "structured_data": structured_feeds,
                "domain": "social",
                "summary": f"📊 Social Intelligence Summary: {llm_summary[:300]}",
                "severity": "medium",
                "impact_type": "risk",
                "timestamp": timestamp,
                "is_llm_generated": True,
            }
        )

        print(f"   ✓ Created {len(domain_insights)} total social intelligence insights")

        return {
            "final_feed": bulletin,
            "feed_history": [bulletin],
            "domain_insights": domain_insights,
        }

    def aggregate_and_store_feeds(self, state: SocialAgentState) -> Dict[str, Any]:
        """
        Module 4: Aggregate, deduplicate, and store feeds
        - Check uniqueness using Neo4j (URL + content hash)
        - Store unique posts in Neo4j
        - Store unique posts in ChromaDB for RAG
        - Append to CSV dataset for ML training
        """
        print("[MODULE 4] Aggregating and Storing Feeds")

        import csv

        from src.utils.db_manager import (
            Neo4jManager,
            ChromaDBManager,
            extract_post_data,
        )

        neo4j_manager = Neo4jManager()
        chroma_manager = ChromaDBManager()

        all_worker_results = state.get("worker_results", [])

        total_posts = 0
        unique_posts = 0
        duplicate_posts = 0
        stored_neo4j = 0
        stored_chroma = 0
        stored_csv = 0

        # CSV dataset location (one file per month).
        dataset_dir = os.getenv("DATASET_PATH", "./datasets/social_feeds")
        os.makedirs(dataset_dir, exist_ok=True)

        csv_filename = f"social_feeds_{datetime.now().strftime('%Y%m')}.csv"
        csv_path = os.path.join(dataset_dir, csv_filename)

        csv_headers = [
            "post_id",
            "timestamp",
            "platform",
            "category",
            "scope",
            "poster",
            "post_url",
            "title",
            "text",
            "content_hash",
            "engagement_score",
            "engagement_likes",
            "engagement_shares",
            "engagement_comments",
            "source_tool",
        ]

        file_exists = os.path.exists(csv_path)

        try:
            with open(csv_path, "a", newline="", encoding="utf-8") as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=csv_headers)

                if not file_exists:
                    writer.writeheader()
                    print(f"   ✓ Created new CSV dataset: {csv_path}")
                else:
                    print(f"   ✓ Appending to existing CSV: {csv_path}")

                for worker_result in all_worker_results:
                    category = worker_result.get("category", "unknown")
                    platform = worker_result.get("platform", "unknown")
                    source_tool = worker_result.get("source_tool", "")
                    scope = worker_result.get("scope", "")

                    raw_content = worker_result.get("raw_content", "")
                    if not raw_content:
                        continue

                    try:
                        if isinstance(raw_content, str):
                            data = json.loads(raw_content)
                        else:
                            data = raw_content

                        # Normalise the scraper output into a flat list of posts.
                        posts = []
                        if isinstance(data, list):
                            posts = data
                        elif isinstance(data, dict):
                            posts = (
                                data.get("results")
                                or data.get("data")
                                or data.get("posts")
                                or data.get("items")
                                or []
                            )
                            # A single post returned as a bare dict.
                            if not posts and (data.get("title") or data.get("text")):
                                posts = [data]

                        for raw_post in posts:
                            total_posts += 1

                            if isinstance(raw_post, dict) and "error" in raw_post:
                                continue

                            post_data = extract_post_data(
                                raw_post=raw_post,
                                category=category,
                                platform=platform,
                                source_tool=source_tool,
                            )
                            if not post_data:
                                continue

                            # Deduplicate against Neo4j by URL and content hash.
                            is_dup = neo4j_manager.is_duplicate(
                                post_url=post_data["post_url"],
                                content_hash=post_data["content_hash"],
                            )
                            if is_dup:
                                duplicate_posts += 1
                                continue

                            unique_posts += 1

                            if neo4j_manager.store_post(post_data):
                                stored_neo4j += 1

                            if chroma_manager.add_document(post_data):
                                stored_chroma += 1

                            try:
                                csv_row = {
                                    "post_id": post_data["post_id"],
                                    "timestamp": post_data["timestamp"],
                                    "platform": post_data["platform"],
                                    "category": post_data["category"],
                                    "scope": scope,
                                    "poster": post_data["poster"],
                                    "post_url": post_data["post_url"],
                                    "title": post_data["title"],
                                    "text": post_data["text"],
                                    "content_hash": post_data["content_hash"],
                                    "engagement_score": post_data["engagement"].get("score", 0),
                                    "engagement_likes": post_data["engagement"].get("likes", 0),
                                    "engagement_shares": post_data["engagement"].get("shares", 0),
                                    "engagement_comments": post_data["engagement"].get("comments", 0),
                                    "source_tool": post_data["source_tool"],
                                }
                                writer.writerow(csv_row)
                                stored_csv += 1
                            except Exception as e:
                                print(f"   ⚠️ CSV write error: {e}")

                    except Exception as e:
                        print(f"   ⚠️ Error processing worker result: {e}")
                        continue

        except Exception as e:
            print(f"   ⚠️ CSV file error: {e}")

        print("\n   📊 AGGREGATION STATISTICS")
        print(f"      Total Posts Processed: {total_posts}")
        print(f"      Unique Posts: {unique_posts}")
        print(f"      Duplicate Posts: {duplicate_posts}")
        print(f"      Stored in Neo4j: {stored_neo4j}")
        print(f"      Stored in ChromaDB: {stored_chroma}")
        print(f"      Stored in CSV: {stored_csv}")
        print(f"      Dataset Path: {csv_path}")

        # Capture database totals before closing the Neo4j connection.
        neo4j_total = neo4j_manager.get_post_count() if neo4j_manager.driver else 0
        chroma_total = (
            chroma_manager.get_document_count() if chroma_manager.collection else 0
        )
        neo4j_manager.close()

        print("\n   💾 DATABASE TOTALS")
        print(f"      Neo4j Total Posts: {neo4j_total}")
        print(f"      ChromaDB Total Docs: {chroma_total}")

        return {
            "aggregator_stats": {
                "total_processed": total_posts,
                "unique_posts": unique_posts,
                "duplicate_posts": duplicate_posts,
                "stored_neo4j": stored_neo4j,
                "stored_chroma": stored_chroma,
                "stored_csv": stored_csv,
                "neo4j_total": neo4j_total,
                "chroma_total": chroma_total,
            },
            "dataset_path": csv_path,
        }
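
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). In the real system these methods
# are wired into a subgraph; here we assume SocialAgentState behaves like a
# plain dict and call the modules in pipeline order, merging "worker_results"
# the way the graph reducer would. Running this requires the scraper tools,
# Groq credentials, and the Neo4j/ChromaDB backends to be configured.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    node = SocialAgentNode()
    state: Dict[str, Any] = {"worker_results": []}

    collectors = [
        node.collect_sri_lanka_trends,
        node.collect_sri_lanka_social_media,
        node.collect_asia_social_media,
        node.collect_world_social_media,
        node.collect_user_defined_targets,
    ]
    for collect in collectors:
        update = collect(state)
        state["worker_results"].extend(update.get("worker_results", []))

    state.update(node.categorize_by_geography(state))
    state.update(node.generate_llm_summary(state))
    state.update(node.format_final_output(state))
    state.update(node.aggregate_and_store_feeds(state))

    print(state.get("final_feed", ""))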