File size: 6,462 Bytes
b4856f1 2473009 b4856f1 752f5cc b4856f1 aa3c874 b4c4175 aa3c874 2473009 b4856f1 16ec2cf b4c4175 16ec2cf 2473009 752f5cc b4856f1 16ec2cf b4c4175 16ec2cf 2473009 752f5cc b4856f1 16ec2cf b4c4175 16ec2cf 2473009 752f5cc b4856f1 16ec2cf b4c4175 16ec2cf 2473009 752f5cc b4856f1 16ec2cf b4c4175 16ec2cf 2473009 b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc 2473009 b4856f1 2473009 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
"""
combinedAgentGraph.py - Main entry point for the Combined Agent System.
"""
from __future__ import annotations
from typing import Dict, Any
import logging
from datetime import datetime
from langgraph.graph import StateGraph, START, END
from src.llms.groqllm import GroqLLM
from src.states.combinedAgentState import CombinedAgentState
from src.nodes.combinedAgentNode import CombinedAgentNode
# Optional LangSmith setup: the config module is imported lazily so that this
# file still loads when it is not available.
try:
    from src.config.langsmith_config import LangSmithConfig

    _langsmith = LangSmithConfig()
    _langsmith.configure()
except ImportError as exc:
    # Still best-effort (never abort the import), but leave a DEBUG-level
    # record instead of swallowing the failure without a trace.
    logging.getLogger("main_graph").debug(
        "LangSmith configuration skipped: %s", exc
    )
from src.graphs.socialAgentGraph import SocialGraphBuilder
from src.graphs.intelligenceAgentGraph import IntelligenceGraphBuilder
from src.graphs.economicalAgentGraph import EconomicalGraphBuilder
from src.graphs.politicalAgentGraph import PoliticalGraphBuilder
from src.graphs.meteorologicalAgentGraph import MeteorologicalGraphBuilder
# Module-wide logger used by every node wrapper in this file.
logger = logging.getLogger("main_graph")
logger.setLevel(logging.INFO)

# Attach a stream handler only when the logger has none yet, so running this
# setup again does not add a second handler.
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter(fmt="[%(levelname)s] %(message)s"))
    logger.addHandler(ch)
class CombinedAgentGraphBuilder:
    """Build the combined workflow that wires the domain sub-graphs together.

    The builder compiles one sub-graph per domain (social, intelligence,
    economical, political, meteorological), wraps each in a node function and
    connects them with the orchestration nodes of ``CombinedAgentNode``.
    """

    def __init__(self, llm):
        """Store the LLM handle passed to every sub-graph builder and node."""
        self.llm = llm

    @staticmethod
    def _make_sub_agent_runner(agent_name: str, sub_graph: Any):
        """Return a node function that runs ``sub_graph`` and relays its insights.

        Args:
            agent_name: Name used in the log messages for this sub-agent.
            sub_graph: Compiled sub-graph; only its ``invoke`` method is used.

        Returns:
            A callable taking the combined state and returning
            ``{"domain_insights": [...]}``. If the sub-graph raises, the
            callable logs the failure and returns an empty insight list.
        """

        def run_sub_agent(state: CombinedAgentState) -> Dict[str, Any]:
            logger.info("[CombinedGraph] Invoking %s...", agent_name)
            try:
                # The sub-graph is started from an empty input; the combined
                # state is not forwarded to it.
                result = sub_graph.invoke({})
                insights = result.get("domain_insights", [])
                logger.info(
                    "[CombinedGraph] %s returned %s insights",
                    agent_name,
                    len(insights),
                )
                return {"domain_insights": insights}
            except Exception as exc:
                # Deliberately broad: one failing sub-agent must not abort the
                # whole workflow. logger.exception keeps the traceback that a
                # plain error message would lose.
                logger.exception(
                    "[CombinedGraph] %s FAILED: %s", agent_name, exc
                )
                return {"domain_insights": []}

        return run_sub_agent

    def build_graph(self):
        """Assemble and compile the combined workflow.

        Returns:
            The object returned by ``StateGraph.compile()``.
        """
        # Compile every domain sub-graph once. The keys double as node names
        # and dict order fixes the registration order.
        sub_graphs = {
            "SocialAgent": SocialGraphBuilder(self.llm).build_graph(),
            "IntelligenceAgent": IntelligenceGraphBuilder(self.llm).build_graph(),
            "EconomicalAgent": EconomicalGraphBuilder(self.llm).build_graph(),
            "PoliticalAgent": PoliticalGraphBuilder(self.llm).build_graph(),
            "MeteorologicalAgent": MeteorologicalGraphBuilder(
                self.llm
            ).build_graph(),
        }

        orchestrator = CombinedAgentNode(self.llm)
        workflow = StateGraph(CombinedAgentState)

        # Domain sub-agent nodes.
        for agent_name, sub_graph in sub_graphs.items():
            workflow.add_node(
                agent_name, self._make_sub_agent_runner(agent_name, sub_graph)
            )

        # Orchestration nodes.
        workflow.add_node("GraphInitiator", orchestrator.graph_initiator)
        workflow.add_node("FeedAggregatorAgent", orchestrator.feed_aggregator_agent)
        workflow.add_node("DataRefresherAgent", orchestrator.data_refresher_agent)
        workflow.add_node("DataRefreshRouter", orchestrator.data_refresh_router)

        # START -> GraphInitiator -> every sub-agent -> FeedAggregatorAgent.
        workflow.add_edge(START, "GraphInitiator")
        for agent_name in sub_graphs:
            workflow.add_edge("GraphInitiator", agent_name)
            workflow.add_edge(agent_name, "FeedAggregatorAgent")

        workflow.add_edge("FeedAggregatorAgent", "DataRefresherAgent")
        workflow.add_edge("DataRefresherAgent", "DataRefreshRouter")

        # The router either loops back to GraphInitiator or ends the run.
        # NOTE(review): assumes the state exposes `route` as an attribute
        # (not a dict key) - confirm against CombinedAgentState.
        workflow.add_conditional_edges(
            "DataRefreshRouter",
            lambda x: x.route if x.route else "END",
            {"GraphInitiator": "GraphInitiator", "END": END},
        )

        return workflow.compile()
# Import-time bootstrap: these statements run whenever the module is imported
# and leave the compiled workflow in the module-level name `graph`.
print("Building Combined Agent Graph...")
# Obtain the LLM handle from the project's GroqLLM wrapper.
llm = GroqLLM().get_llm()
builder = CombinedAgentGraphBuilder(llm)
# Result of CombinedAgentGraphBuilder.build_graph(), i.e. workflow.compile().
graph = builder.build_graph()
print("Combined Graph ready")
|