# File: modelx/src/graphs/RogerGraph.py
# Repository page metadata: author "nivakaran"; commit b4c4175 (verified),
# commit message "Upload folder using huggingface_hub".
"""
RogerGraph.py - Main Roger Graph with Fan-Out/Fan-In Architecture
"""
from __future__ import annotations
import logging
from langgraph.graph import StateGraph, START, END
from src.states.combinedAgentState import CombinedAgentState
from src.nodes.combinedAgentNode import CombinedAgentNode
from src.graphs.dataRetrievalAgentGraph import DataRetrievalAgentGraph
from src.graphs.meteorologicalAgentGraph import MeteorologicalGraphBuilder
from src.graphs.politicalAgentGraph import PoliticalGraphBuilder
from src.graphs.economicalAgentGraph import EconomicalGraphBuilder
from src.graphs.intelligenceAgentGraph import IntelligenceGraphBuilder
from src.graphs.socialAgentGraph import SocialGraphBuilder
from src.llms.groqllm import GroqLLM
# Named logger used for graph-construction messages in this module.
logger = logging.getLogger("Roger_graph")
if not logger.handlers:
    # Attach the console handler only when the logger has none of its own,
    # so importing this module again does not stack duplicate handlers.
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter(fmt="[%(levelname)s] %(message)s"))
    logger.addHandler(ch)
logger.setLevel(logging.INFO)
class CombinedAgentGraphBuilder:
    """Assemble and compile the top-level Roger graph.

    Wiring produced by :meth:`build_graph`::

        START -> GraphInitiator -> <every domain agent>
              -> FeedAggregatorAgent -> DataRefresherAgent -> DataRefreshRouter
              -> GraphInitiator (loop back)  |  END
    """

    def __init__(self, llm):
        """Keep the LLM that is handed to every sub-graph builder and the orchestrator."""
        self.llm = llm

    @staticmethod
    def _read_route(state):
        """Return the ``route`` value carried by *state*, or ``None`` if absent.

        Accepts both dict-style state and attribute-style state, so the router
        works regardless of which form the state object takes at runtime.
        """
        if isinstance(state, dict):
            return state.get("route")
        return getattr(state, "route", None)

    def build_graph(self):
        """Build the combined workflow and return the compiled graph.

        Returns:
            The object returned by ``StateGraph.compile()``.
        """
        logger.info("Building Roger Combined Agent Graph")

        # Instantiate every builder up front, in a fixed order, before any
        # sub-graph is actually built.
        social_builder = SocialGraphBuilder(self.llm)
        intelligence_builder = IntelligenceGraphBuilder(self.llm)
        economical_builder = EconomicalGraphBuilder(self.llm)
        political_builder = PoliticalGraphBuilder(self.llm)
        meteorological_builder = MeteorologicalGraphBuilder(self.llm)
        data_retrieval_builder = DataRetrievalAgentGraph(self.llm)
        orchestrator = CombinedAgentNode(self.llm)

        # Single source of truth for the domain agents: node name -> factory
        # producing the sub-graph. Insertion order drives both node
        # registration and edge creation below.
        domain_agent_factories = {
            "SocialAgent": social_builder.build_graph,
            "IntelligenceAgent": intelligence_builder.build_graph,
            "EconomicalAgent": economical_builder.build_graph,
            "PoliticalAgent": political_builder.build_graph,
            "MeteorologicalAgent": meteorological_builder.build_graph,
            "DataRetrievalAgent": (
                data_retrieval_builder.build_data_retrieval_agent_graph
            ),
        }

        workflow = StateGraph(CombinedAgentState)

        # Orchestration nodes implemented by CombinedAgentNode.
        workflow.add_node("GraphInitiator", orchestrator.graph_initiator)
        workflow.add_node("FeedAggregatorAgent", orchestrator.feed_aggregator_agent)
        workflow.add_node("DataRefresherAgent", orchestrator.data_refresher_agent)
        workflow.add_node("DataRefreshRouter", orchestrator.data_refresh_router)

        # Domain agents: each sub-graph is registered as one node.
        for agent_name, build_subgraph in domain_agent_factories.items():
            workflow.add_node(agent_name, build_subgraph())

        workflow.add_edge(START, "GraphInitiator")

        # Fan-out: the initiator feeds every domain agent.
        for agent_name in domain_agent_factories:
            workflow.add_edge("GraphInitiator", agent_name)

        # Fan-in: every domain agent feeds the aggregator.
        for agent_name in domain_agent_factories:
            workflow.add_edge(agent_name, "FeedAggregatorAgent")

        workflow.add_edge("FeedAggregatorAgent", "DataRefresherAgent")
        workflow.add_edge("DataRefresherAgent", "DataRefreshRouter")

        def route_decision(state):
            """Loop back only when the state explicitly asks for ``GraphInitiator``."""
            # Any other value (missing, None, "", unknown string) ends the run.
            if self._read_route(state) == "GraphInitiator":
                return "GraphInitiator"
            return END

        workflow.add_conditional_edges(
            "DataRefreshRouter",
            route_decision,
            {"GraphInitiator": "GraphInitiator", END: END},
        )

        graph = workflow.compile()
        logger.info("Roger Graph compiled successfully")
        return graph
# Module-level entry point: the graph is built once, at import time, and
# exposed as `graph`. Importing this module therefore obtains an LLM from
# GroqLLM and compiles the whole combined workflow as a side effect.
# NOTE(review): presumably an external loader imports `graph` from this
# module by name — confirm before moving these lines behind a guard.
llm = GroqLLM().get_llm()
builder = CombinedAgentGraphBuilder(llm)
graph = builder.build_graph()