""" combinedAgentGraph.py - Main entry point for the Combined Agent System. """ from __future__ import annotations from typing import Dict, Any import logging from datetime import datetime from langgraph.graph import StateGraph, START, END from src.llms.groqllm import GroqLLM from src.states.combinedAgentState import CombinedAgentState from src.nodes.combinedAgentNode import CombinedAgentNode try: from src.config.langsmith_config import LangSmithConfig _langsmith = LangSmithConfig() _langsmith.configure() except ImportError: pass from src.graphs.socialAgentGraph import SocialGraphBuilder from src.graphs.intelligenceAgentGraph import IntelligenceGraphBuilder from src.graphs.economicalAgentGraph import EconomicalGraphBuilder from src.graphs.politicalAgentGraph import PoliticalGraphBuilder from src.graphs.meteorologicalAgentGraph import MeteorologicalGraphBuilder logger = logging.getLogger("main_graph") logger.setLevel(logging.INFO) if not logger.handlers: ch = logging.StreamHandler() ch.setFormatter(logging.Formatter("[%(levelname)s] %(message)s")) logger.addHandler(ch) class CombinedAgentGraphBuilder: def __init__(self, llm): self.llm = llm def build_graph(self): social_graph = SocialGraphBuilder(self.llm).build_graph() intelligence_graph = IntelligenceGraphBuilder(self.llm).build_graph() economical_graph = EconomicalGraphBuilder(self.llm).build_graph() political_graph = PoliticalGraphBuilder(self.llm).build_graph() meteorological_graph = MeteorologicalGraphBuilder(self.llm).build_graph() def run_social_agent(state: CombinedAgentState) -> Dict[str, Any]: logger.info("[CombinedGraph] Invoking SocialAgent...") try: result = social_graph.invoke({}) insights = result.get("domain_insights", []) logger.info( f"[CombinedGraph] SocialAgent returned {len(insights)} insights" ) return {"domain_insights": insights} except Exception as e: logger.error(f"[CombinedGraph] SocialAgent FAILED: {e}") return {"domain_insights": []} def run_intelligence_agent(state: CombinedAgentState) -> Dict[str, Any]: logger.info("[CombinedGraph] Invoking IntelligenceAgent...") try: result = intelligence_graph.invoke({}) insights = result.get("domain_insights", []) logger.info( f"[CombinedGraph] IntelligenceAgent returned {len(insights)} insights" ) return {"domain_insights": insights} except Exception as e: logger.error(f"[CombinedGraph] IntelligenceAgent FAILED: {e}") return {"domain_insights": []} def run_economical_agent(state: CombinedAgentState) -> Dict[str, Any]: logger.info("[CombinedGraph] Invoking EconomicalAgent...") try: result = economical_graph.invoke({}) insights = result.get("domain_insights", []) logger.info( f"[CombinedGraph] EconomicalAgent returned {len(insights)} insights" ) return {"domain_insights": insights} except Exception as e: logger.error(f"[CombinedGraph] EconomicalAgent FAILED: {e}") return {"domain_insights": []} def run_political_agent(state: CombinedAgentState) -> Dict[str, Any]: logger.info("[CombinedGraph] Invoking PoliticalAgent...") try: result = political_graph.invoke({}) insights = result.get("domain_insights", []) logger.info( f"[CombinedGraph] PoliticalAgent returned {len(insights)} insights" ) return {"domain_insights": insights} except Exception as e: logger.error(f"[CombinedGraph] PoliticalAgent FAILED: {e}") return {"domain_insights": []} def run_meteorological_agent(state: CombinedAgentState) -> Dict[str, Any]: logger.info("[CombinedGraph] Invoking MeteorologicalAgent...") try: result = meteorological_graph.invoke({}) insights = result.get("domain_insights", []) logger.info( f"[CombinedGraph] MeteorologicalAgent returned {len(insights)} insights" ) return {"domain_insights": insights} except Exception as e: logger.error(f"[CombinedGraph] MeteorologicalAgent FAILED: {e}") return {"domain_insights": []} orchestrator = CombinedAgentNode(self.llm) workflow = StateGraph(CombinedAgentState) workflow.add_node("SocialAgent", run_social_agent) workflow.add_node("IntelligenceAgent", run_intelligence_agent) workflow.add_node("EconomicalAgent", run_economical_agent) workflow.add_node("PoliticalAgent", run_political_agent) workflow.add_node("MeteorologicalAgent", run_meteorological_agent) workflow.add_node("GraphInitiator", orchestrator.graph_initiator) workflow.add_node("FeedAggregatorAgent", orchestrator.feed_aggregator_agent) workflow.add_node("DataRefresherAgent", orchestrator.data_refresher_agent) workflow.add_node("DataRefreshRouter", orchestrator.data_refresh_router) workflow.add_edge(START, "GraphInitiator") sub_agents = [ "SocialAgent", "IntelligenceAgent", "EconomicalAgent", "PoliticalAgent", "MeteorologicalAgent", ] for agent in sub_agents: workflow.add_edge("GraphInitiator", agent) workflow.add_edge(agent, "FeedAggregatorAgent") workflow.add_edge("FeedAggregatorAgent", "DataRefresherAgent") workflow.add_edge("DataRefresherAgent", "DataRefreshRouter") workflow.add_conditional_edges( "DataRefreshRouter", lambda x: x.route if x.route else "END", {"GraphInitiator": "GraphInitiator", "END": END}, ) return workflow.compile() print("Building Combined Agent Graph...") llm = GroqLLM().get_llm() builder = CombinedAgentGraphBuilder(llm) graph = builder.build_graph() print("Combined Graph ready")