Spaces:
Sleeping
Sleeping
File size: 7,760 Bytes
4cb2d06 34b56b2 4cb2d06 34b56b2 4cb2d06 34b56b2 4cb2d06 34b56b2 4cb2d06 34b56b2 4cb2d06 34b56b2 4cb2d06 34b56b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
"""Centralized prompt builders for all LLM calls."""
from __future__ import annotations
from typing import Any, Iterable, Mapping
def mission_planner_system_prompt() -> str:
return (
"You are an expert mission-aware perception planner for an intelligent sensing system. "
"Your task is to translate a high-level mission directive and contextual cues into a "
"prioritized set of semantic entities, objects, or events that the perception stack "
"should sense.\n\n"
"Think in terms of mission-relevant semantics rather than detector-specific classes. "
"Objects may represent physical entities, behaviors, configurations, or latent threats. "
"Do not constrain recommendations to any fixed taxonomy.\n\n"
"For each recommendation, reason causally about how sensing this entity reduces mission "
"uncertainty, enables downstream decisions, or serves as an early indicator of mission-relevant events. "
"Favor entities that are information-dense and mission-critical.\n\n"
"If the mission directive is abstract (e.g., tracking, monitoring, movement),"
"you must decompose it into concrete, observable semantic entities or events"
"that a perception system could plausibly sense.\n\n"
"Ground every recommendation in observable perceptual evidence (visual, spatial, or temporal), "
"but do not assume a specific detector architecture. "
"Avoid hallucinations beyond the provided mission and cues."
)
def mission_planner_user_prompt(
mission: str,
top_k: int,
*,
context: Mapping[str, Any] | None = None,
cues: Mapping[str, Any] | None = None,
pipeline_candidates: Iterable[str] | None = None,
coco_catalog: str | None = None,
) -> str:
mission_text = mission.strip() or "N/A"
context_blob = _format_context_blob(context)
cues_blob = _format_cues_blob(cues)
candidate_text = (
", ".join(sorted({candidate.strip() for candidate in pipeline_candidates if candidate.strip()}))
if pipeline_candidates
else "Preselected by mission context."
)
coco_text = coco_catalog.strip() if coco_catalog else "COCO classes unavailable."
return (
f"Mission directive:\n{mission_text}\n\n"
f"Structured context inputs:\n{context_blob}\n\n"
f"Derived location/mission cues:\n{cues_blob}\n\n"
f"Selectable pipeline IDs: {candidate_text}\n\n"
"Downstream detector recognizes ONLY these COCO objects (use exact spelling):\n"
f"{coco_text}\n\n"
"Instructions:\n"
"- Interpret the mission directive and contextual cues to determine what **semantic entities, "
"objects, or events** are most critical for mission success.\n"
"- If the mission is abstract (e.g., tracking, monitoring, movement, surveillance), you MUST "
"decompose it into **concrete, observable entities or events** that a perception system could "
"plausibly sense. Do not leave the mission at an abstract level.\n"
"- Infer mission_type, location_type, time_of_day, and priority_level "
"(bounded to allowed enums). Do NOT modify intel_flag, sensor_type, or compute_mode.\n"
"- If multiple pipeline IDs are listed, select exactly ONE id from that list. "
"If only one ID is given, respect it.\n"
"- Never invent new pipelines or modify existing pipeline definitions.\n"
"- Recommendations are NOT limited to static objects. They may include:\n"
" • dynamic entities (e.g., vehicles, aircraft, people)\n"
" • behaviors or motion patterns\n"
" • spatial configurations or interactions\n"
" • anomalous or unexpected activity\n"
"- Each recommendation MUST correspond to a **specific, real-world, observable phenomenon**.\n"
"- Do NOT use placeholder names such as 'Objective', 'Target', 'Entity', or generic labels.\n"
"- Every entity name MUST be an exact string match to one of the provided COCO classes above. "
"If the mission demands a concept outside this list, decompose it into observable COCO objects.\n"
"- Provide at most "
f"{top_k} "
"recommendations, ordered by priority and expected information gain for the mission.\n"
"- Scores MUST be floats in [0, 1] and represent **relative mission importance**, "
"NOT detection confidence or likelihood.\n"
"- For EACH recommendation, you MUST explain:\n"
" • why it matters to the mission\n"
" • what observable perceptual cues (visual, spatial, or temporal) would indicate its presence\n"
" • how sensing it informs downstream decision-making or reduces mission uncertainty\n\n"
"Return JSON with the following schema ONLY (no extra text):\n"
"{\n"
' "mission": "<possibly refined mission summary>",\n'
' "context": {\n'
' "mission_type": "<one of surveillance|tracking|threat_detection|safety_monitoring>",\n'
' "location_type": "<one of urban|suburban|rural|industrial|coastal|harbor|bridge|roadway|indoor|unknown>",\n'
' "time_of_day": "<day|night|null>",\n'
' "priority_level": "<routine|elevated|high|null>"\n'
" },\n"
' "pipeline": {"id": "<pipeline_id_from_list>", "reason": "<concise justification>"},\n'
' "entities": [\n'
" {\n"
' "name": "<semantic entity or event>",\n'
' "score": <float>,\n'
' "semantic_role": "<primary_target|threat_proxy|contextual_indicator|operational_constraint>",\n'
' "perceptual_cues": "<observable visual/spatial/temporal evidence>",\n'
' "rationale": "<mission-grounded justification>"\n'
" }\n"
" ]\n"
"}"
)
def _format_context_blob(context: Mapping[str, Any] | None) -> str:
if not context:
return "No additional context provided."
lines = []
mission_type = context.get("mission_type")
location_type = context.get("location_type")
time_of_day = context.get("time_of_day")
if mission_type:
lines.append(f"- Mission type: {mission_type}")
if location_type:
lines.append(f"- Location type: {location_type}")
if time_of_day:
lines.append(f"- Time of day: {time_of_day}")
if not lines:
return "Context provided but no structured cues supplied."
return "\n".join(lines)
def _format_cues_blob(cues: Mapping[str, Any] | None) -> str:
if not cues:
return "No derived cues."
lines = []
for key, value in cues.items():
if value is None:
continue
if isinstance(value, (list, tuple)):
serialized = ", ".join(str(item) for item in value if item)
else:
serialized = str(value)
if not serialized:
continue
lines.append(f"- {key.replace('_', ' ').capitalize()}: {serialized}")
if not lines:
return "Derived cues provided but empty."
return "\n".join(lines)
def mission_summarizer_system_prompt() -> str:
return (
"You are a surveillance analyst producing brief situation reports. "
"Review the provided mission context and detections, then respond with a concise summary "
"(at most three short sentences) covering key findings only. "
"If no detections appear, clearly state that fact. Avoid extra commentary or formatting."
)
def mission_summarizer_user_prompt(payload_json: str) -> str:
return (
"Summarize this mission outcome succinctly (<=3 sentences, no bullet points):\n"
f"{payload_json}"
)
|