File size: 7,857 Bytes
27d6c4f d7b323e 27d6c4f 0989d5f 27d6c4f 0989d5f 27d6c4f d7b323e 27d6c4f d7b323e 27d6c4f d7b323e 27d6c4f 0989d5f 27d6c4f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
"""
LangGraph Code‑Interpreter Agent
================================
A minimal, production‑ready example that wires a Python code‑execution tool into
a LangGraph workflow with an *LLM → plan → execute → reflect* loop.
Key changes (2025‑06‑20)
-----------------------
* **Whitelisted built‑ins** for safer `python_exec`.
* **Timeout guard** – aborts if the workflow exceeds a wall‑clock limit (default
30 s, configurable via `LANGGRAPH_TIMEOUT_SEC`).
* **Dataclass state** – replaced untyped `Dict[str, Any]` with a typed
`@dataclass AgentState` for clearer intent and static‑analysis friendliness.
Dependencies
------------
```bash
pip install langgraph langchain openai tiktoken tenacity
```
Set the environment variable `OPENAI_API_KEY` before running.
Optionally, you can swap `python_exec` with a sandboxed runner such as `e2b` or
`codeinterpreter-api`.
"""
from __future__ import annotations
import contextlib
import io
import os
import textwrap
import time
import traceback
from dataclasses import dataclass, replace
from typing import Any, Optional
import re # For stripping markdown fences
import cv2 # OpenCV – image manipulation
import pandas as pd # Pandas – DataFrame / CSV utilities
from langchain_groq import ChatGroq
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.graph import END, StateGraph
###############################################################################
# 0. Global config
###############################################################################
# Model id passed to Groq; override via LANGGRAPH_MODEL.
MODEL_NAME = os.getenv("LANGGRAPH_MODEL", "qwen-qwq-32b")
# Wall-clock budget (seconds) enforced by reflect_node's timeout guard.
TIMEOUT_SEC = int(os.getenv("LANGGRAPH_TIMEOUT_SEC", "30"))
###############################################################################
# 1. Code‑execution tool (whitelisted built‑ins)
###############################################################################
# ---------------------------------------------------------------------------
# Limited import helper – only expose specific third-party libs to user code
# ---------------------------------------------------------------------------
def _safe_import(name, globals=None, locals=None, fromlist=(), level=0):
"""Whitelisted __import__ permitting just `cv2` and `pandas`."""
if name in {"cv2", "pandas"}:
return __import__(name, globals, locals, fromlist, level)
raise ImportError(f"Import of module '{name}' is disabled in this sandbox.")
# Builtins exposed to sandboxed user code.  Anything NOT listed here
# (e.g. `eval`, `exec`, `getattr`, `globals`) is unavailable inside
# `python_exec`.  NOTE(review): `open` grants arbitrary filesystem access —
# confirm that is acceptable for this deployment.
ALLOWED_BUILTINS: dict[str, Any] = {
    "print": print,
    "range": range,
    "len": len,
    "abs": abs,
    "sum": sum,
    "min": min,
    "max": max,
    "open": open,  # Needed by pandas for file I/O
    "__import__": _safe_import,  # Allow limited, safe imports
}
@tool
def python_exec(code: str) -> str:
    """Execute **Python** inside a restricted namespace and capture STDOUT.

    Args:
        code: Python source to run (Markdown fences already stripped).

    Returns:
        The captured stdout, a fallback success message if nothing was
        printed, or a string starting with ``"ERROR:"`` followed by the
        full traceback when execution raised.

    NOTE(review): a whitelisted ``__builtins__`` dict is *not* a real
    sandbox — user code can still reach dangerous objects through object
    introspection.  Use a process-level sandbox (e.g. e2b) for genuinely
    untrusted input.
    """
    code = textwrap.dedent(code)
    exec_globals: dict[str, Any] = {
        "__builtins__": ALLOWED_BUILTINS,
        "cv2": cv2,
        "pd": pd,  # Common alias
        "pandas": pd,  # Full name
    }
    stdout = io.StringIO()
    try:
        with contextlib.redirect_stdout(stdout):
            # Run with a SINGLE namespace.  Passing a separate locals dict
            # (as the previous version did) breaks recursive or mutually
            # referencing functions defined by the user code: their names
            # land in the locals dict, but calls resolve against globals,
            # raising NameError.
            exec(code, exec_globals)  # noqa: S102
        return stdout.getvalue() or "Code executed successfully, no output."
    except Exception:
        return "ERROR:\n" + traceback.format_exc()
###############################################################################
# 2. LLM backend
###############################################################################
# Groq-hosted chat model; MODEL_NAME comes from LANGGRAPH_MODEL (see §0).
llm = ChatGroq(model=MODEL_NAME, temperature= 0.6)
###############################################################################
# 3. Dataclass‑based state & LangGraph
###############################################################################
@dataclass
class AgentState:
    """Typed state object carried through the graph."""
    input: str  # original user request
    start_time: float  # epoch seconds at launch; drives the timeout guard
    code: Optional[str] = None  # latest generated Python snippet
    exec_result: Optional[str] = None  # stdout or "ERROR:\n..." from python_exec
    tries: int = 0  # number of reflection passes performed so far
    done: bool = False  # True terminates the execute/reflect loop
# Workflow graph keyed on the typed AgentState above.
graph = StateGraph(AgentState)
# 3‑A Plan node – write code
def plan_node(state: AgentState) -> AgentState:
    """Ask the LLM to write code answering ``state.input``.

    Returns:
        A new state with ``code`` set to the extracted code block.
    """
    prompt = [
        SystemMessage(
            content=(
                "You are an expert Python developer. Given a user request, "
                "write self‑contained Python code that prints ONLY the final "
                "answer via `print()`. Always avoid network calls."
            )
        ),
        HumanMessage(content=state.input),
    ]
    # `llm(prompt)` relies on BaseChatModel.__call__, which is deprecated
    # and removed in recent langchain-core; `.invoke` is the supported API.
    code_block = _extract_code(llm.invoke(prompt).content)
    return replace(state, code=code_block)
# 3‑B Execute node – run code
def exec_node(state: AgentState) -> AgentState:
    """Run the generated code and store its output in ``exec_result``."""
    # Tools built with `@tool` must be called via `.invoke(...)`; a direct
    # `python_exec(...)` call uses the deprecated `BaseTool.__call__`, which
    # breaks on recent langchain-core releases.
    output = python_exec.invoke(state.code or "")
    return replace(state, exec_result=output)
# 3‑C Reflect node – repair on error (max 2 retries, with timeout guard)
def reflect_node(state: AgentState) -> AgentState:
    """Handle a failed execution: give up or ask the LLM for a fix.

    Terminates (``done=True``) when the wall-clock budget is exhausted or
    on the second reflection; otherwise returns a state carrying the
    LLM-corrected code for another execution round.
    """
    if time.time() - state.start_time > TIMEOUT_SEC:
        return replace(
            state,
            done=True,
            exec_result=f"ERROR:\nTimeout: exceeded {TIMEOUT_SEC}s budget",
        )
    tries = state.tries + 1
    if tries >= 2:
        # NOTE(review): this permits only ONE repair attempt, not the
        # "max 2 retries" the section comment promises — confirm which
        # limit is intended.
        return replace(state, done=True, tries=tries)
    prompt = [
        SystemMessage(
            content=(
                "You are an expert Python debugger. Your job is to fix the "
                "given code so it runs without errors and still answers the "
                "original question. Return ONLY the corrected code."
            )
        ),
        HumanMessage(content="Code:\n" + (state.code or "")),
        # NOTE(review): the error text is sent as an AIMessage — presumably
        # to mimic a prior assistant turn; verify the model handles this.
        AIMessage(content="Error:\n" + (state.exec_result or "")),
    ]
    # `.invoke` replaces the deprecated/removed BaseChatModel.__call__.
    fixed_code = _extract_code(llm.invoke(prompt).content)
    return replace(state, code=fixed_code, tries=tries)
# 3‑D Wire nodes & conditional edges
# Register the three nodes and the unconditional plan → execute edge.
graph.add_node("plan", plan_node)
graph.add_node("execute", exec_node)
graph.add_node("reflect", reflect_node)
graph.set_entry_point("plan")
graph.add_edge("plan", "execute")
def needs_fix(state: AgentState) -> bool:
    """Routing predicate: True when the last run produced an error marker."""
    result = state.exec_result
    if result is None:
        return False
    return result.startswith("ERROR")
# Route on execution outcome: errors go to "reflect", success ends the run.
graph.add_conditional_edges(
    "execute",
    needs_fix,
    {True: "reflect", False: END},
)
# After reflection, either run the fixed code again or terminate if `done`.
def should_continue(state: AgentState) -> bool:
    """Routing predicate after reflection: True ends the run, False re-executes.

    ``done`` is set by ``reflect_node`` on timeout or retry exhaustion.
    """
    return state.done
# Loop back into "execute" with the fixed code until reflect flags `done`.
graph.add_conditional_edges(
    "reflect",
    should_continue,
    {True: END, False: "execute"},
)
# Compile the graph into an invocable agent.
agent = graph.compile()
###############################################################################
# 4. Helper function & CLI entry‑point
###############################################################################
def run_agent(query: str) -> str:
    """Run the agent end‑to‑end and return the printed answer (or error)."""
    state = AgentState(input=query, start_time=time.time())
    final = agent.invoke(state)
    # LangGraph's compiled graph returns a dict-like AddableValuesDict, so
    # values are read by key rather than by attribute.
    return final.get("exec_result", "No result")
# ---------------------------------------------------------------------------
# Helper to strip Markdown code fences (```python ... ```)
# ---------------------------------------------------------------------------
def _extract_code(text: str) -> str:
"""Return the first code block in *text* or the raw text if none found."""
match = re.search(r"```(?:python|py)?\s*(.*?)```", text, flags=re.S | re.I)
return match.group(1).strip() if match else text.strip()
if __name__ == "__main__":
    import sys

    # Use the first CLI argument as the question, or a demo default.
    default_question = "What is the 10th Fibonacci number?"
    question = sys.argv[1] if len(sys.argv) > 1 else default_question
    print(run_agent(question))
|