| """ | |
| RogerGraph.py - Main Roger Graph with Fan-Out/Fan-In Architecture | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from langgraph.graph import StateGraph, START, END | |
| from src.states.combinedAgentState import CombinedAgentState | |
| from src.nodes.combinedAgentNode import CombinedAgentNode | |
| from src.graphs.dataRetrievalAgentGraph import DataRetrievalAgentGraph | |
| from src.graphs.meteorologicalAgentGraph import MeteorologicalGraphBuilder | |
| from src.graphs.politicalAgentGraph import PoliticalGraphBuilder | |
| from src.graphs.economicalAgentGraph import EconomicalGraphBuilder | |
| from src.graphs.intelligenceAgentGraph import IntelligenceGraphBuilder | |
| from src.graphs.socialAgentGraph import SocialGraphBuilder | |
| from src.llms.groqllm import GroqLLM | |
# Module-wide logger for graph construction messages.
logger = logging.getLogger("Roger_graph")
logger.setLevel(logging.INFO)

# Attach the console handler only when the logger has none yet, so that
# importing this module more than once does not duplicate every message.
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter(fmt="[%(levelname)s] %(message)s"))
    logger.addHandler(ch)
class CombinedAgentGraphBuilder:
    """Build the top-level Roger graph that wires the domain agents together.

    Edges created by :meth:`build_graph`::

        START -> GraphInitiator
        GraphInitiator -> each domain agent (six outgoing edges)
        each domain agent -> FeedAggregatorAgent
        FeedAggregatorAgent -> DataRefresherAgent -> DataRefreshRouter
        DataRefreshRouter -> GraphInitiator   (when route == "GraphInitiator")
        DataRefreshRouter -> END              (otherwise)
    """

    def __init__(self, llm):
        """Keep the LLM that is handed to every sub-graph builder and node.

        Args:
            llm: Model object passed unchanged to each domain graph builder
                and to ``CombinedAgentNode``.
        """
        self.llm = llm

    @staticmethod
    def _read_route(state):
        """Return the ``route`` value carried by *state*, or ``None`` if absent.

        The state may reach the router either as a plain ``dict`` or as an
        object exposing attributes. ``getattr`` alone never finds a dict key,
        which would silently disable the refresh loop, so both shapes are
        handled here.
        """
        if isinstance(state, dict):
            return state.get("route")
        return getattr(state, "route", None)

    def build_graph(self):
        """Create, wire and compile the combined agent graph.

        Returns:
            The object returned by ``StateGraph.compile()``.
        """
        logger.info("Building Roger Combined Agent Graph")

        # Builders are instantiated up front, in the same order as before;
        # their graphs are only built when the nodes are registered below.
        social_builder = SocialGraphBuilder(self.llm)
        intelligence_builder = IntelligenceGraphBuilder(self.llm)
        economical_builder = EconomicalGraphBuilder(self.llm)
        political_builder = PoliticalGraphBuilder(self.llm)
        meteorological_builder = MeteorologicalGraphBuilder(self.llm)
        data_retrieval_builder = DataRetrievalAgentGraph(self.llm)
        orchestrator = CombinedAgentNode(self.llm)

        workflow = StateGraph(CombinedAgentState)

        # Orchestration nodes implemented directly by CombinedAgentNode.
        workflow.add_node("GraphInitiator", orchestrator.graph_initiator)
        workflow.add_node("FeedAggregatorAgent", orchestrator.feed_aggregator_agent)
        workflow.add_node("DataRefresherAgent", orchestrator.data_refresher_agent)
        workflow.add_node("DataRefreshRouter", orchestrator.data_refresh_router)

        # Single source of truth for the domain agents: node name -> factory
        # producing the object mounted under that name. Dict order is the
        # registration order, so node creation order is unchanged.
        domain_factories = {
            "SocialAgent": social_builder.build_graph,
            "IntelligenceAgent": intelligence_builder.build_graph,
            "EconomicalAgent": economical_builder.build_graph,
            "PoliticalAgent": political_builder.build_graph,
            "MeteorologicalAgent": meteorological_builder.build_graph,
            "DataRetrievalAgent": (
                data_retrieval_builder.build_data_retrieval_agent_graph
            ),
        }
        for agent_name, make_subgraph in domain_factories.items():
            workflow.add_node(agent_name, make_subgraph())

        workflow.add_edge(START, "GraphInitiator")

        domain_agents = list(domain_factories)
        # Fan-out: the initiator feeds every domain agent.
        for agent in domain_agents:
            workflow.add_edge("GraphInitiator", agent)
        # Fan-in: every domain agent feeds the aggregator.
        for agent in domain_agents:
            workflow.add_edge(agent, "FeedAggregatorAgent")

        workflow.add_edge("FeedAggregatorAgent", "DataRefresherAgent")
        workflow.add_edge("DataRefresherAgent", "DataRefreshRouter")

        def route_decision(state):
            """Loop back to the initiator only on an explicit request."""
            # Any other value (missing, None, empty, unknown) ends the run.
            if self._read_route(state) == "GraphInitiator":
                return "GraphInitiator"
            return END

        workflow.add_conditional_edges(
            "DataRefreshRouter",
            route_decision,
            {"GraphInitiator": "GraphInitiator", END: END},
        )

        graph = workflow.compile()
        logger.info("Roger Graph compiled successfully")
        return graph
# Module-level objects created at import time: the LLM, the builder that
# uses it, and the compiled graph exposed to importers as ``graph``.
llm = GroqLLM().get_llm()
builder = CombinedAgentGraphBuilder(llm=llm)
graph = builder.build_graph()