1. fix auto_explore bug, 增加缓冲区-覆盖的圆形更完美

2. add scan_points list,将unvisited scan_points作为下一次scan_all_env的目标
3. optimizae auto_scan_env, 根据四个方位划分无人机最近的探索区域
This commit is contained in:
莲子心
2026-01-28 12:17:29 +08:00
parent 65ef6ef39b
commit 5c667941be
3 changed files with 217 additions and 54 deletions

View File

@@ -1,5 +1,5 @@
{
"selected_provider": "Kimi",
"selected_provider": "OpenAI",
"provider_configs": {
"Ollama": {
"type": "ollama",
@@ -17,22 +17,21 @@
},
"OpenAI": {
"type": "openai-compatible",
"base_url": "https://api.openai.com/v1",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"models_endpoint": "/models",
"chat_endpoint": "/chat/completions",
"requires_api_key": true,
"api_key": "",
"api_key": "sk-2f228b8c30964dba84e888b565add0a8",
"encoding": "utf-8",
"default_model": "gpt-4o-mini",
"default_model": "deepseek-v3.2",
"default_models": [
"gpt-4o-mini",
"gpt-4o",
"gpt-4.1-mini",
"gpt-3.5-turbo"
"deepseek-v3.2",
"qwen-flash",
"qwen3-30b-a3b-instruct-2507"
],
"allow_endpoint_edit": true,
"allow_api_toggle": true,
"system_prompt": ""
"system_prompt": "You are a very friendly drone control agent. No matter what language I use to give you instructions, please call the tools to perform the task and then reply in Chinese."
},
"Kimi": {
"type": "openai-compatible",

View File

@@ -91,7 +91,10 @@ Tips for you to finish task in the most efficient way:
10. Line formation means after finishing the task the two (or more) drones should move to a same position.
Begin!
11. Before executing a task or moving to a new position, must get the nearby entities first.
12. The max moving distance for a single move_to or navigate_to command is 150 meters. If the distance is longer than that, find a mediam point to go first.
12. The max moving distance for a single move_to or navigate_to command is 300 meters. If the distance is longer than that, find a mediam point to go first.
13. When targets cannot be found, call `auto_scan_all_environment` as early as possible.
14. When `auto_scan_all_environment` is not completed, control multiple active drones move to unvisited scan points , use `get_scan_points` to get the list of scan points categorized by visited status and current drone positions.
15. Executing tasks implicitly performs environment perception, so avoid using the `get_nearby_entities` API as much as possible.
Question: {input}
Thought:{agent_scratchpad}"""

View File

@@ -10,6 +10,76 @@ import heapq
import json
from typing import List, Dict, Any, Optional
# --- 构建地图scan points ---
# 核心参数
r = 150 # 侦察半径(m)
map_width = 1024 # 地图宽度(m)
map_height = 768 # 地图高度(m)
dx = r * math.sqrt(3) # 横向点间距
dy = 1.5 * r # 纵向行间距
# 正六边形网格侦察点坐标列表 (x, y)
drone_points = [
# 第1行奇数行y=150.0
(round(dx/2, 4), 150.0), # (129.9038, 150.0)
(round(dx/2 + dx, 4), 150.0), # (389.7114, 150.0)
(round(dx/2 + 2*dx, 4), 150.0), # (649.5190, 150.0)
(round(dx/2 + 3*dx, 4), 150.0), # (909.3266, 150.0)
# 第2行偶数行y=375.0 (150+225)
(round(dx, 4), 375.0), # (259.8076, 375.0)
(round(dx + dx, 4), 375.0), # (519.6152, 375.0)
(round(dx + 2*dx, 4), 375.0), # (779.4228, 375.0)
(round(dx + 3*dx, 4), 375.0), # (1039.2304, 375.0) # 略超1024覆盖右边缘
# 第3行奇数行y=600.0 (375+225)
(round(dx/2, 4), 600.0), # (129.9038, 600.0)
(round(dx/2 + dx, 4), 600.0), # (389.7114, 600.0)
(round(dx/2 + 2*dx, 4), 600.0), # (649.5190, 600.0)
(round(dx/2 + 3*dx, 4), 600.0), # (909.3266, 600.0)
# 第4行偶数行y=825.0 (600+225) # 略超768覆盖上边缘
(round(dx, 4), 825.0), # (259.8076, 825.0)
(round(dx + dx, 4), 825.0), # (519.6152, 825.0)
(round(dx + 2*dx, 4), 825.0), # (779.4228, 825.0)
(round(dx + 3*dx, 4), 825.0) # (1039.2304, 825.0)
]
class ScanPointInfo:
def __init__(self, x: float, y: float):
self.x: float = x
self.y: float = y
self.visited: bool = False
self.drones_visited: List[str] = []
def to_dict(self):
return {
"x": self.x,
"y": self.y,
"visited": self.visited
}
scan_points = [ScanPointInfo(x, y) for x, y in drone_points]
def scan_point_is_reached(scan_point: ScanPointInfo, drone_pos: tuple[float, float], threshold: float = 30.0) -> bool:
"""判断当前位置在侦察点附近"""
dx = scan_point.x - drone_pos[0]
dy = scan_point.y - drone_pos[1]
dist_sq = dx * dx + dy * dy
return dist_sq <= threshold * threshold
def scan_point_is_reached_by_drones(drone_id: str, x: float, y: float, threshold: float = 30.0) -> bool:
"""判断无人机当前位置是否到达侦察点"""
global scan_points
for i in scan_points:
if scan_point_is_reached(i, (x, y), threshold):
i.visited = True
if drone_id not in i.drones_visited:
i.drones_visited.append(drone_id)
print(f"Drone {drone_id} reached scan point ({i.x}, {i.y})")
break
class TargetInfo:
def __init__(self, data: Dict[str, Any]):
self.id: str = data.get("id")
@@ -907,10 +977,10 @@ def create_uav_tools(client: UAVAPIClient) -> list:
})
# return f"Finding a destination successfully (dist={current_distance}m)... Now use `auto_navigate_to({nav_payload})` to move the drone. This tool safely brings drone to the destination and detour obstacles. First try it with exactly input {nav_payload}, if it fails, then adjust the positions."
return auto_navigate_to_non_tool(nav_payload, move_towards=True, move_towards_task=(direction, min_distance))
return auto_navigate_to_non_tool(nav_payload, move_towards_flag=True, move_towards_direction=direction, move_towards_distance=min_distance)
except Exception as e:
return f"Error executing auto_navigate_towards: {str(e)}"
return json.dumps({"status": "error", "message": f"Error executing auto_navigate_towards: {str(e)}"})
@@ -1000,6 +1070,9 @@ def create_uav_tools(client: UAVAPIClient) -> list:
if result["status"] == "error":
result["message"] += "(1) If the task is to move a certain distance in a specific direction but the path is blocked by an obstacle, first move to a position where there is no obstacle in that direction, and then move the specified distance along that direction. (2) If the obstacles height is lower than the maximum altitude the drone can reach, the drone may ascend to an altitude higher than the obstacle and fly over it. If the obstacle's height is 0, then it indicates no drone can fly over it (In this case you need to detour). (3) You can use `get_obstacles` tool to see the obstacle, and pay attention to the obstacle\'s shape and size. (4) You may try `auto_navigate_to` tool to detour if it works. (5) Do not move so far since the perception range is 150m."
# check if the point is reached by any drone
scan_point_is_reached_by_drones(drone_id, x, y)
return json.dumps(result, indent=2)
except json.JSONDecodeError as e:
return f"Error parsing JSON input: {str(e)}. Expected format: {{\"drone_id\": \"drone-001\", \"x\": 100.0, \"y\": 50.0, \"z\": 20.0}}"
@@ -1023,7 +1096,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
result = auto_navigate_to_non_tool(input_json)
return result
def auto_navigate_to_non_tool(input_json: str, move_towards_flag: bool = False, move_towards_distance: float = 0.0, move_towards_direction: int = 0) -> str:
def auto_navigate_to_non_tool(input_json: str, move_towards_flag: bool = False, move_towards_distance: float = 0.0, move_towards_direction: float = 0.0) -> str:
global obstacles_detected
try:
# 1. 解析参数
@@ -1032,7 +1105,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
tx, ty, tz = params.get('x'), params.get('y'), params.get('z')
if not drone_id or tx is None or ty is None or tz is None:
return "Error: drone_id, x, y, z are required"
return json.dumps({"status": "error", "message": "drone_id, x, y, z are required"})
# 2. 获取状态
status = client.get_drone_status(drone_id)
@@ -1221,7 +1294,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
error_str += f" (Target/Obstacle status changed. Use `get_obstacles` or `get_targets` to get new information.) If fails too many times of automatic navigation, consider dynamically plan a way."
else:
error_str += f" Try not move so far."
return error_str
return json.dumps({"status": "error", "message": error_str})
final_msg = waypoint_move_result.get("message", "Success")
if len(waypoints) > 1 and move_towards_flag:
@@ -1234,7 +1307,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
return json.dumps({"status": "success", "path": waypoints, "message": final_msg})
except Exception as e:
return f"Error executing path finding: {str(e)}"
return json.dumps({"status": "error", "message": f"Error executing path finding: {str(e)}"})
@tool
def auto_explore(input_json: str) -> str:
@@ -1244,9 +1317,9 @@ def create_uav_tools(client: UAVAPIClient) -> list:
Input should be a JSON string with:
- drone_id: The ID of the drone (required)
- target_id: The ID of the target (required)
- coverage: Target coverage ratio (required)
- coverage: Target coverage ratio (required, e.g., 0.95 or 95)
Example: {{"drone_id": "drone-001", "x": 100.0, "y": 50.0, "z": 20.0}}
Example: {{"drone_id": "drone-001", "target_id": "target-001", "coverage": 0.95}}
"""
import json
import math
@@ -1265,6 +1338,9 @@ def create_uav_tools(client: UAVAPIClient) -> list:
t_pos = target['position']
t_z = t_pos['z']
task_radius = drone.get('task_radius', 10.0)
buffer = task_radius * 0.5
# --- 判定条件 1: Z 坐标相同 ---(无需判断)
# if not math.isclose(d_z, t_z, abs_tol=0.1):
# return False, drone, target # 返回数据以便复用
@@ -1290,18 +1366,25 @@ def create_uav_tools(client: UAVAPIClient) -> list:
if intersect:
inside = not inside
j = i
if not inside and buffer > 0:
for i in range(len(vertices)):
v1 = (vertices[i]['x'], vertices[i]['y'])
v2 = (vertices[(i + 1) % len(vertices)]['x'], vertices[(i + 1) % len(vertices)]['y'])
if GeometryUtils.point_to_segment_dist_sq((d_x, d_y), v1, v2) <= buffer**2:
inside = True
break
# 情况 B: 圆形区域
elif t_type in ['circle', 'waypoint', 'fixed']:
t_x, t_y = t_pos['x'], t_pos['y']
radius = target.get('radius', 0.0)
radius = target.get('radius', 0.0) + buffer
distance = math.sqrt((d_x - t_x)**2 + (d_y - t_y)**2)
inside = distance <= radius
return inside, drone, target
# ================= 2. 辅助函数:点是否在目标内 =================
def is_point_in_target(x, y, target_data):
def is_point_in_target(x, y, target_data, buffer=0.0):
t_type = target_data['type']
if t_type == 'polygon':
vertices = target_data.get('vertices', [])
@@ -1315,10 +1398,17 @@ def create_uav_tools(client: UAVAPIClient) -> list:
if intersect:
inside = not inside
j = i
return inside
if inside: return True
if buffer > 0:
for i in range(len(vertices)):
v1 = (vertices[i]['x'], vertices[i]['y'])
v2 = (vertices[(i + 1) % len(vertices)]['x'], vertices[(i + 1) % len(vertices)]['y'])
if GeometryUtils.point_to_segment_dist_sq((x, y), v1, v2) <= buffer**2:
return True
return False
else: # Circle based
t_x, t_y = target_data['position']['x'], target_data['position']['y']
radius = target_data.get('radius', 0.0)
radius = target_data.get('radius', 0.0) + buffer
return math.sqrt((x - t_x)**2 + (y - t_y)**2) <= radius
# ================= 3. 主逻辑 =================
@@ -1329,6 +1419,9 @@ def create_uav_tools(client: UAVAPIClient) -> list:
# 如果必须使用 x,y,z 寻找 target则需要额外的逻辑去匹配 target_id
target_id = data.get('target_id', 'unknown_target')
required_coverage = data.get('coverage', 0.95)
# 归一化覆盖率输入 (处理 95 为 0.95)
if required_coverage > 1:
required_coverage /= 100.0
except Exception as e:
return f"Error parsing input: {str(e)}"
@@ -1341,15 +1434,18 @@ def create_uav_tools(client: UAVAPIClient) -> list:
task_radius = drone_data.get('task_radius', 10.0)
current_z = drone_data['position']['z']
# 计算 Bounding Box (边界框)
# 增加覆盖缓冲区,确保边缘覆盖 (取半径的一半作为外扩)
coverage_buffer = task_radius * 0.5
# 计算 Bounding Box (边界框),并向外扩张缓冲区
if target_data['type'] == 'polygon':
vx = [v['x'] for v in target_data['vertices']]
vy = [v['y'] for v in target_data['vertices']]
min_x, max_x = min(vx), max(vx)
min_y, max_y = min(vy), max(vy)
min_x, max_x = min(vx) - coverage_buffer, max(vx) + coverage_buffer
min_y, max_y = min(vy) - coverage_buffer, max(vy) + coverage_buffer
else:
tx, ty = target_data['position']['x'], target_data['position']['y']
r = target_data['radius']
r = target_data['radius'] + coverage_buffer
min_x, max_x = tx - r, tx + r
min_y, max_y = ty - r, ty + r
@@ -1366,8 +1462,8 @@ def create_uav_tools(client: UAVAPIClient) -> list:
y_cursor = min_y
col_points = []
while y_cursor <= max_y:
# 只有当网格点在目标几何体内时,才加入路径
if is_point_in_target(x_cursor, y_cursor, target_data):
# 使用带缓冲区的判定,确保边缘点也被纳入扫描路径
if is_point_in_target(x_cursor, y_cursor, target_data, coverage_buffer):
col_points.append({'x': x_cursor, 'y': y_cursor})
y_cursor += step_size
@@ -1394,8 +1490,8 @@ def create_uav_tools(client: UAVAPIClient) -> list:
return "Error: Target area is too small or invalid geometry found."
# 3.4 执行探索
# 初始化覆盖率为0防止在没有执行任何移动时变量未定义
current_coverage = 0.0
# 根据已探索进度初始化覆盖率
current_coverage = tool_states.explored_count / total_points
first_warning = 0
for idx, wp in enumerate(final_path):
if idx < tool_states.explored_count:
@@ -1433,8 +1529,9 @@ def create_uav_tools(client: UAVAPIClient) -> list:
# 检查是否达标
if current_coverage >= required_coverage:
explored_count = tool_states.explored_count
tool_states.explored_count = 0
return f"Success: Target explored with coverage {current_coverage:.2%} (Visited {tool_states.explored_count}/{total_points} grid points)"
return f"Success: Target explored with coverage {current_coverage:.2%} (Visited {explored_count}/{total_points} grid points)"
if math.isclose(current_coverage, 0.0):
return f"Finished path. Final coverage: {current_coverage:.2%}. Please try call this tool again to continue exploring."
else:
@@ -1448,7 +1545,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
No input required.
"""
global targets_expolred, obstacles_detected
global targets_expolred, obstacles_detected, scan_points
import math
import json
@@ -1458,6 +1555,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
if not drones:
return "Error: No drones available for scanning."
# 获取无人机 ID
drone_ids = [d['id'] for d in drones]
active_drones = []
@@ -1473,26 +1571,62 @@ def create_uav_tools(client: UAVAPIClient) -> list:
# 初始感知:起飞后先看一眼,建立初步地图
env_perception()
# 2. 生成扫描网格 (Grid Generation)
# 区域 1024x1024。步长设为 250m平衡覆盖率与时间成本
scan_points = []
area_size = 1024.0
step_size = 250.0
# # 2. 生成扫描网格 (Grid Generation)
# # 区域 1024x1024。步长设为 250m平衡覆盖率与时间成本
# scan_points = []
# area_size = 1024.0
# step_size = 250.0
x = step_size / 2
while x < area_size:
y = step_size / 2
while y < area_size:
scan_points.append((x, y))
y += step_size
x += step_size
# x = step_size / 2
# while x < area_size:
# y = step_size / 2
# while y < area_size:
# scan_points.append((x, y))
# y += step_size
# x += step_size
# 2. 从scan_points中获得需要未探索的点
unvisited_points = [point for point in scan_points if not point.visited]
# 3. 任务分配 (基于区域:左上、左下、右上、右下)
cx, cy = 512, 384
quadrants_points = {"TL": [], "BL": [], "TR": [], "BR": []}
for p in unvisited_points:
if p.x < cx:
if p.y >= cy: quadrants_points["TL"].append(p)
else: quadrants_points["BL"].append(p)
else:
if p.y >= cy: quadrants_points["TR"].append(p)
else: quadrants_points["BR"].append(p)
# 3. 任务分配 (Round-Robin)
tasks = {did: [] for did in active_drones}
num_drones = len(active_drones)
for i, point in enumerate(scan_points):
assigned_drone = active_drones[i % num_drones]
tasks[assigned_drone].append(point)
drone_quadrants = {}
for did in active_drones:
d_status = client.get_drone_status(did)
pos = d_status['position']
q = "TL" if pos['x'] < cx and pos['y'] >= cy else \
"BL" if pos['x'] < cx and pos['y'] < cy else \
"TR" if pos['x'] >= cx and pos['y'] >= cy else "BR"
drone_quadrants.setdefault(q, []).append(did)
assigned_points = set()
for q, q_points in quadrants_points.items():
q_drones = drone_quadrants.get(q, [])
if q_drones:
for i, p in enumerate(q_points):
tasks[q_drones[i % len(q_drones)]].append(p)
assigned_points.add(p)
print("Drone Quadrants:", drone_quadrants)
print("Quadrants Points:", quadrants_points)
# 兜底:分配那些所在区域没有无人机的点
remaining_points = [p for p in unvisited_points if p not in assigned_points]
print("Remaining points:", remaining_points)
if remaining_points:
for i, p in enumerate(remaining_points):
tasks[active_drones[i % num_drones]].append(p)
scan_log = []
max_points = max([len(t) for t in tasks.values()]) if tasks else 0
@@ -1501,7 +1635,8 @@ def create_uav_tools(client: UAVAPIClient) -> list:
for i in range(max_points):
for did in active_drones:
if i < len(tasks[did]):
tx, ty = tasks[did][i]
point = tasks[did][i]
tx, ty = point.x, point.y
# 构造导航参数
nav_payload = json.dumps({
@@ -1520,6 +1655,9 @@ def create_uav_tools(client: UAVAPIClient) -> list:
nav_result = json.loads(nav_result_str)
status = nav_result.get("status", "error")
if status == "success":
# 标记该点已访问
scan_point_is_reached_by_drones(did, tx, ty)
# 2. 无论导航成功与否(可能半路停下),都进行感知
# 这对于"一边撞墙一边开图"的探索过程至关重要
@@ -1534,19 +1672,41 @@ def create_uav_tools(client: UAVAPIClient) -> list:
scan_log.append(f"Drone {did} nav error: {str(nav_err)}")
# 即使出错,也要尝试感知当前位置
env_perception()
unvisited_points = [point for point in scan_points if not point.visited]
# 5. 汇总结果
return json.dumps({
"status": "success",
"message": f"Global smart scan completed with {num_drones} drones.",
"total_targets_detected": len(targets_expolred),
"total_obstacles_detected": len(obstacles_detected),
"scan_details": scan_log
"scan_details": scan_log,
"scan_all_environment_completed": (len(unvisited_points) <= 0)
}, indent=2)
except Exception as e:
return f"Error during smart scanning: {str(e)}"
@tool
def get_scan_points() -> str:
"""Get the list of scan points categorized by visited status and current drone info(pos,status).
No input required."""
try:
visited_points = [p.to_dict() for p in scan_points if p.visited]
unvisited_points = [p.to_dict() for p in scan_points if not p.visited]
drones = client.list_drones()
drones_info = {d['id']: {'pos': d['position'], 'status': d['status']} for d in drones}
result = {
"visited_points": visited_points,
"unvisited_points": unvisited_points,
"drones_info": drones_info
}
return json.dumps(result, indent=2)
except Exception as e:
return f"Error getting scan points: {str(e)}"
# Return all tools
return [
@@ -1559,7 +1719,7 @@ def create_uav_tools(client: UAVAPIClient) -> list:
auto_explore,
# get_targets,
get_obstacles,
get_nearby_entities,
# get_nearby_entities,
take_off,
land,
move_to,
@@ -1579,5 +1739,6 @@ def create_uav_tools(client: UAVAPIClient) -> list:
get_nearest_waypoint,
get_all_waypoints,
get_targets,
auto_scan_all_environment
auto_scan_all_environment,
get_scan_points
]