from __future__ import annotations

import asyncio
import time
from typing import Any, Callable, Dict, List, Optional

from agents import custom_span, gen_trace_id, trace
from openai.types.responses import ResponseTextDeltaEvent
from pydantic import BaseModel, Field, ValidationError

from util import formate_message
from tools.search_tool import SimpleArticle
from utils.baseclass import ResearchRunner
from tools.detail_plan_agent import CoreSection
from tools.knowledge_gap_agent import (
    KnowledgeGapOutput,
    knowledge_gap_agent,
)
from tools.long_writer_agent import (
    LongWriterOutput,
    clean_json_response,
    extract_from_failed_json,
)
from tools.thinking_agent import thinking_agent
from tools.tool_selector_agent import (
    AgentSelectionPlan,
    AgentTask,
    tool_selector_agent,
)
from tools.writer_agent import writer_section_agent
from utils.schemas import TOOL_AGENTS, ToolAgentOutput, InputCallbackTool
from utils.parse_output import create_type_parser
from config_logger import logger


class IterationData(BaseModel):
    """Data for a single iteration of the research loop."""

    # A single gap string; previously defaulted (wrongly) to an empty list.
    gap: str = Field(description="The gap addressed in the iteration", default="")
    tool_calls: List[str] = Field(
        description="The tool calls made", default_factory=list
    )
    findings: List[str] = Field(
        description="The findings collected from tool calls", default_factory=list
    )
    # The thinking agent produces one text blob per iteration (see
    # IterativeResearcher._generate_observations), so this is a str, not a list.
    thought: str = Field(
        description="The thinking done to reflect on the success of the iteration and next steps",
        default="",
    )


class Conversation(BaseModel):
    """A conversation between the user and the iterative researcher."""

    history: List[IterationData] = Field(
        description="The data for each iteration of the research loop",
        default_factory=list,
    )

    def add_iteration(self, iteration_data: Optional[IterationData] = None) -> None:
        """Append ``iteration_data`` (or a blank IterationData) as the newest iteration."""
        if iteration_data is None:
            iteration_data = IterationData()
        self.history.append(iteration_data)

    # The set_latest_*/get_latest_* accessors operate on the newest iteration and
    # raise IndexError if add_iteration() has not been called yet.
    def set_latest_gap(self, gap: str) -> None:
        self.history[-1].gap = gap

    def set_latest_tool_calls(self, tool_calls: List[str]) -> None:
        self.history[-1].tool_calls = tool_calls

    def set_latest_findings(self, findings: List[str]) -> None:
        self.history[-1].findings = findings

    def set_latest_thought(self, thought: str) -> None:
        self.history[-1].thought = thought

    def get_latest_gap(self) -> str:
        return self.history[-1].gap

    def get_latest_tool_calls(self) -> List[str]:
        return self.history[-1].tool_calls

    def get_latest_findings(self) -> List[str]:
        return self.history[-1].findings

    def get_latest_thought(self) -> str:
        return self.history[-1].thought

    def get_all_findings(self) -> List[str]:
        """Return every finding from every iteration, oldest first."""
        return [
            finding
            for iteration_data in self.history
            for finding in iteration_data.findings
        ]

    def compile_conversation_history(self) -> str:
        """Compile the conversation history into a single prompt-ready string.

        Each iteration is rendered as an ``[ITERATION n]`` header followed by
        its non-empty thought, task, action and findings sections.
        """
        parts: List[str] = []
        for iteration_num, iteration_data in enumerate(self.history):
            parts.append(f"[ITERATION {iteration_num + 1}]\n\n")
            if iteration_data.thought:
                parts.append(f"{self.get_thought_string(iteration_num)}\n\n")
            if iteration_data.gap:
                parts.append(f"{self.get_task_string(iteration_num)}\n\n")
            if iteration_data.tool_calls:
                parts.append(f"{self.get_action_string(iteration_num)}\n\n")
            if iteration_data.findings:
                parts.append(f"{self.get_findings_string(iteration_num)}\n\n")
        return "".join(parts)

    def get_task_string(self, iteration_num: int) -> str:
        """Get the task for the given iteration ("" if it has no gap)."""
        if self.history[iteration_num].gap:
            return f"\nAddress this knowledge gap: {self.history[iteration_num].gap}\n"
        return ""

    def get_action_string(self, iteration_num: int) -> str:
        """Get the action for the given iteration ("" if no tools were called)."""
        if self.history[iteration_num].tool_calls:
            joined_calls = "\n".join(self.history[iteration_num].tool_calls)
            return (
                "\nCalling the following tools to address the knowledge gap:\n"
                f"{joined_calls}\n"
            )
        return ""

    def get_findings_string(self, iteration_num: int) -> str:
        """Get the findings for the given iteration ("" if there are none)."""
        if self.history[iteration_num].findings:
            joined_findings = "\n\n".join(self.history[iteration_num].findings)
            return f"\n{joined_findings}\n"
        return ""

    def get_thought_string(self, iteration_num: int) -> str:
        """Get the thought for the given iteration ("" if there is none)."""
        if self.history[iteration_num].thought:
            return f"\n{self.history[iteration_num].thought}\n"
        return ""

    def latest_task_string(self) -> str:
        """Get the latest task."""
        return self.get_task_string(len(self.history) - 1)

    def latest_action_string(self) -> str:
        """Get the latest action."""
        return self.get_action_string(len(self.history) - 1)

    def latest_findings_string(self) -> str:
        """Get the latest findings."""
        return self.get_findings_string(len(self.history) - 1)

    def latest_thought_string(self) -> str:
        """Get the latest thought."""
        return self.get_thought_string(len(self.history) - 1)


class IterativeResearcher:
    """Manager for the iterative research workflow.

    Conducts research on a topic or subtopic by running a loop of
    observe -> evaluate gaps -> select tools -> execute tools, then writes a
    review section from the accumulated findings.
    """

    def __init__(
        self,
        max_iterations: int = 5,
        max_time_minutes: int = 10,
        verbose: bool = True,
        tracing: bool = False,
        thoughts_callback: Optional[Callable[[str], Any]] = None,
        hooks=None,
        u_id: str = "",
        max_tool_concurrency: int = 1,
    ):
        """Configure the researcher.

        Args:
            max_iterations: Upper bound on research-loop iterations.
            max_time_minutes: Wall-clock budget for the loop, in minutes.
            verbose: If True, ``_log_message`` forwards to ``thoughts_callback``;
                otherwise it prints.
            tracing: If True, wrap ``run`` in an agents-SDK trace.
            thoughts_callback: Async callable receiving progress messages; a
                no-op coroutine is used when omitted.
            hooks: Run hooks forwarded to every ``ResearchRunner`` call.
            u_id: User id forwarded to tool agents via ``InputCallbackTool``.
            max_tool_concurrency: How many tool agents may run at once in
                ``_execute_tools`` (default 1, i.e. sequential).
        """
        self.max_iterations: int = max_iterations
        self.max_time_minutes: int = max_time_minutes
        self.start_time: Optional[float] = None
        self.iteration: int = 0
        self.conversation: Conversation = Conversation()
        self.should_continue: bool = True
        self.verbose: bool = verbose
        self.tracing: bool = tracing
        self.hooks = hooks
        self.u_id = u_id
        self.max_tool_concurrency: int = max(1, max_tool_concurrency)

        if thoughts_callback is None:

            async def noop(x):
                pass

            thoughts_callback = noop
        self.thoughts_callback = thoughts_callback
        # "<hash_id> source" strings collected from search-tool outputs.
        self.references: List[str] = []

    async def run(
        self,
        query: str,
        output_length: str = "",  # A text description of the desired output length, can be left blank
        output_instructions: Optional[CoreSection] = None,  # Section outline (title + description) for the writer
        background_context: str = "",
    ) -> tuple[Any, List[str]]:
        """Run the iterative research loop for ``query`` and write the section.

        Each iteration generates observations, evaluates knowledge gaps and, if
        research is not complete, selects and runs tool agents for the first
        outstanding gap. The loop stops when the gap agent reports completion
        (or returns no gaps), or when the iteration/time budget is exhausted.

        Returns:
            ``(report, references)`` where ``report`` is the output of
            ``_create_review_section`` and ``references`` is ``self.references``.
        """
        self.start_time = time.time()

        workflow_trace = None
        if self.tracing:
            trace_id = gen_trace_id()
            workflow_trace = trace("iterative_researcher", trace_id=trace_id)
            print(
                f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}"
            )
            workflow_trace.start(mark_as_current=True)

        # try/finally so the trace is closed even if an agent call raises.
        try:
            while self.should_continue and self._check_constraints():
                self.iteration += 1
                # Blank IterationData that the steps below fill in.
                self.conversation.add_iteration()

                # 1. Reflect on progress so far (stored as this iteration's thought).
                await self._generate_observations(
                    query, background_context=background_context
                )

                # 2. Evaluate current gaps in the research.
                evaluation: KnowledgeGapOutput = await self._evaluate_gaps(
                    query, background_context=background_context
                )

                # An incomplete evaluation with no gaps gives nothing to act on.
                if evaluation.research_complete or not evaluation.outstanding_gaps:
                    self.should_continue = False
                    break

                next_gap = evaluation.outstanding_gaps[0]

                # 3. Select agents to address the knowledge gap.
                selection_plan: AgentSelectionPlan = await self._select_agents(
                    next_gap, query, background_context=background_context
                )

                # 4. Run the selected agents (findings are recorded in the conversation).
                await self._execute_tools(selection_plan.tasks)

            report = await self._create_review_section(
                query,
                length=output_length,
                instructions=output_instructions,
            )
        finally:
            if workflow_trace is not None:
                workflow_trace.finish(reset_current=True)

        return report, self.references

    def _check_constraints(self) -> bool:
        """Return False once the iteration or time budget has been exhausted."""
        if self.iteration >= self.max_iterations:
            return False
        if self.start_time is None:
            # run() has not started the clock; only the iteration limit applies.
            return True
        elapsed_minutes = (time.time() - self.start_time) / 60
        return elapsed_minutes < self.max_time_minutes

    async def _evaluate_gaps(
        self, query: str, background_context: str = ""
    ) -> KnowledgeGapOutput:
        """Evaluate the current state of research and identify knowledge gaps.

        Records the first outstanding gap (if any) as the latest iteration's gap.
        """
        background = (
            f"BACKGROUND CONTEXT:\n{background_context}" if background_context else ""
        )

        input_str = f"""
        Current Iteration Number: {self.iteration}
        Time Elapsed: {(time.time() - self.start_time) / 60:.2f} minutes of maximum {self.max_time_minutes} minutes

        ORIGINAL QUERY:
        {query}

        {background}

        HISTORY OF ACTIONS, FINDINGS AND THOUGHTS:
        {self.conversation.compile_conversation_history() or "No previous actions, findings or thoughts available."}
        """

        result = await ResearchRunner.run(
            knowledge_gap_agent, input_str, hooks=self.hooks
        )

        evaluation = result.final_output_as(KnowledgeGapOutput)
        if not evaluation.research_complete and evaluation.outstanding_gaps:
            self.conversation.set_latest_gap(evaluation.outstanding_gaps[0])

        return evaluation

    async def _select_agents(
        self, gap: str, query: str, background_context: str = ""
    ) -> AgentSelectionPlan:
        """Select agents to address the identified knowledge gap.

        Records one "[Agent] ... [Query] ... [Entity] ..." line per selected task
        as the latest iteration's tool calls.
        """
        background = (
            f"BACKGROUND CONTEXT:\n{background_context}" if background_context else ""
        )

        input_str = f"""
        ORIGINAL QUERY:
        {query}

        KNOWLEDGE GAP TO ADDRESS:
        {gap}

        {background}

        HISTORY OF ACTIONS, FINDINGS AND THOUGHTS:
        {self.conversation.compile_conversation_history() or "No previous actions, findings or thoughts available."}
        """

        result = await ResearchRunner.run(
            tool_selector_agent,
            input_str,
            hooks=self.hooks,
        )

        selection_plan = result.final_output_as(AgentSelectionPlan)

        self.conversation.set_latest_tool_calls(
            [
                f"[Agent] {task.agent} [Query] {task.query} [Entity] {task.entity_website if task.entity_website else 'null'}"
                for task in selection_plan.tasks
            ]
        )
        return selection_plan

    async def _execute_tools(
        self, tasks: List[AgentTask]
    ) -> Dict[str, ToolAgentOutput]:
        """Execute the selected tools, at most ``max_tool_concurrency`` at a time.

        Returns a mapping ``"<agent>_<gap>" -> output``. When several tasks share
        the same agent and gap, later keys get a ``_2``, ``_3``... suffix so that
        no output is overwritten. All outputs become the latest iteration's findings.
        """
        with custom_span("Execute Tool Agents"):
            sem = asyncio.Semaphore(self.max_tool_concurrency)

            async def limited_task(task: AgentTask):
                async with sem:  # Acquire on entry, release on exit
                    return await self._run_agent_task(task)

            # Announce every search before creating any coroutine, so a failing
            # callback cannot leave un-awaited coroutines behind.
            for task in tasks:
                await self._log_message(
                    formate_message(
                        type="search", message=f"Searching articles by {task.query}..."
                    )
                )
            pending = [limited_task(task) for task in tasks]

            results: Dict[str, ToolAgentOutput] = {}
            for future in asyncio.as_completed(pending):
                gap, agent_name, result = await future
                key = f"{agent_name}_{gap}"
                if key in results:
                    # Same agent + same gap (different query): keep both outputs.
                    suffix = 2
                    while f"{key}_{suffix}" in results:
                        suffix += 1
                    key = f"{key}_{suffix}"
                results[key] = result

        self.conversation.set_latest_findings(
            [tool_output.output for tool_output in results.values()]
        )
        return results

    def _collect_references(self, tool_call_output: Any) -> None:
        """Append "<hash_id> source" for each SimpleArticle in a tool output.

        Only lists whose first element is a SimpleArticle are considered;
        references already collected are not added again.
        """
        if not (
            isinstance(tool_call_output, list)
            and tool_call_output
            and isinstance(tool_call_output[0], SimpleArticle)
        ):
            return
        for article in tool_call_output:
            reference = f"<{article.hash_id}> {article.source}"
            if reference not in self.references:
                self.references.append(reference)

    async def _run_agent_task(
        self, task: AgentTask
    ) -> tuple[str, str, ToolAgentOutput]:
        """Run a single agent task and return ``(gap, agent_name, output)``.

        The agent's streamed text becomes ``output.output``. Articles returned
        by its tool calls are added to ``self.references``. Errors never
        propagate: they are logged and returned as an error ToolAgentOutput.
        """
        try:
            agent_name = task.agent
            agent = TOOL_AGENTS.get(agent_name)
            if not agent:
                return (
                    task.gap,
                    agent_name,
                    ToolAgentOutput(
                        output=f"No implementation found for agent {agent_name}",
                        sources=[],
                    ),
                )

            input_call = InputCallbackTool(
                u_id=str(self.u_id),
                is_pkb=False,
                results_callback=self.thoughts_callback,
            )
            streamed_result = ResearchRunner.run_streamed(
                agent,
                task.model_dump_json(),
                context=input_call,
                hooks=self.hooks,
            )

            chunks: List[str] = []
            async for event in streamed_result.stream_events():
                if event.type == "raw_response_event" and isinstance(
                    event.data, ResponseTextDeltaEvent
                ):
                    chunks.append(event.data.delta)
                elif (
                    event.type == "run_item_stream_event"
                    and event.item.type == "tool_call_output_item"
                ):
                    self._collect_references(event.item.output)

            return task.gap, agent_name, ToolAgentOutput(output="".join(chunks), sources=[])
        except Exception as e:
            # Best-effort by design: one failing tool must not abort the iteration.
            logger.error(f"Tool agent {task.agent} failed for gap '{task.gap}': {e}")
            error_output = ToolAgentOutput(
                output=f"Error executing {task.agent} for gap '{task.gap}': {str(e)}",
                sources=[],
            )
            return task.gap, task.agent, error_output

    async def _generate_observations(
        self, query: str, background_context: str = ""
    ) -> str:
        """Generate observations on the research so far and store them as the latest thought."""
        background = (
            f"BACKGROUND CONTEXT:\n{background_context}" if background_context else ""
        )

        input_str = f"""
        ORIGINAL QUERY:
        {query}

        {background}

        HISTORY OF ACTIONS, FINDINGS AND THOUGHTS:
        {self.conversation.compile_conversation_history() or "No previous actions, findings or thoughts available."}
        """
        result = await ResearchRunner.run(thinking_agent, input_str, hooks=self.hooks)

        observations = result.final_output
        self.conversation.set_latest_thought(observations)
        return observations

    async def _log_message(self, message: str) -> None:
        """Send ``message`` to ``thoughts_callback`` when verbose, otherwise print it.

        NOTE(review): printing in the non-verbose branch is the opposite of what
        "verbose" usually means; kept as-is — confirm this is intended.
        """
        if self.verbose:
            await self.thoughts_callback(message)
        else:
            print(message)

    async def _create_review_section(
        self,
        query: str,
        length: str = "",
        instructions: Optional[CoreSection] = None,
    ) -> LongWriterOutput:
        """Write the section from all findings with ``writer_section_agent``.

        The agent output is parsed as LongWriterOutput. If parsing fails, it is
        salvaged with ``extract_from_failed_json``. The whole run is retried up
        to 3 times before a placeholder error section is returned.
        """
        length_str = (
            f"* The full response should be approximately {length}.\n" if length else ""
        )
        # The outline and title are rendered explicitly below, so the guidelines
        # only need the length hint (previously computed but never sent).
        guidelines_str = ("GUIDELINES:\n" + length_str).strip("\n") if length else ""

        section_description = getattr(instructions, "description", "") or ""
        section_title = getattr(instructions, "title", "") or ""

        all_findings = (
            "\n\n".join(self.conversation.get_all_findings())
            or "No findings available yet."
        )

        input_str = f"""
        Provide a response based on the query and findings below with as much detail as possible.
        {guidelines_str}

        SECTION OUTLINE:
        {section_description}

        SECTION TITLE:
        {section_title}

        RAW QUERY:
        {query}

        FINDINGS:
        {all_findings}
        """

        max_iter = 3
        for attempt in range(1, max_iter + 1):
            full_response = ""
            try:
                result = ResearchRunner.run_streamed(
                    starting_agent=writer_section_agent, input=input_str
                )
                async for event in result.stream_events():
                    if event.type == "raw_response_event" and isinstance(
                        event.data, ResponseTextDeltaEvent
                    ):
                        full_response += event.data.delta
                final_response = result.final_output
            except ValidationError:
                # The run rejected its own output; try the raw streamed text instead.
                try:
                    return create_type_parser(LongWriterOutput)(full_response)
                except Exception as fallback_error:
                    logger.error(
                        f"Parse error occurred: {fallback_error}. Retrying {attempt}/{max_iter}..."
                    )
                continue
            except Exception as e:
                logger.error(f"Write review section error: {e}")
                logger.error(f"Error occurred: {e}. Retrying {attempt}/{max_iter}...")
                continue

            try:
                cleaned_response = clean_json_response(final_response)
                return create_type_parser(LongWriterOutput)(cleaned_response)
            except Exception as parse_error:
                logger.warning(
                    f"Failed to parse output as JSON in write_next_section ,try extract from failed json: {str(parse_error)[:200]}"
                )
                try:
                    manual_result = extract_from_failed_json(full_response)
                    if manual_result:
                        return manual_result
                except Exception as manual_error:
                    logger.error(
                        f"Manual extraction also failed: {str(manual_error)[:100]}"
                    )
                logger.error(
                    f"Parse error occurred: {parse_error}. Retrying {attempt}/{max_iter}..."
                )

        # If all retries fail, return an error output
        return LongWriterOutput(
            next_section_markdown="The section generate error", references=[]
        )