| | """ |
| | Query Executor Service |
| | |
| | Handles query processing with intent detection, data querying, and response generation. |
| | Uses semantic search for scalable dataset discovery and session-scoped layer storage. |
| | """ |
| |
|
| | from backend.core.llm_gateway import LLMGateway |
| | from backend.services.data_loader import get_data_loader |
| | from backend.core.geo_engine import get_geo_engine |
| | from backend.services.response_formatter import ResponseFormatter |
| | from backend.core.session_store import get_session_store |
| | from backend.core.semantic_search import get_semantic_search |
| | from backend.core.data_catalog import get_data_catalog |
| | from backend.core.query_planner import get_query_planner |
| | from typing import List, Dict, Any, Optional |
| | import json |
| | import datetime |
| | import uuid |
| | import logging |
| |
|
logger = logging.getLogger(__name__)

# Fallback session id used by handlers when no per-user session is supplied.
DEFAULT_SESSION_ID = "default-session"
| |
|
| |
|
class QueryExecutor:
    """Orchestrates query handling: intent detection, dataset discovery,
    text-to-SQL generation/repair, spatial operations, and response assembly."""

    def __init__(self):
        self.llm = LLMGateway()                       # LLM calls: intent, SQL, layer names, explanations
        self.data_loader = get_data_loader()          # admin boundary datasets + schema text
        self.geo_engine = get_geo_engine()            # spatial SQL execution and layer registry
        self.session_store = get_session_store()      # per-session user-created layers
        self.semantic_search = get_semantic_search()  # semantic table-name search
        self.catalog = get_data_catalog()             # table summaries / schema metadata
        self.query_planner = get_query_planner()      # complexity detection + multi-step plans
| |
|
| | def _get_schema_context(self) -> str: |
| | """Returns the database schema for the LLM context.""" |
| | return self.data_loader.get_schema_context() |
| |
|
| | async def process_query_with_context(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]: |
| | """ |
| | Orchestrates the full query processing flow with conversation context. |
| | """ |
| | |
| | intent = await self.llm.detect_intent(query, history) |
| | print(f"[GeoQuery] Detected intent: {intent}") |
| | |
| | |
| | if intent == "GENERAL_CHAT": |
| | return await self._handle_general_chat(query, history) |
| | elif intent in ["DATA_QUERY", "MAP_REQUEST"]: |
| | |
| | return await self._handle_data_query(query, history, include_map=True) |
| | elif intent == "SPATIAL_OP": |
| | return await self._handle_spatial_op(query, history) |
| | elif intent == "STAT_QUERY": |
| | return await self._handle_stat_query(query, history) |
| | else: |
| | return await self._handle_general_chat(query, history) |
| |
|
    async def process_query_stream(self, query: str, history: List[Dict[str, str]]):
        """
        Streamable version of process_query_with_context.

        Runs the full pipeline (intent detection -> dataset discovery -> SQL
        generation -> execution -> explanation) and yields server-sent-event
        style dicts as work progresses.

        Yields: {"event": "status"|"intent"|"chunk"|"result", "data": <JSON string>}
        """
        # --- Phase 1: intent detection (streamed so "thought" chunks reach the UI) ---
        yield {"event": "status", "data": json.dumps({"status": "๐ง Understanding intent..."})}

        intent = "GENERAL_CHAT"
        intent_buffer = ""

        try:
            async for chunk in self.llm.stream_intent(query, history):
                if chunk["type"] == "thought":
                    yield {"event": "chunk", "data": json.dumps({"type": "thought", "content": chunk["text"]})}
                elif chunk["type"] == "content":
                    intent_buffer += chunk["text"]
        except Exception as e:
            # Best-effort: a failed intent stream falls back to GENERAL_CHAT below.
            print(f"Intent stream error: {e}")

        intent = intent_buffer.strip().upper()
        if not intent:
            intent = "GENERAL_CHAT"

        # Normalize free-form LLM output to the first known intent label it contains.
        for valid in ["GENERAL_CHAT", "DATA_QUERY", "MAP_REQUEST", "SPATIAL_OP", "STAT_QUERY"]:
            if valid in intent:
                intent = valid
                break

        yield {"event": "intent", "data": json.dumps({"intent": intent})}
        print(f"[GeoQuery] Detected intent: {intent}")

        # --- GENERAL_CHAT: stream the conversational answer and stop. ---
        if intent == "GENERAL_CHAT":
            async for chunk in self.llm.generate_response_stream(query, history):
                if chunk.get("type") == "content":
                    yield {"event": "chunk", "data": json.dumps({"type": "text", "content": chunk.get("text")})}
                elif chunk.get("type") == "thought":
                    # NOTE(review): reads chunk.get("content") here while every other
                    # thought branch reads chunk["text"] — confirm which key
                    # generate_response_stream actually emits.
                    yield {"event": "chunk", "data": json.dumps({"type": "thought", "content": chunk.get("content")})}

            # Empty response: the text was already streamed as chunks above.
            yield {"event": "result", "data": json.dumps({"response": ""})}
            return

        # --- DATA_QUERY / MAP_REQUEST / STAT_QUERY: text-to-SQL pipeline. ---
        if intent in ["DATA_QUERY", "MAP_REQUEST", "STAT_QUERY"]:
            include_map = intent != "STAT_QUERY"  # stat queries favor charts over maps
            session_id = DEFAULT_SESSION_ID

            # Complex queries are delegated to the multi-step planner/executor.
            complexity = self.query_planner.detect_complexity(query)

            if complexity["is_complex"]:
                yield {"event": "status", "data": json.dumps({"status": "๐ Complex query detected, planning steps..."})}
                logger.info(f"Complex query detected: {complexity['reason']}")

                async for event in self._execute_multi_step_query(query, history, include_map, session_id):
                    yield event
                return

            # Dataset discovery: semantic search narrows candidates, falling
            # back to the full catalog when search returns nothing.
            yield {"event": "status", "data": json.dumps({"status": "๐ Searching data catalog..."})}

            candidate_tables = self.semantic_search.search_table_names(query, top_k=15)

            if candidate_tables:
                candidate_summaries = self.catalog.get_summaries_for_tables(candidate_tables)
            else:
                candidate_summaries = self.catalog.get_all_table_summaries()

            # Let the LLM pick the tables actually needed for this query.
            yield {"event": "status", "data": json.dumps({"status": "๐ Identifying relevant tables..."})}
            relevant_tables = await self.llm.identify_relevant_tables(query, candidate_summaries)

            if relevant_tables:
                yield {"event": "status", "data": json.dumps({"status": f"๐พ Loading tables: {', '.join(relevant_tables)}..."})}

                feature_tables = []
                for table in relevant_tables:
                    if self.geo_engine.ensure_table_loaded(table):
                        feature_tables.append(table)

            # Schemas of everything currently loaded in the geo engine.
            table_schema = self.geo_engine.get_table_schemas()

            # SQL generation (streamed so thoughts reach the UI).
            yield {"event": "status", "data": json.dumps({"status": "โ๏ธ Writing SQL query..."})}

            sql_buffer = ""
            async for chunk in self.llm.stream_analytical_sql(query, table_schema, history):
                if chunk["type"] == "thought":
                    yield {"event": "chunk", "data": json.dumps({"type": "thought", "content": chunk["text"]})}
                elif chunk["type"] == "content":
                    sql_buffer += chunk["text"]

            sql = sql_buffer.replace("```sql", "").replace("```", "").strip()

            # The LLM signals missing data via sentinel markers in the SQL text.
            if "DATA_UNAVAILABLE" in sql or sql.startswith("-- ERROR"):
                yield {"event": "status", "data": json.dumps({"status": "โน๏ธ Data not available"})}

                requested = "the requested data"
                available = "administrative boundaries (provinces, districts, corregimientos)"

                # Pull "Requested:"/"Available:" details out of the SQL comment, if present.
                for line in sql.split("\n"):
                    if "Requested:" in line:
                        requested = line.split("Requested:")[-1].strip()
                    elif "Available:" in line:
                        available = line.split("Available:")[-1].strip()

                error_response = f"""I couldn't find data for **{requested}** in the current database.

**Available datasets include:**
- {available}

If you need additional data, please let me know and I can help you understand what's currently available or suggest alternative queries."""

                yield {
                    "event": "result",
                    "data": json.dumps({
                        "response": error_response,
                        "sql_query": sql,
                        "geojson": None,
                        "data_citations": [],
                        "chart_data": None,
                        "raw_data": []
                    })
                }
                return

            # Execute, with one automatic LLM-driven repair attempt on failure.
            yield {"event": "status", "data": json.dumps({"status": "โก Executing query..."})}

            geojson = None
            features = []
            error_message = None

            try:
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
                yield {"event": "status", "data": json.dumps({"status": f"โ Found {len(features)} results"})}
            except Exception as e:
                error_message = str(e)
                yield {"event": "status", "data": json.dumps({"status": "โ ๏ธ Query error, attempting repair..."})}
                try:
                    sql = await self.llm.correct_sql(query, sql, error_message, str(table_schema))
                    geojson = self.geo_engine.execute_spatial_query(sql)
                    features = geojson.get("features", [])
                    error_message = None
                except Exception as e2:
                    print(f"Repair failed: {e2}")

            if error_message:
                yield {
                    "event": "result",
                    "data": json.dumps({
                        "response": f"I was unable to process your request because the data query failed. \n\nError details: {error_message}",
                        "sql_query": sql,
                        "geojson": None,
                        "data_citations": [],
                        "chart_data": None,
                        "raw_data": []
                    })
                }
                return

            # Build citations, chart data and tabular rows for the client.
            citations = ResponseFormatter.generate_citations(relevant_tables, features)

            chart_data = ResponseFormatter.generate_chart_data(sql, features)
            if intent == "STAT_QUERY" and not chart_data and features:
                # Stat queries should always carry a chart; "GROUP BY forced"
                # nudges the formatter's SQL heuristic into producing one.
                chart_data = ResponseFormatter.generate_chart_data("GROUP BY forced", features)

            raw_data = ResponseFormatter.prepare_raw_data(features)

            # Register the result as a named session layer for map display.
            if include_map and features and geojson:
                layer_info = await self.llm.generate_layer_name(query, sql)
                layer_name_ai = layer_info.get("name", "Map Layer")
                layer_emoji = layer_info.get("emoji", "๐")
                point_style = layer_info.get("pointStyle", None)
                geojson, layer_id, layer_name = ResponseFormatter.format_geojson_layer(query, geojson, features, layer_name_ai, layer_emoji, point_style)

                try:
                    table_name = self.geo_engine.register_layer(layer_id, geojson)
                    self.session_store.add_layer(session_id, {
                        "id": layer_id,
                        "name": layer_name,
                        "table_name": table_name,
                        "timestamp": datetime.datetime.now().isoformat()
                    })
                except Exception as e:
                    # Registration is best-effort; the answer still streams.
                    logger.warning(f"Failed to register layer: {e}")

            # Stream the natural-language explanation of the results.
            yield {"event": "status", "data": json.dumps({"status": "๐ฌ Generating explanation..."})}

            data_summary = ResponseFormatter.generate_data_summary(features)

            explanation_buffer = ""

            async for chunk in self.llm.stream_explanation(query, sql, data_summary, history):
                if chunk["type"] == "thought":
                    yield {"event": "chunk", "data": json.dumps({"type": "thought", "content": chunk["text"]})}
                elif chunk["type"] == "content":
                    explanation_buffer += chunk["text"]
                    yield {"event": "chunk", "data": json.dumps({"type": "text", "content": chunk["text"]})}

            # Final consolidated payload.
            yield {"event": "result", "data": json.dumps({
                "response": explanation_buffer,
                "sql_query": sql,
                "geojson": geojson if include_map and features else None,
                "chart_data": chart_data,
                "raw_data": raw_data,
                "data_citations": citations
            })}

        elif intent == "SPATIAL_OP":
            # --- Spatial operation (difference, intersection, buffer, ...). ---
            yield {"event": "status", "data": json.dumps({"status": "๐ Preparing spatial operation..."})}
            session_id = DEFAULT_SESSION_ID

            candidate_tables = self.semantic_search.search_table_names(query, top_k=15)
            if candidate_tables:
                candidate_summaries = self.catalog.get_summaries_for_tables(candidate_tables)
            else:
                candidate_summaries = self.catalog.get_all_table_summaries()

            relevant_tables = await self.llm.identify_relevant_tables(query, candidate_summaries)

            for table in relevant_tables:
                self.geo_engine.ensure_table_loaded(table)

            base_table_schema = self.geo_engine.get_table_schemas()

            # Expose previously created session layers so the LLM can reference them.
            session_layers = self.session_store.get_layers(session_id)
            layer_context = "User-Created Layers:\n"
            if not session_layers:
                layer_context += "(No user layers created yet.)\n"
            else:
                for i, layer in enumerate(session_layers):
                    layer_context += f"Layer {i+1}: {layer['name']} (Table: {layer['table_name']})\n"

            full_context = f"{base_table_schema}\n\n{layer_context}"

            yield {"event": "status", "data": json.dumps({"status": "โ๏ธ Writing spatial SQL..."})}
            sql = await self.llm.generate_spatial_sql(query, full_context, history)

            # Execute, with one repair attempt; a double failure ends the stream.
            yield {"event": "status", "data": json.dumps({"status": "โ๏ธ Processing geometry..."})}
            error_message = None
            geojson = None
            features = []

            try:
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
                yield {"event": "status", "data": json.dumps({"status": f"โ Result contains {len(features)} features"})}
            except Exception as e:
                error_message = str(e)
                yield {"event": "status", "data": json.dumps({"status": "โ ๏ธ Spatial error, attempting repair..."})}
                try:
                    sql = await self.llm.correct_sql(query, sql, error_message, full_context)
                    geojson = self.geo_engine.execute_spatial_query(sql)
                    features = geojson.get("features", [])
                    error_message = None
                except Exception as e2:
                    yield {
                        "event": "result",
                        "data": json.dumps({
                            "response": f"I tried to perform the spatial operation but encountered an error: {str(e)}\n\nQuery: {sql}",
                            "sql_query": sql,
                            "geojson": None,
                            "data_citations": [],
                            "chart_data": None,
                            "raw_data": []
                        })
                    }
                    return

            # Persist the derived geometry as a session layer (best-effort).
            if features:
                layer_info = await self.llm.generate_layer_name(query, sql)
                layer_name_ai = layer_info.get("name", "Map Layer")
                layer_emoji = layer_info.get("emoji", "๐")
                point_style = layer_info.get("pointStyle", None)
                geojson, layer_id, layer_name = ResponseFormatter.format_geojson_layer(query, geojson, features, layer_name_ai, layer_emoji, point_style)

                try:
                    table_name = self.geo_engine.register_layer(layer_id, geojson)
                    self.session_store.add_layer(session_id, {
                        "id": layer_id,
                        "name": layer_name,
                        "table_name": table_name,
                        "timestamp": datetime.datetime.now().isoformat()
                    })
                except Exception as e:
                    logger.warning(f"Failed to register layer: {e}")

            # Stream the explanation, then the final payload.
            yield {"event": "status", "data": json.dumps({"status": "๐ฌ Explaining results..."})}
            data_summary = f"Spatial operation resulted in {len(features)} features."

            explanation_buffer = ""
            async for chunk in self.llm.stream_explanation(query, sql, data_summary, history):
                if chunk["type"] == "thought":
                    yield {"event": "chunk", "data": json.dumps({"type": "thought", "content": chunk["text"]})}
                elif chunk["type"] == "content":
                    explanation_buffer += chunk["text"]
                    yield {"event": "chunk", "data": json.dumps({"type": "text", "content": chunk["text"]})}

            yield {"event": "result", "data": json.dumps({
                "response": explanation_buffer,
                "sql_query": sql,
                "geojson": geojson,
                "chart_data": None,
                "raw_data": [],
                "data_citations": []
            })}
            return

        else:
            # An intent label that survived normalization but has no handler.
            yield {"event": "chunk", "data": json.dumps({"type": "text", "content": "I'm not sure how to handle this query yet."})}
| |
|
| | async def _handle_general_chat(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]: |
| | """Handles general conversational queries.""" |
| | |
| | enhanced_query = f"""The user is asking about Panama geographic data. |
| | |
| | Available data: {len(self.data_loader.admin1)} provinces, {len(self.data_loader.admin2)} districts, {len(self.data_loader.admin3)} corregimientos. |
| | |
| | User question: {query} |
| | |
| | Respond helpfully as GeoQuery, the territorial intelligence assistant.""" |
| |
|
| | response = await self.llm.generate_response(enhanced_query, history) |
| | |
| | return { |
| | "response": response, |
| | "sql_query": None, |
| | "geojson": None, |
| | "data_citations": [], |
| | "intent": "GENERAL_CHAT" |
| | } |
| |
|
| | async def _handle_data_query(self, query: str, history: List[Dict[str, str]], include_map: bool = True) -> Dict[str, Any]: |
| | """ |
| | Handles data queries using text-to-SQL with SOTA Smart Discovery. |
| | """ |
| | print(f"[GeoQuery] Starting Data Query: {query}") |
| | |
| | |
| | from backend.core.data_catalog import get_data_catalog |
| | catalog = get_data_catalog() |
| | |
| | |
| | summaries = catalog.get_all_table_summaries() |
| | |
| | |
| | relevant_tables = await self.llm.identify_relevant_tables(query, summaries) |
| | |
| | |
| | feature_tables = [] |
| | for table in relevant_tables: |
| | if self.geo_engine.ensure_table_loaded(table): |
| | feature_tables.append(table) |
| | else: |
| | print(f"[GeoQuery] Warning: Could not load relevant table '{table}'") |
| | |
| | |
| | table_schema = self.geo_engine.get_table_schemas() |
| |
|
| | |
| | if len(table_schema) < 50: |
| | print("[GeoQuery] GeoEngine schema empty. Fetching from Catalog Metadata.") |
| | fallback_tables = list(set(feature_tables + ["pan_admin1", "pan_admin2", "pan_admin3"])) |
| | table_schema = catalog.get_specific_table_schemas(fallback_tables) |
| | |
| | |
| | print(f"[GeoQuery] Generating SQL with context size: {len(table_schema)} chars") |
| | sql = await self.llm.generate_analytical_sql(query, table_schema, history) |
| | |
| | |
| | if sql.startswith("-- Error"): |
| | available_data = ", ".join(feature_tables) if feature_tables else "Administrative Boundaries" |
| | return { |
| | "response": f"I couldn't find the specific data you asked for. I have access to: {available_data}. \n\nOriginal request: {query}", |
| | "sql_query": sql, |
| | "intent": "DATA_QUERY" |
| | } |
| | |
| | |
| | error_message = None |
| | try: |
| | geojson = self.geo_engine.execute_spatial_query(sql) |
| | features = geojson.get("features", []) |
| | print(f"[GeoQuery] Query returned {len(features)} features") |
| | except Exception as e: |
| | error_message = str(e) |
| | print(f"[GeoQuery] SQL execution error: {error_message}") |
| | |
| | |
| | try: |
| | sql = await self.llm.correct_sql(query, sql, error_message, str(table_schema)) |
| | geojson = self.geo_engine.execute_spatial_query(sql) |
| | features = geojson.get("features", []) |
| | error_message = None |
| | except Exception as e2: |
| | return { |
| | "response": f"The SQL query failed to execute even after an automatic repair attempt.\nOriginal Error: {error_message}\nRepair Error: {str(e2)}", |
| | "sql_query": sql, |
| | "intent": "DATA_QUERY" |
| | } |
| | |
| | |
| | citations = ResponseFormatter.generate_citations(relevant_tables, features) |
| | data_summary = ResponseFormatter.generate_data_summary(features) |
| | |
| | |
| | explanation = await self.llm.generate_explanation(query, sql, data_summary, history) |
| | |
| | |
| | if include_map and features: |
| | |
| | layer_info = await self.llm.generate_layer_name(query, sql) |
| | layer_name_ai = layer_info.get("name", "Map Layer") |
| | layer_emoji = layer_info.get("emoji", "๐") |
| | point_style = layer_info.get("pointStyle", None) |
| | geojson, layer_id, layer_name = ResponseFormatter.format_geojson_layer(query, geojson, features, layer_name_ai, layer_emoji, point_style) |
| | |
| | try: |
| | table_name = self.geo_engine.register_layer(layer_id, geojson) |
| | self.session_store.add_layer(DEFAULT_SESSION_ID, { |
| | "id": layer_id, |
| | "name": layer_name, |
| | "table_name": table_name, |
| | "timestamp": datetime.datetime.now().isoformat() |
| | }) |
| | except Exception as e: |
| | logger.warning(f"Failed to register layer in GeoEngine: {e}") |
| |
|
| | |
| | chart_data = ResponseFormatter.generate_chart_data(sql, features) |
| |
|
| | |
| | raw_data = ResponseFormatter.prepare_raw_data(features) |
| |
|
| | return { |
| | "response": explanation, |
| | "sql_query": sql, |
| | "geojson": geojson if include_map and features else None, |
| | "data_citations": citations, |
| | "chart_data": chart_data, |
| | "raw_data": raw_data, |
| | "intent": "DATA_QUERY" if not include_map else "MAP_REQUEST" |
| | } |
| |
|
| | async def _handle_spatial_op(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]: |
| | """Handles spatial operations (Difference, Intersection, etc) using GeoEngine.""" |
| | |
| | from backend.core.data_catalog import get_data_catalog |
| | catalog = get_data_catalog() |
| | summaries = catalog.get_all_table_summaries() |
| | |
| | |
| | relevant_tables = await self.llm.identify_relevant_tables(query, summaries) |
| | |
| | |
| | for table in relevant_tables: |
| | self.geo_engine.ensure_table_loaded(table) |
| | |
| | |
| | base_table_schema = self.geo_engine.get_table_schemas() |
| | |
| | |
| | session_layers = self.session_store.get_layers(DEFAULT_SESSION_ID) |
| | layer_context = "User-Created Layers:\n" |
| | if not session_layers: |
| | layer_context += "(No user layers created yet.)\n" |
| | else: |
| | for i, layer in enumerate(session_layers): |
| | layer_context += f"Layer {i+1}: {layer['name']} (Table: {layer['table_name']})\n" |
| | |
| | |
| | full_context = f"{base_table_schema}\n\n{layer_context}" |
| | |
| | |
| | sql = await self.llm.generate_spatial_sql(query, full_context, history) |
| | |
| | |
| | error_message = None |
| | geojson = None |
| | features = [] |
| | |
| | try: |
| | geojson = self.geo_engine.execute_spatial_query(sql) |
| | features = geojson.get("features", []) |
| | except Exception as e: |
| | error_message = str(e) |
| | try: |
| | sql = await self.llm.correct_sql(query, sql, error_message, full_context) |
| | geojson = self.geo_engine.execute_spatial_query(sql) |
| | features = geojson.get("features", []) |
| | error_message = None |
| | except Exception as e2: |
| | return { |
| | "response": f"I tried to perform the spatial operation but encountered an error: {str(e)}\n\nQuery: {sql}", |
| | "sql_query": sql, |
| | "intent": "SPATIAL_OP" |
| | } |
| |
|
| | |
| | if features: |
| | |
| | layer_info = await self.llm.generate_layer_name(query, sql) |
| | layer_name_ai = layer_info.get("name", "Map Layer") |
| | layer_emoji = layer_info.get("emoji", "๐") |
| | point_style = layer_info.get("pointStyle", None) |
| | geojson, layer_id, layer_name = ResponseFormatter.format_geojson_layer(query, geojson, features, layer_name_ai, layer_emoji, point_style) |
| | table_name = self.geo_engine.register_layer(layer_id, geojson) |
| | self.session_store.add_layer(DEFAULT_SESSION_ID, { |
| | "id": layer_id, |
| | "name": layer_name, |
| | "table_name": table_name, |
| | "timestamp": datetime.datetime.now().isoformat() |
| | }) |
| | |
| | data_summary = f"Spatial operation resulted in {len(features)} features." |
| | explanation = await self.llm.generate_explanation(query, sql, data_summary, history) |
| | |
| | return { |
| | "response": explanation, |
| | "sql_query": sql, |
| | "geojson": geojson, |
| | "data_citations": [], |
| | "intent": "SPATIAL_OP" |
| | } |
| |
|
| | async def _handle_stat_query(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]: |
| | """ |
| | Handles statistical queries where charts/tables are more important than maps. |
| | """ |
| | |
| | result = await self._handle_data_query(query, history, include_map=False) |
| | result["intent"] = "STAT_QUERY" |
| | |
| | |
| | if not result.get("chart_data") and result.get("raw_data"): |
| | |
| | features_mock = [{"properties": d} for d in result["raw_data"]] |
| | result["chart_data"] = ResponseFormatter.generate_chart_data(result.get("sql_query", ""), features_mock) |
| | |
| | return result |
| |
|
    async def _execute_multi_step_query(
        self,
        query: str,
        history: List[Dict[str, str]],
        include_map: bool,
        session_id: str
    ):
        """
        Execute a complex query by breaking it into multiple steps.

        Plans steps with the query planner, runs each step's SQL (with one
        repair attempt per step), then streams a combined explanation and a
        final result payload built from the richest intermediate result.

        Yields streaming events throughout the multi-step process.
        """
        import asyncio  # NOTE(review): unused in this method — confirm before removing.

        # Step 0: dataset discovery feeding the planner.
        yield {"event": "status", "data": json.dumps({"status": "๐ Discovering relevant datasets..."})}

        candidate_tables = self.semantic_search.search_table_names(query, top_k=20)
        if not candidate_tables:
            candidate_tables = list(self.catalog.catalog.keys())

        # Step 1: ask the planner for an execution plan.
        yield {"event": "status", "data": json.dumps({"status": "๐ Creating execution plan..."})}

        plan = await self.query_planner.plan_query(query, candidate_tables, self.llm)

        if not plan.is_complex or not plan.steps:
            # Planner downgraded the query: run the ordinary single-SQL path.
            yield {"event": "status", "data": json.dumps({"status": "๐ Executing as simple query..."})}

            candidate_summaries = self.catalog.get_summaries_for_tables(candidate_tables)
            relevant_tables = await self.llm.identify_relevant_tables(query, candidate_summaries)

            for table in relevant_tables:
                self.geo_engine.ensure_table_loaded(table)

            table_schema = self.geo_engine.get_table_schemas()

            yield {"event": "status", "data": json.dumps({"status": "โ๏ธ Writing SQL query..."})}
            sql = await self.llm.generate_analytical_sql(query, table_schema, history)
            sql = sql.replace("```sql", "").replace("```", "").strip()

            try:
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
            except Exception as e:
                # No repair attempt on this path: report and stop.
                yield {"event": "result", "data": json.dumps({
                    "response": f"Query execution failed: {str(e)}",
                    "sql_query": sql
                })}
                return

            data_summary = ResponseFormatter.generate_data_summary(features)
            explanation = await self.llm.generate_explanation(query, sql, data_summary, history)

            yield {"event": "result", "data": json.dumps({
                "response": explanation,
                "sql_query": sql,
                "geojson": geojson if include_map and features else None,
                "chart_data": ResponseFormatter.generate_chart_data(sql, features),
                "raw_data": ResponseFormatter.prepare_raw_data(features),
                "data_citations": []
            })}
            return

        # Surface the plan to the UI as a "thought" chunk.
        step_descriptions = [f"Step {i+1}: {s.description}" for i, s in enumerate(plan.steps)]
        yield {"event": "chunk", "data": json.dumps({
            "type": "thought",
            "content": f"Planning multi-step execution:\n" + "\n".join(step_descriptions)
        })}

        # Pre-load every table any step will need.
        all_tables = set()
        for step in plan.steps:
            all_tables.update(step.tables_needed)

        if all_tables:
            yield {"event": "status", "data": json.dumps({"status": f"๐พ Loading {len(all_tables)} datasets..."})}
            for table in all_tables:
                self.geo_engine.ensure_table_loaded(table)

        # Execute the plan group by group.
        # NOTE(review): groups named "parallel" are executed sequentially here.
        intermediate_results = {}
        all_features = []
        all_sql = []

        for group_idx, group in enumerate(plan.parallel_groups):
            group_steps = [s for s in plan.steps if s.step_id in group]

            yield {"event": "status", "data": json.dumps({
                "status": f"โก Executing step group {group_idx + 1}/{len(plan.parallel_groups)}..."
            })}

            for step in group_steps:
                yield {"event": "status", "data": json.dumps({
                    "status": f"๐ {step.description}..."
                })}

                # Re-read schemas each step: earlier steps may have registered layers.
                table_schema = self.geo_engine.get_table_schemas()

                # Prompt combining the step goal, the original request, the
                # planner's SQL hint, and the names of earlier results.
                step_query = f"""Execute this step: {step.description}

Original user request: {query}

SQL Hint: {step.sql_template or 'None'}

Previous step results available: {list(intermediate_results.keys())}"""

                sql = await self.llm.generate_analytical_sql(step_query, table_schema, history)
                sql = sql.replace("```sql", "").replace("```", "").strip()

                # A step with unavailable data records an empty result and moves on.
                if "DATA_UNAVAILABLE" in sql or sql.startswith("-- ERROR"):
                    logger.warning(f"Step {step.step_id} indicated data unavailable")
                    intermediate_results[step.result_name] = {"features": [], "sql": sql}
                    continue

                try:
                    geojson = self.geo_engine.execute_spatial_query(sql)
                    features = geojson.get("features", [])

                    intermediate_results[step.result_name] = {
                        "features": features,
                        "sql": sql,
                        "geojson": geojson
                    }
                    all_features.extend(features)
                    all_sql.append(f"-- {step.description}\n{sql}")

                    yield {"event": "status", "data": json.dumps({
                        "status": f"โ Step got {len(features)} results"
                    })}

                except Exception as e:
                    logger.error(f"Step {step.step_id} failed: {e}")
                    # One repair attempt per step; a double failure records the error.
                    try:
                        sql = await self.llm.correct_sql(step_query, sql, str(e), table_schema)
                        geojson = self.geo_engine.execute_spatial_query(sql)
                        features = geojson.get("features", [])
                        intermediate_results[step.result_name] = {
                            "features": features,
                            "sql": sql,
                            "geojson": geojson
                        }
                        all_features.extend(features)
                        all_sql.append(f"-- {step.description} (repaired)\n{sql}")
                    except Exception as e2:
                        logger.error(f"Step repair also failed: {e2}")
                        intermediate_results[step.result_name] = {"features": [], "sql": sql, "error": str(e2)}

        # Combine: summarize per-step record counts for the explanation prompt.
        yield {"event": "status", "data": json.dumps({"status": "๐ฌ Generating combined analysis..."})}

        result_summary = []
        for name, result in intermediate_results.items():
            features = result.get("features", [])
            result_summary.append(f"{name}: {len(features)} records")

        combined_summary = f"""Multi-step query completed with {len(plan.steps)} steps.

Results:
{chr(10).join(result_summary)}

Combination logic: {plan.final_combination_logic}"""

        # Stream the combined explanation.
        explanation_buffer = ""
        async for chunk in self.llm.stream_explanation(query, "\n\n".join(all_sql), combined_summary, history):
            if chunk["type"] == "content":
                explanation_buffer += chunk["text"]
                yield {"event": "chunk", "data": json.dumps({"type": "text", "content": chunk["text"]})}

        # Choose the step result with the most features for map display.
        best_geojson = None
        best_features = []
        for name, result in intermediate_results.items():
            features = result.get("features", [])
            if len(features) > len(best_features):
                best_features = features
                best_geojson = result.get("geojson")

        # Register the chosen result as a session layer (best-effort).
        if include_map and best_features and best_geojson:
            layer_info = await self.llm.generate_layer_name(query, all_sql[0] if all_sql else "")
            layer_name_ai = layer_info.get("name", "Multi-Step Result")
            layer_emoji = layer_info.get("emoji", "๐")
            best_geojson, layer_id, layer_name = ResponseFormatter.format_geojson_layer(
                query, best_geojson, best_features, layer_name_ai, layer_emoji
            )

            try:
                table_name = self.geo_engine.register_layer(layer_id, best_geojson)
                self.session_store.add_layer(session_id, {
                    "id": layer_id,
                    "name": layer_name,
                    "table_name": table_name,
                    "timestamp": datetime.datetime.now().isoformat()
                })
            except Exception as e:
                logger.warning(f"Failed to register multi-step layer: {e}")

        chart_data = ResponseFormatter.generate_chart_data("\n".join(all_sql), best_features)
        raw_data = ResponseFormatter.prepare_raw_data(best_features)

        # Final payload: all step SQL concatenated, best result's geometry.
        yield {"event": "result", "data": json.dumps({
            "response": explanation_buffer,
            "sql_query": "\n\n".join(all_sql),
            "geojson": best_geojson if include_map and best_features else None,
            "chart_data": chart_data,
            "raw_data": raw_data,
            "data_citations": [],
            "multi_step": True,
            "steps_executed": len(plan.steps)
        })}
| |
|