""" UAV API Client Wrapper for the UAV Control System API to simplify drone operations """ import requests from typing import Dict, List, Any, Tuple, Optional class UAVAPIClient: """Client for interacting with the UAV Control System API""" def __init__(self, base_url: str = "http://localhost:8000", api_key: Optional[str] = None): """ Initialize UAV API Client Args: base_url: Base URL of the UAV API server api_key: Optional API key for authentication (defaults to USER role if not provided) - None or empty: USER role (basic access) - Valid key: SYSTEM or ADMIN role (based on key) """ self.base_url = base_url.rstrip('/') self.api_key = api_key self.headers = {} if self.api_key: self.headers['X-API-Key'] = self.api_key def _request(self, method: str, endpoint: str, **kwargs) -> Any: """Make HTTP request to the API""" url = f"{self.base_url}{endpoint}" # Merge authentication headers with any provided headers headers = kwargs.pop('headers', {}) headers.update(self.headers) try: response = requests.request(method, url, headers=headers, **kwargs) response.raise_for_status() if response.status_code == 204: return None return response.json() except requests.exceptions.HTTPError as e: if e.response.status_code == 401: raise Exception(f"Authentication failed: Invalid API key") elif e.response.status_code == 403: error_detail = e.response.json().get('detail', 'Access denied') raise Exception(f"Permission denied: {error_detail}") raise Exception(f"API request failed: {e}") except requests.exceptions.RequestException as e: raise Exception(f"API request failed: {e}") # Drone Operations def list_drones(self) -> List[Dict[str, Any]]: """Get all drones in the current session""" return self._request('GET', '/drones') def get_all_waypoints(self) -> List[Dict[str, Any]]: """Get all waypoints in the current session""" return self._request('GET', '/targets/type/waypoint') def get_drone_status(self, drone_id: str) -> Dict[str, Any]: """Get detailed status of a specific drone""" return self._request('GET', f'/drones/{drone_id}') def take_off(self, drone_id: str, altitude: float = 10.0) -> Dict[str, Any]: """Command drone to take off to specified altitude""" return self._request('POST', f'/drones/{drone_id}/command/take_off',params={'altitude': altitude}) def land(self, drone_id: str) -> Dict[str, Any]: """Command drone to land at current position""" return self._request('POST', f'/drones/{drone_id}/command/land') def move_to(self, drone_id: str, x: float, y: float, z: float) -> Dict[str, Any]: """Move drone to specific coordinates""" return self._request('POST', f'/drones/{drone_id}/command/move_to', params={'x': x, 'y': y, 'z': z}) def optimal_way_to(self, drone_id: str, x: float, y: float, z: float, min_safe_height: float = 0.5) -> List[Dict[str, float]]: """ 计算到达目标点的最优路径(仅水平绕行)。 """ # 1. 目标高度检查 if z < min_safe_height: print(f"Error: Target altitude {z}m is too low.") return [] status = self.get_drone_status(drone_id) start_pos = status['position'] start_coords = (start_pos['x'], start_pos['y'], start_pos['z']) end_coords = (x, y, z) # 2. 起点高度检查 if start_coords[2] < min_safe_height: print(f"Warning: Drone is currently below safe height!") # 3. 执行递归搜索 path_points = self._find_path_recursive( start_coords, end_coords, avoidance_radius=2.0, # 初始绕行半径 depth=0, max_depth=4, # 最大递归深度 min_safe_height=min_safe_height ) if path_points is None: print(f"Error: Unable to find a collision-free path.") return [] # 4. 格式化输出 formatted_path = [{"x": p[0], "y": p[1], "z": p[2]} for p in path_points] for point in formatted_path: self.move_to(drone_id, **point) return formatted_path # --- 递归核心 --- def _find_path_recursive(self, start: Tuple[float, float, float], end: Tuple[float, float, float], avoidance_radius: float, depth: int, max_depth: int, min_safe_height: float) -> Optional[List[Tuple[float, float, float]]]: sx, sy, sz = start ex, ey, ez = end # 1. 检查直连是否有碰撞 collision = self.check_path_collision(sx, sy, sz, ex, ey, ez) if not collision: return [end] # 2. 达到最大深度则停止 if depth >= max_depth: return None # 3. 计算路径中点 mid_point = ((sx + ex) / 2, (sy + ey) / 2, (sz + ez) / 2) # 随着深度增加,减小绕行半径,进行更精细的搜索 current_radius = avoidance_radius / (1 + 0.5 * depth) # 4. 获取仅包含左右方向的候选点 candidates = self._get_horizontal_avoidance_points(start, end, mid_point, current_radius) # 5. 遍历候选点 for candidate in candidates: # 过滤掉非法高度的点 (虽然水平偏移理论上不改变高度,但以防万一) if candidate[2] < min_safe_height: continue # 递归处理:起点 -> 候选点 path_first = self._find_path_recursive(start, candidate, avoidance_radius, depth + 1, max_depth, min_safe_height) if path_first is not None: # 递归处理:候选点 -> 终点 path_second = self._find_path_recursive(candidate, end, avoidance_radius, depth + 1, max_depth, min_safe_height) if path_second is not None: # 路径拼接 return path_first + path_second # 所有左右尝试都失败 return None # --- 向量计算 (核心修改部分) --- def _get_horizontal_avoidance_points(self, start, end, mid, radius) -> List[Tuple[float, float, float]]: """ 生成候选点:强制仅在水平面上进行左右偏移。 """ # 1. 计算飞行方向向量 D = End - Start dx = end[0] - start[0] dy = end[1] - start[1] # dz 我们不关心,因为我们要在水平面找垂线 # 计算水平投影的长度 dist_horizontal = (dx*dx + dy*dy)**0.5 rx, ry, rz = 0.0, 0.0, 0.0 # 2. 计算右向量 (Right Vector) if dist_horizontal == 0: # 特殊情况:垂直升降 (Start和End的x,y相同) # 此时"左右"没有绝对定义,我们任意选取 X 轴方向作为偏移方向 rx, ry, rz = 1.0, 0.0, 0.0 else: # 标准情况:利用 2D 向量旋转 90 度原理 # 向量 (x, y) 顺时针旋转 90 度变为 (y, -x) # 归一化 rx = dy / dist_horizontal ry = -dx / dist_horizontal rz = 0.0 # 强制 Z 轴分量为 0,保证水平 mx, my, mz = mid # 3. 生成候选点:只生成 右(Right) 和 左(Left) # 注意:Right 是 (rx, ry),Left 是 (-rx, -ry) candidates = [] # 右侧点 c1 = (mx + rx * radius, my + ry * radius, mz) # Z高度保持中点高度不变 candidates.append(c1) # 左侧点 c2 = (mx - rx * radius, my - ry * radius, mz) candidates.append(c2) return candidates def move_along_path(self, drone_id: str, waypoints: List[Dict[str, float]]) -> Dict[str, Any]: """Move drone along a path of waypoints""" return self._request('POST', f'/drones/{drone_id}/command/move_along_path', json={'waypoints': waypoints}) def change_altitude(self, drone_id: str, altitude: float) -> Dict[str, Any]: """Change drone altitude while maintaining X/Y position""" return self._request('POST', f'/drones/{drone_id}/command/change_altitude', params={'altitude': altitude}) def hover(self, drone_id: str, duration: Optional[float] = None) -> Dict[str, Any]: """ Command drone to hover at current position. Args: drone_id: ID of the drone duration: Optional duration to hover in seconds """ params = {} if duration is not None: params['duration'] = duration return self._request('POST', f'/drones/{drone_id}/command/hover', params=params) def rotate(self, drone_id: str, heading: float) -> Dict[str, Any]: """Rotate drone to face specific direction (0-360 degrees)""" return self._request('POST', f'/drones/{drone_id}/command/rotate', params={'heading': heading}) def move_towards(self, drone_id: str, distance: float, heading: Optional[float] = None, dz: Optional[float] = None) -> Dict[str, Any]: """ Move drone a specific distance in a direction. Args: drone_id: ID of the drone distance: Distance to move in meters heading: Optional heading direction (0-360). If None, uses current heading. dz: Optional vertical component (altitude change) """ params = {'distance': distance} if heading is not None: params['heading'] = heading if dz is not None: params['dz'] = dz return self._request('POST', f'/drones/{drone_id}/command/move_towards', params=params) def return_home(self, drone_id: str) -> Dict[str, Any]: """Command drone to return to home position""" return self._request('POST', f'/drones/{drone_id}/command/return_home') def set_home(self, drone_id: str) -> Dict[str, Any]: """Set current position as home position""" return self._request('POST', f'/drones/{drone_id}/command/set_home') def calibrate(self, drone_id: str) -> Dict[str, Any]: """Calibrate drone sensors""" return self._request('POST', f'/drones/{drone_id}/command/calibrate') def charge(self, drone_id: str, charge_amount: float) -> Dict[str, Any]: """Charge drone battery (when landed)""" return self._request('POST', f'/drones/{drone_id}/command/charge', params={'charge_amount': charge_amount}) def take_photo(self, drone_id: str) -> Dict[str, Any]: """Take a photo with drone camera""" return self._request('POST', f'/drones/{drone_id}/command/take_photo') def send_message(self, drone_id: str, target_drone_id: str, message: str) -> Dict[str, Any]: """ Send a message to another drone. Args: drone_id: ID of the sender drone target_drone_id: ID of the recipient drone message: Content of the message """ return self._request('POST', f'/drones/{drone_id}/command/send_message', params={'target_drone_id': target_drone_id, 'message': message}) def broadcast(self, drone_id: str, message: str) -> Dict[str, Any]: """ Broadcast a message to all other drones. Args: drone_id: ID of the sender drone message: Content of the message """ return self._request('POST', f'/drones/{drone_id}/command/broadcast', params={'message': message}) # Session Operations def get_current_session(self) -> Dict[str, Any]: """Get information about current mission session""" return self._request('GET', '/sessions/current') def get_session_data(self, session_id: str = 'current') -> Dict[str, Any]: """Get all entities in a session (drones, targets, obstacles, environment)""" return self._request('GET', f'/sessions/{session_id}/data') def get_task_progress(self, session_id: str = 'current') -> Dict[str, Any]: """Get mission task completion progress""" return self._request('GET', f'/sessions/{session_id}/task-progress') # Environment Operations def get_weather(self) -> Dict[str, Any]: """Get current weather conditions""" return self._request('GET', '/environments/current') def get_targets(self) -> List[Dict[str, Any]]: """Get all targets in the session""" fixed = self._request('GET', '/targets/type/fixed') moving = self._request('GET', '/targets/type/moving') waypoint = self._request('GET', '/targets/type/waypoint') circle = self._request('GET', '/targets/type/circle') polygen = self._request('GET', '/targets/type/polygon') return fixed + moving + waypoint + circle + polygen def get_target_status(self, target_id: str) -> Dict[str, Any]: """Get information about a specific target""" return self._request('GET', f'/targets/{target_id}') def get_waypoints(self) -> List[Dict[str, Any]]: """Get all charging station waypoints""" return self._request('GET', '/targets/type/waypoint') def get_nearest_waypoint(self, x: str, y: str, z: str) -> Dict[str, Any]: """Get nearest charging station waypoint""" return self._request('GET', '/targets/waypoints/nearest', json={'x': x, 'y': y, 'z': z}) def get_obstacles(self) -> List[Dict[str, Any]]: """Get all obstacles in the session""" point = self._request('GET', '/obstacles/type/point') circle = self._request('GET', '/obstacles/type/circle') polygon = self._request('GET', '/obstacles/type/polygon') ellipse = self._request('GET', '/obstacles/type/ellipse') return point + circle + polygon + ellipse def get_nearby_entities(self, drone_id: str) -> Dict[str, Any]: """Get entities near a drone (within perceived radius)""" return self._request('GET', f'/drones/{drone_id}/nearby') # Safety Operations def check_point_collision(self, x: float, y: float, z: float, safety_margin: float = 0.0) -> Optional[Dict[str, Any]]: """Check if a point collides with any obstacle""" result = self._request('POST', '/obstacles/collision/check', json={ 'point': {'x': x, 'y': y, 'z': z}, 'safety_margin': safety_margin }) return result def check_path_collision(self, start_x: float, start_y: float, start_z: float, end_x: float, end_y: float, end_z: float, safety_margin: float = 1.0) -> Optional[Dict[str, Any]]: """Check if a path intersects any obstacle""" result = self._request('POST', '/obstacles/collision/path', json={ 'start': {'x': start_x, 'y': start_y, 'z': start_z}, 'end': {'x': end_x, 'y': end_y, 'z': end_z}, 'safety_margin': safety_margin }) return result