add stateful auto explore

This commit is contained in:
abnerhexu
2026-01-24 15:45:57 +08:00
parent 9b05507c5e
commit 9cb4537528
2 changed files with 504 additions and 1 deletions

View File

@@ -319,6 +319,10 @@ class UAVAPIClient:
circle = self._request('GET', '/targets/type/circle')
polygen = self._request('GET', '/targets/type/polygon')
return fixed + moving + waypoint + circle + polygen
def get_target_status(self, target_id: str) -> Dict[str, Any]:
"""Get information about a specific target"""
return self._request('GET', f'/targets/{target_id}')
def get_waypoints(self) -> List[Dict[str, Any]]:
"""Get all charging station waypoints"""

View File

@@ -7,6 +7,11 @@ from langchain.tools import tool
from uav_api_client import UAVAPIClient
import json
class ToolStates:
def __init__(self):
self.explored_count = 0
tool_states = ToolStates()
def create_uav_tools(client: UAVAPIClient) -> list:
"""
@@ -952,7 +957,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
return f"Error executing path finding: {str(e)}\nPlease try finding a path dynamically."
@tool
def auto_navigate_to(input_json: str) -> str:
def auto_navigate_to3(input_json: str) -> str:
"""
Plan an obstacle-avoiding path to the target position (x, y, z), automatically determining whether to detour or overfly.
@@ -1255,6 +1260,499 @@ def create_uav_tools(client: UAVAPIClient) -> list:
except Exception as e:
return f"Error executing path finding: {str(e)}"
@tool
def auto_navigate_to(input_json: str) -> str:
"""
Plan an obstacle-avoiding path using analytic geometry for precise collision detection.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- x: Target X coordinate in meters (required)
- y: Target Y coordinate in meters (required)
- z: Target Z coordinate (altitude) in meters (required)
Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}}
"""
import math
import heapq
import json
# --- 内部几何算法类 ---
class GeometryUtils:
@staticmethod
def point_to_segment_dist_sq(p, a, b):
"""
计算点 p 到线段 ab 的最短距离的平方 (避免开方,提高性能)。
返回: 距离的平方
"""
px, py = p[0], p[1]
ax, ay = a[0], a[1]
bx, by = b[0], b[1]
l2 = (ax - bx)**2 + (ay - by)**2
if l2 == 0: return (px - ax)**2 + (py - ay)**2
t = ((px - ax) * (bx - ax) + (py - ay) * (by - ay)) / l2
t = max(0, min(1, t))
proj_x = ax + t * (bx - ax)
proj_y = ay + t * (by - ay)
return (px - proj_x)**2 + (py - proj_y)**2
@staticmethod
def segments_intersect(a1, a2, b1, b2):
"""判断线段 a1-a2 与 b1-b2 是否相交"""
def ccw(A, B, C):
return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0])
return ccw(a1, b1, b2) != ccw(a2, b1, b2) and ccw(a1, a2, b1) != ccw(a1, a2, b2)
@staticmethod
def is_point_in_polygon(p, vertices):
x, y = p[0], p[1]
inside = False
j = len(vertices) - 1
for i in range(len(vertices)):
xi, yi = vertices[i]['x'], vertices[i]['y']
xj, yj = vertices[j]['x'], vertices[j]['y']
intersect = ((yi > y) != (yj > y)) and \
(x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi)
if intersect:
inside = not inside
j = i
return inside
@staticmethod
def check_collision(p1, p2, obs, safety_buffer=2.0):
"""
检测线段 p1-p2 是否与障碍物 obs 碰撞。
返回: (Boolean 是否碰撞, Float 障碍物高度)
"""
otype = obs['type']
opos = obs['position']
ox, oy = opos['x'], opos['y']
obs_height = obs.get('height', 0)
# 1. 圆形/点
if otype in ['circle', 'point']:
r = obs.get('radius', 0)
limit = r + safety_buffer
# 使用平方距离比较,避免 sqrt
dist_sq = GeometryUtils.point_to_segment_dist_sq((ox, oy), p1, p2)
if dist_sq < limit**2:
return True, obs_height
# 2. 椭圆 (使用坐标变换法 - Analytic Solution)
elif otype == 'ellipse':
w = obs.get('width', 0)
l = obs.get('length', 0)
# 为了处理 Buffer我们将 Buffer 加到轴长上
# 这是一种工程近似:虽然真正的 Offset Curve 不是椭圆,
# 但 (w+buffer, l+buffer) 的椭圆是其包围盒,且计算非常快。
semi_axis_x = w + safety_buffer
semi_axis_y = l + safety_buffer
# 坐标变换:将世界坐标系的点变换到“单位圆空间”
# 1. 平移:以椭圆中心为原点
# 2. 缩放X 除以长轴Y 除以短轴
def to_unit_space(point):
dx = point[0] - ox
dy = point[1] - oy
return (dx / semi_axis_x, dy / semi_axis_y)
u_p1 = to_unit_space(p1)
u_p2 = to_unit_space(p2)
# 现在问题变成了:线段 u_p1 -> u_p2 是否与 单位圆 (Radius=1) 相交
# 即:原点 (0,0) 到线段的距离是否 < 1.0
dist_sq_in_unit_space = GeometryUtils.point_to_segment_dist_sq((0,0), u_p1, u_p2)
if dist_sq_in_unit_space < 1.0: # 1.0^2 = 1.0
return True, obs_height
# 3. 多边形
elif otype == 'polygon':
verts = obs['vertices']
if not verts: return False, 0
for i in range(len(verts)):
v1 = (verts[i]['x'], verts[i]['y'])
v2 = (verts[(i + 1) % len(verts)]['x'], verts[(i + 1) % len(verts)]['y'])
if GeometryUtils.segments_intersect(p1, p2, v1, v2):
return True, obs_height
if GeometryUtils.is_point_in_polygon(p1, verts) or GeometryUtils.is_point_in_polygon(p2, verts):
return True, obs_height
return False, 0
# ---------------------------------------------------------
try:
# 1. 解析参数
params = json.loads(input_json) if isinstance(input_json, str) else input_json
drone_id = params.get('drone_id')
tx, ty, tz = params.get('x'), params.get('y'), params.get('z')
if not drone_id or tx is None or ty is None or tz is None:
return "Error: drone_id, x, y, z are required"
# 2. 获取状态
status = client.get_drone_status(drone_id)
start_pos = status['position']
sx, sy, sz = start_pos['x'], start_pos['y'], start_pos['z']
drone_max_alt = status.get('max_altitude', 100.0)
all_obstacles = client.get_obstacles()
# 3. 分类
mandatory_avoid = []
fly_over_candidates = []
for obs in all_obstacles:
h = obs.get('height', 0)
if h == 0 or h >= drone_max_alt:
mandatory_avoid.append(obs)
else:
fly_over_candidates.append(obs)
target_point = (tx, ty)
start_point = (sx, sy)
# 4. 2D 路径规划
# 4.1 生成节点
nodes = [start_point, target_point]
safety_margin = 5.0
for obs in mandatory_avoid:
opos = obs['position']
ox, oy = opos['x'], opos['y']
if obs['type'] == 'polygon':
# 多边形节点策略 (保持原样)
center_x = sum(v['x'] for v in obs['vertices']) / len(obs['vertices'])
center_y = sum(v['y'] for v in obs['vertices']) / len(obs['vertices'])
for v in obs['vertices']:
vx, vy = v['x'], v['y']
vec_len = math.hypot(vx - center_x, vy - center_y)
if vec_len > 0:
scale = (vec_len + safety_margin) / vec_len
nx = center_x + (vx - center_x) * scale
ny = center_y + (vy - center_y) * scale
nodes.append((nx, ny))
elif obs['type'] == 'ellipse':
# 椭圆节点生成:依然使用参数方程,因为要生成具体的绕行点坐标
w = obs.get('width', 0)
l = obs.get('length', 0)
gen_w = w + safety_margin
gen_l = l + safety_margin
num_steps = max(8, int(max(w, l) / 3.0))
for i in range(num_steps):
angle = i * (2 * math.pi / num_steps)
nx = ox + gen_w * math.cos(angle)
ny = oy + gen_l * math.sin(angle)
nodes.append((nx, ny))
elif obs['type'] in ['circle', 'point']:
r = obs.get('radius', 0)
gen_r = r + safety_margin
num_steps = max(8, int(r / 3.0))
for i in range(num_steps):
angle = i * (2 * math.pi / num_steps)
nodes.append((ox + gen_r * math.cos(angle), oy + gen_r * math.sin(angle)))
# 4.2 过滤非法节点
valid_nodes = []
for node in nodes:
is_bad = False
for obs in mandatory_avoid:
# 使用更新后的 check_collision (数学解) 进行点包含测试
# 线段长度为0即为点
collided, _ = GeometryUtils.check_collision(node, node, obs, safety_buffer=0.5)
if collided:
is_bad = True
break
if not is_bad:
valid_nodes.append(node)
if start_point not in valid_nodes: valid_nodes.insert(0, start_point)
if target_point not in valid_nodes: valid_nodes.append(target_point)
start_idx = valid_nodes.index(start_point)
target_idx = valid_nodes.index(target_point)
# 4.3 构建图
adj = {i: [] for i in range(len(valid_nodes))}
for i in range(len(valid_nodes)):
for j in range(i + 1, len(valid_nodes)):
u, v = valid_nodes[i], valid_nodes[j]
path_blocked = False
for obs in mandatory_avoid:
# 使用更新后的 check_collision (数学解) 进行线段检测
hit, _ = GeometryUtils.check_collision(u, v, obs, safety_buffer=2.0)
if hit:
path_blocked = True
break
if not path_blocked:
dist = math.hypot(u[0]-v[0], u[1]-v[1])
adj[i].append((j, dist))
adj[j].append((i, dist))
# 4.4 Dijkstra
pq = [(0.0, start_idx, [valid_nodes[start_idx]])]
visited = set()
path_2d = []
while pq:
cost, u, path = heapq.heappop(pq)
if u == target_idx:
path_2d = path
break
if u in visited: continue
visited.add(u)
for v_idx, w in adj[u]:
if v_idx not in visited:
heapq.heappush(pq, (cost + w, v_idx, path + [valid_nodes[v_idx]]))
if not path_2d:
return json.dumps({"status": "Failed: No 2D path found."})
# 5. 高度计算
safe_base_alt = max(sz, tz)
path_max_obs_height = 0.0
for i in range(len(path_2d) - 1):
p1 = path_2d[i]
p2 = path_2d[i+1]
for obs in fly_over_candidates:
hit, obs_h = GeometryUtils.check_collision(p1, p2, obs, safety_buffer=2.0)
if hit:
path_max_obs_height = max(path_max_obs_height, obs_h)
if path_max_obs_height > 0:
cruise_alt = path_max_obs_height + 2.0
else:
cruise_alt = safe_base_alt
cruise_alt = max(cruise_alt, sz, tz)
if cruise_alt > drone_max_alt:
return json.dumps({"status": "Failed: Required altitude exceeds drone capability."})
# 6. 生成航点
waypoints = []
waypoints.append((sx, sy, sz))
if cruise_alt > sz + 0.5:
waypoints.append((sx, sy, cruise_alt))
for i in range(len(path_2d)):
node = path_2d[i]
if i == 0 and math.hypot(node[0]-sx, node[1]-sy) < 0.1:
continue
waypoints.append((node[0], node[1], cruise_alt))
last_wp = waypoints[-1]
if abs(last_wp[2] - tz) > 0.5 or math.hypot(last_wp[0]-tx, last_wp[1]-ty) > 0.1:
waypoints.append((tx, ty, tz))
# 7. 执行
final_msg = "Success"
for wp in waypoints:
if wp == waypoints[0] and len(waypoints) > 1:
continue
waypoint_move_result = client.move_to(drone_id, wp[0], wp[1], wp[2])
if waypoint_move_result["status"] == "error":
return f"Error moving to waypoint {wp}: {waypoint_move_result['message']}"
final_msg = waypoint_move_result.get("message", "Success")
return json.dumps({"status": "success", "path": waypoints, "message": final_msg})
except Exception as e:
return f"Error executing path finding: {str(e)}"
@tool
def auto_explore(input_json: str) -> str:
"""
Automatically move the drone to achieve a specified coverage ratio with respect to a given target.
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- target_id: The ID of the target (required)
- coverage: Target coverage ratio (required)
Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}}
"""
import json
import math
# ================= 1. 内部几何判定函数 =================
def check_drone_in_target(drone_id: str, target_id: str):
# 获取数据
drone = client.get_drone_status(drone_id)
target = client.get_target_status(target_id)
# 提取无人机坐标
d_pos = drone['position']
d_x, d_y, d_z = d_pos['x'], d_pos['y'], d_pos['z']
# 提取目标坐标信息
t_pos = target['position']
t_z = t_pos['z']
# --- 判定条件 1: Z 坐标相同 ---(无需判断)
# if not math.isclose(d_z, t_z, abs_tol=0.1):
# return False, drone, target # 返回数据以便复用
# --- 判定条件 2: X、Y 坐标在平面内 ---
t_type = target['type']
inside = False
# 情况 A: 多边形 (Polygon)
if t_type == 'polygon':
vertices = target.get('vertices', [])
if not vertices:
return False, drone, target
# 射线法
j = len(vertices) - 1
for i in range(len(vertices)):
xi, yi = vertices[i]['x'], vertices[i]['y']
xj, yj = vertices[j]['x'], vertices[j]['y']
intersect = ((yi > d_y) != (yj > d_y)) and \
(d_x < (xj - xi) * (d_y - yi) / (yj - yi + 1e-9) + xi)
if intersect:
inside = not inside
j = i
# 情况 B: 圆形区域
elif t_type in ['circle', 'waypoint', 'fixed']:
t_x, t_y = t_pos['x'], t_pos['y']
radius = target.get('radius', 0.0)
distance = math.sqrt((d_x - t_x)**2 + (d_y - t_y)**2)
inside = distance <= radius
return inside, drone, target
# ================= 2. 辅助函数:点是否在目标内 =================
def is_point_in_target(x, y, target_data):
t_type = target_data['type']
if t_type == 'polygon':
vertices = target_data.get('vertices', [])
inside = False
j = len(vertices) - 1
for i in range(len(vertices)):
xi, yi = vertices[i]['x'], vertices[i]['y']
xj, yj = vertices[j]['x'], vertices[j]['y']
intersect = ((yi > y) != (yj > y)) and \
(x < (xj - xi) * (y - yi) / (yj - yi + 1e-9) + xi)
if intersect:
inside = not inside
j = i
return inside
else: # Circle based
t_x, t_y = target_data['position']['x'], target_data['position']['y']
radius = target_data.get('radius', 0.0)
return math.sqrt((x - t_x)**2 + (y - t_y)**2) <= radius
# ================= 3. 主逻辑 =================
try:
data = json.loads(input_json)
drone_id = data['drone_id']
# 注意:这里我们假设输入包含 target_id因为单纯的 x,y,z 无法描述多边形形状
# 如果必须使用 x,y,z 寻找 target则需要额外的逻辑去匹配 target_id
target_id = data.get('target_id', 'unknown_target')
required_coverage = data.get('coverage', 0.95)
except Exception as e:
return f"Error parsing input: {str(e)}"
# 3.1 初始位置检查
is_inside, drone_data, target_data = check_drone_in_target(drone_id, target_id)
if not is_inside:
return f"Error: Drone {drone_id} is not inside target {target_id}. Please move inside first."
# 3.2 路径规划准备
task_radius = drone_data.get('task_radius', 10.0)
current_z = drone_data['position']['z']
# 计算 Bounding Box (边界框)
if target_data['type'] == 'polygon':
vx = [v['x'] for v in target_data['vertices']]
vy = [v['y'] for v in target_data['vertices']]
min_x, max_x = min(vx), max(vx)
min_y, max_y = min(vy), max(vy)
else:
tx, ty = target_data['position']['x'], target_data['position']['y']
r = target_data['radius']
min_x, max_x = tx - r, tx + r
min_y, max_y = ty - r, ty + r
# 3.3 网格化路径生成 (Grid Decomposition)
# 步长设定:为了保证覆盖率,步长通常设为 task_radius 的 √2 倍或更小
# 这里设为 task_radius 确保每个网格点代表的圆都有重叠,保证无缝隙
step_size = task_radius * 1.414
valid_waypoints = [] # 存储所有位于目标内部的网格点
# 扫描 Bounding Box
x_cursor = min_x
while x_cursor <= max_x:
y_cursor = min_y
col_points = []
while y_cursor <= max_y:
# 只有当网格点在目标几何体内时,才加入路径
if is_point_in_target(x_cursor, y_cursor, target_data):
col_points.append({'x': x_cursor, 'y': y_cursor})
y_cursor += step_size
# 蛇形排序 (Boustrophedon): 偶数列正向,奇数列反向
# 这样可以最小化无人机换列时的飞行距离
if col_points:
# 根据当前的列数决定方向,这里用 valid_waypoints 已有长度估算列是不准的
# 简单做法:利用 x_cursor 的归一化索引,或者直接交替 append
pass # 后续统一处理
valid_waypoints.append(col_points)
x_cursor += step_size
# 展平并执行蛇形排序
final_path = []
for i, col in enumerate(valid_waypoints):
if i % 2 == 1:
final_path.extend(reversed(col))
else:
final_path.extend(col)
total_points = len(final_path)
if total_points == 0:
return "Error: Target area is too small or invalid geometry found."
# 3.4 执行探索
for idx, wp in enumerate(final_path):
if idx < tool_states.explored_count:
continue
# 移动无人机
client.move_to(drone_id, wp['x'], wp['y'], current_z)
tool_states.explored_count += 1
current_coverage = tool_states.explored_count / total_points
# 检查是否达标
if current_coverage >= required_coverage:
tool_states.explored_count = 0
return f"Success: Target explored with coverage {current_coverage:.2%} (Visited {tool_states.explored_count}/{total_points} grid points)"
return f"Finished path. Final coverage: {current_coverage:.2%}"
# drone = client.get_drone_status(drone_id)
# {"id":"c1a5bf4b","name":"Drone 1","model":"Model-A","status":"hovering","position":{"x":613.0,"y":437.0,"z":24.0},"heading":90.0,"speed":0.0,"battery_level":33.48548886660462,"max_speed":16.0,"max_altitude":362.0,"battery_capacity":93.4,"perceived_radius":100.0,"task_radius":10.0,"home_position":{"x":377.0,"y":268.0,"z":0.0},"created_at":1766328129.6861398,"last_updated":1769226225.5813532}
# target = client.get_target_status(target_id)
# {"id":"1122ecf0","name":"Circle Target 1","type":"circle","position":{"x":463.0,"y":312.0,"z":0.0},"description":"","radius":200.0,"created_at":1766328129.676041,"last_updated":1769224819.8737159,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":null,"is_reached":true,"reached_by":["c1a5bf4b"]}
# {"id":"d6816dd4","name":"Polygon Target 1","type":"polygon","position":{"x":911.0,"y":552.0,"z":0.0},"description":"","radius":1.0,"created_at":1766328129.677602,"last_updated":1769225798.403095,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":null,"is_reached":true,"reached_by":["c1a5bf4b"],"vertices":[{"x":831.08,"y":471.74,"z":0.0},{"x":991.08,"y":471.74,"z":0.0},{"x":991.08,"y":631.74,"z":0.0},{"x":831.08,"y":631.74,"z":0.0}]}
# {"id":"81f45d4c","name":"BS_01 PLA West","type":"waypoint","position":{"x":1065.0,"y":1200.0,"z":0.0},"description":"","radius":10.0,"created_at":1767770673.5973494,"last_updated":1768203787.0783787,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":30.0,"is_reached":true,"reached_by":["21c6b4d7","3f987e09","b61fcff5"]}
# {"id":"05c89883","name":"Hong Kong International Airport Restricted Zone","type":"fixed","position":{"x":1900.0,"y":200.0,"z":0.0},"description":"No-fly zone due to airport operations. Drones must stay clear.","radius":150.0,"created_at":1767767958.9605737,"last_updated":1768204251.032104,"velocity":null,"moving_path":null,"moving_duration":null,"current_path_index":null,"path_direction":null,"time_in_direction":null,"calculated_speed":null,"charge_amount":null,"is_reached":true,"reached_by":["21c6b4d7","3f987e09","b61fcff5"]}
# return drone['position'] == target['position']
# Return all tools
return [
list_drones,
@@ -1263,6 +1761,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
# get_session_data,
get_task_progress,
get_weather,
auto_explore,
# get_targets,
get_obstacles,
get_nearby_entities,