File size: 10,700 Bytes
b4856f1
 
 
 
 
 
 
 
752f5cc
b4856f1
 
 
 
 
752f5cc
 
 
 
b4856f1
 
 
 
 
 
 
 
 
 
 
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
 
 
 
 
 
 
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
752f5cc
b4856f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
752f5cc
b4856f1
752f5cc
b4856f1
752f5cc
 
 
 
 
 
 
 
 
b4856f1
 
752f5cc
b4856f1
 
752f5cc
b4856f1
 
 
 
 
 
 
752f5cc
b4856f1
 
752f5cc
b4856f1
 
 
 
 
 
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
 
 
 
752f5cc
b4856f1
 
752f5cc
b4856f1
752f5cc
b4856f1
 
752f5cc
b4856f1
 
 
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
 
 
 
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
752f5cc
 
b4856f1
 
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
 
 
 
 
 
 
752f5cc
b4856f1
752f5cc
b4856f1
752f5cc
b4856f1
 
 
 
 
 
 
 
16ec2cf
b4856f1
 
 
 
752f5cc
b4856f1
752f5cc
b4856f1
752f5cc
 
b4856f1
 
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
 
 
 
 
 
752f5cc
b4856f1
752f5cc
b4856f1
752f5cc
 
 
 
 
 
b4856f1
752f5cc
16ec2cf
b4856f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16ec2cf
b4856f1
 
16ec2cf
b4856f1
752f5cc
b4856f1
752f5cc
b4856f1
 
752f5cc
 
 
 
 
 
 
 
 
b4856f1
 
 
752f5cc
b4856f1
 
752f5cc
b4856f1
 
752f5cc
b4856f1
 
 
 
 
 
 
 
 
 
 
 
752f5cc
 
 
 
 
b4856f1
 
 
 
752f5cc
b4856f1
 
752f5cc
b4856f1
752f5cc
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
"""
src/nodes/dataRetrievalAgentNode.py
COMPLETE - Data Retrieval Agent Node Implementation
Handles orchestrator-worker pattern for scraping tasks

Updated: Uses Tool Factory pattern for parallel execution safety.
Each agent instance gets its own private set of tools.
"""

import json
import uuid
from typing import List
from langchain_core.messages import HumanMessage, SystemMessage
from src.states.dataRetrievalAgentState import (
    DataRetrievalAgentState,
    ScrapingTask,
    RawScrapedData,
    ClassifiedEvent,
)
from src.utils.tool_factory import create_tool_set


class DataRetrievalAgentNode:
    """
    Implements the Data Retrieval Agent workflow:
    1. Master Agent - Plans scraping tasks
    2. Worker Agent - Executes individual tasks
    3. Tool Node - Runs the actual tools
    4. Classifier Agent - Categorizes results for domain agents

    Thread Safety:
        Each DataRetrievalAgentNode instance creates its own private ToolSet,
        enabling safe parallel execution with other agents.
    """

    def __init__(self, llm):
        """Initialize with LLM and private tool set.

        Args:
            llm: LangChain-compatible chat model exposing ``invoke()``.
        """
        # Create PRIVATE tool instances for this agent so parallel agent
        # instances never share mutable tool state.
        self.tools = create_tool_set()
        self.llm = llm

    # =========================================================================
    # SHARED HELPERS
    # =========================================================================

    @staticmethod
    def _extract_json(raw: str):
        """Parse JSON from an LLM response, tolerating markdown code fences.

        Chat models frequently wrap JSON in ```json ... ``` fences even when
        instructed to return raw JSON; a bare ``json.loads`` would raise on
        that and force the caller into its fallback path. This strips one
        leading fence line and one trailing fence, if present, then parses.

        Args:
            raw: The model's text output.

        Returns:
            The decoded JSON value (list, dict, etc.).

        Raises:
            json.JSONDecodeError: If the de-fenced text is still not valid JSON.
        """
        text = raw.strip()
        if text.startswith("```"):
            # Drop the opening fence line (e.g. "```json" or "```").
            text = text.split("\n", 1)[1] if "\n" in text else ""
            stripped = text.rstrip()
            if stripped.endswith("```"):
                text = stripped[:-3]
        return json.loads(text)

    # =========================================================================
    # 1. MASTER AGENT (TASK DELEGATOR)
    # =========================================================================

    def master_agent_node(self, state: DataRetrievalAgentState):
        """
        TASK DELEGATOR MASTER AGENT

        Decides which scraping tools to run based on:
        - Previously completed tasks (avoid redundancy)
        - Current monitoring needs
        - Keywords of interest

        Args:
            state: Current agent state; reads ``worker_results`` and
                ``previous_tasks``.

        Returns:
            Partial state update with ``generated_tasks`` (List[ScrapingTask])
            and ``previous_tasks`` (tool names just planned).
        """
        print("=== [MASTER AGENT] Planning Scraping Tasks ===")

        completed_tools = [r.source_tool for r in state.worker_results]

        system_prompt = f"""
You are the Master Data Retrieval Agent for Roger - Sri Lanka's situational awareness platform.

AVAILABLE TOOLS: {list(self.tools.as_dict().keys())}

Your job:
1. Decide which tools to run to keep the system updated
2. Avoid re-running tools just executed: {completed_tools}
3. Prioritize a mix of:
   - Official sources: scrape_government_gazette, scrape_parliament_minutes, scrape_train_schedule
   - Market data: scrape_cse_stock_data, scrape_local_news
   - Social media: scrape_reddit, scrape_twitter, scrape_facebook

Focus on Sri Lankan context with keywords like:
- "election", "policy", "budget", "strike", "inflation"
- "fuel", "railway", "protest", "flood", "gazette"

Previously planned: {state.previous_tasks}

Respond with valid JSON array:
[
  {{
    "tool_name": "<tool_name>",
    "parameters": {{"keywords": [...]}},
    "priority": "high" | "normal"
  }},
  ...
]

If no tasks needed, return []
"""

        parsed_tasks: List[ScrapingTask] = []

        try:
            response = self.llm.invoke(
                [
                    SystemMessage(content=system_prompt),
                    HumanMessage(
                        content="Plan the next scraping wave for Sri Lankan situational awareness."
                    ),
                ]
            )

            raw = response.content
            # Tolerate ```json fences that models often add despite instructions.
            suggested = self._extract_json(raw)

            # A single task object is accepted and normalized to a list.
            if isinstance(suggested, dict):
                suggested = [suggested]

            for item in suggested:
                try:
                    task = ScrapingTask(**item)
                    parsed_tasks.append(task)
                except Exception as e:
                    # Skip malformed entries rather than aborting the whole plan.
                    print(f"[MASTER] Failed to parse task: {e}")
                    continue

        except Exception as e:
            print(f"[MASTER] LLM planning failed: {e}, using fallback plan")

        # Fallback plan if LLM fails — only on the very first wave, so a
        # deliberate empty plan ([]) on later waves is respected.
        if not parsed_tasks and not state.previous_tasks:
            parsed_tasks = [
                ScrapingTask(
                    tool_name="scrape_local_news",
                    parameters={"keywords": ["Sri Lanka", "economy", "politics"]},
                    priority="high",
                ),
                ScrapingTask(
                    tool_name="scrape_cse_stock_data",
                    parameters={"symbol": "ASPI"},
                    priority="high",
                ),
                ScrapingTask(
                    tool_name="scrape_government_gazette",
                    parameters={"keywords": ["tax", "import", "regulation"]},
                    priority="normal",
                ),
                ScrapingTask(
                    tool_name="scrape_reddit",
                    parameters={"keywords": ["Sri Lanka"], "limit": 20},
                    priority="normal",
                ),
            ]

        print(f"[MASTER] Planned {len(parsed_tasks)} tasks")

        return {
            "generated_tasks": parsed_tasks,
            "previous_tasks": [t.tool_name for t in parsed_tasks],
        }

    # =========================================================================
    # 2. WORKER AGENT
    # =========================================================================

    def worker_agent_node(self, state: DataRetrievalAgentState):
        """
        DATA RETRIEVAL WORKER AGENT

        Pops next task from queue and prepares it for ToolNode execution.
        This runs in parallel via map() in the graph.

        Returns:
            ``{}`` when the queue is empty; otherwise a partial state update
            with the dequeued ``current_task`` and remaining ``generated_tasks``.
        """
        if not state.generated_tasks:
            print("[WORKER] No tasks in queue")
            return {}

        # Pop first task (FIFO)
        current_task = state.generated_tasks[0]
        remaining = state.generated_tasks[1:]

        print(f"[WORKER] Dispatching -> {current_task.tool_name}")

        return {"generated_tasks": remaining, "current_task": current_task}

    # =========================================================================
    # 3. TOOL NODE
    # =========================================================================

    def tool_node(self, state: DataRetrievalAgentState):
        """
        TOOL NODE

        Executes the actual scraping tool specified by current_task.
        Handles errors gracefully and records results.

        Returns:
            ``{}`` if there is no active task; otherwise clears
            ``current_task`` and appends one RawScrapedData to
            ``worker_results`` with status "success" or "failed".
        """
        current_task = state.current_task
        if current_task is None:
            print("[TOOL NODE] No active task")
            return {}

        print(f"[TOOL NODE] Executing -> {current_task.tool_name}")

        tool_func = self.tools.get(current_task.tool_name)

        if tool_func is None:
            # Unknown tool name (e.g. hallucinated by the planner): record a
            # failed result instead of raising, so the graph keeps running.
            output = f"Tool '{current_task.tool_name}' not found in registry"
            status = "failed"
        else:
            try:
                # Invoke LangChain tool with parameters
                output = tool_func.invoke(current_task.parameters or {})
                status = "success"
                print("[TOOL NODE] ✓ Success")
            except Exception as e:
                output = f"Error: {str(e)}"
                status = "failed"
                print(f"[TOOL NODE] ✗ Failed: {e}")

        result = RawScrapedData(
            source_tool=current_task.tool_name, raw_content=str(output), status=status
        )

        return {"current_task": None, "worker_results": [result]}

    # =========================================================================
    # 4. CLASSIFIER AGENT
    # =========================================================================

    def classifier_agent_node(self, state: DataRetrievalAgentState):
        """
        DATA CLASSIFIER AGENT

        Analyzes scraped data and routes it to appropriate domain agents.
        Creates ClassifiedEvent objects with summaries and target agents.
        Falls back to rule-based routing (by source-tool name) when the LLM
        call or its JSON output fails.

        Returns:
            ``{}`` when there are no new results; otherwise a partial state
            update with ``classified_buffer`` and an emptied
            ``latest_worker_results``.
        """
        if not state.latest_worker_results:
            print("[CLASSIFIER] No new results to process")
            return {}

        print(f"[CLASSIFIER] Processing {len(state.latest_worker_results)} results")

        agent_categories = [
            "social",
            "economical",
            "political",
            "mobility",
            "weather",
            "intelligence",
        ]

        system_prompt = """
You are a data classification expert for Roger.

AVAILABLE AGENTS:
- social: Social media sentiment, public discussions
- economical: Stock market, economic indicators, CSE data
- political: Government gazette, parliament, regulations
- mobility: Transportation, train schedules, logistics
- weather: Meteorological data, disaster alerts
- intelligence: Brand monitoring, entity tracking

Task: Analyze the scraped data and:
1. Write a one-sentence summary
2. Choose the most appropriate agent

Respond with JSON:
{
  "summary": "<brief summary>",
  "target_agent": "<agent_name>"
}
"""

        all_classified: List[ClassifiedEvent] = []

        for result in state.latest_worker_results:
            try:
                response = self.llm.invoke(
                    [
                        SystemMessage(content=system_prompt),
                        HumanMessage(
                            # Cap payload at 2000 chars to bound prompt size.
                            content=f"Source: {result.source_tool}\n\nData:\n{result.raw_content[:2000]}"
                        ),
                    ]
                )

                # Tolerate ```json fences that models often add despite instructions.
                result_json = self._extract_json(response.content)
                summary = result_json.get("summary", "No summary")
                target = result_json.get("target_agent", "social")

                # Guard against hallucinated agent names.
                if target not in agent_categories:
                    target = "social"

            except Exception as e:
                print(f"[CLASSIFIER] LLM failed: {e}, using rule-based classification")

                # Fallback rule-based classification keyed on the tool name.
                source = result.source_tool.lower()
                if "stock" in source or "cse" in source:
                    target = "economical"
                elif "gazette" in source or "parliament" in source:
                    target = "political"
                elif "train" in source or "schedule" in source:
                    target = "mobility"
                elif any(s in source for s in ["reddit", "twitter", "facebook"]):
                    target = "social"
                else:
                    target = "social"

                summary = (
                    f"Data from {result.source_tool}: {result.raw_content[:150]}..."
                )

            classified = ClassifiedEvent(
                event_id=str(uuid.uuid4()),
                content_summary=summary,
                target_agent=target,
                confidence_score=0.85,
            )
            all_classified.append(classified)

        print(f"[CLASSIFIER] Classified {len(all_classified)} events")

        return {"classified_buffer": all_classified, "latest_worker_results": []}
        return {"classified_buffer": all_classified, "latest_worker_results": []}