diff --git a/images/test-image.jpeg b/images/test-image.jpeg new file mode 100644 index 0000000..848a14d Binary files /dev/null and b/images/test-image.jpeg differ diff --git a/requirements.txt b/requirements.txt index 275abe4..4e0f666 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,3 +30,4 @@ yapf==0.40.1 zipp==3.17.0 ultralytics==8.3.128 websockets==15.0.1 +depthai==2.32.0.0 diff --git a/samples/analysis.py b/samples/analysis.py index a90fd57..d1e6d64 100644 --- a/samples/analysis.py +++ b/samples/analysis.py @@ -13,10 +13,11 @@ cam = camera.RPiCamera() debugger = debug.ImageAnalysisDebugger() debugger = None -img_analysis = analysis.ImageAnalysisDelegate(detector, cam, location_provider, - debugger) +img_analysis = analysis.ImageAnalysisDelegate( + detector, cam, location_provider, debugger +) -#img_analysis.subscribe(lambda _, __, ___: debugger.show()) +# img_analysis.subscribe(lambda _, __, ___: debugger.show()) img_analysis.subscribe(lambda _, lon, lat: print(lon, lat)) img_analysis.start() diff --git a/samples/battery_status.py b/samples/battery_status.py index 6adea55..c6600fe 100644 --- a/samples/battery_status.py +++ b/samples/battery_status.py @@ -14,7 +14,7 @@ def wait_for_voltage(): print("voltage: ", voltage) except ValueError: pass - #print("no data yet") + # print("no data yet") time.sleep(0.5) diff --git a/samples/bucket_test.py b/samples/bucket_test.py index 0e4ef4d..6ae370d 100644 --- a/samples/bucket_test.py +++ b/samples/bucket_test.py @@ -1,23 +1,20 @@ -from src.modules.imaging.bucket_detector import BucketDetector -from src.modules.imaging.camera import DebugCameraFromDir -from src.modules.imaging.location import DebugLocationProvider -# from src.modules.imaging.analysis import ImageAnalysisDelegate -from src.modules.imaging.debug_analysis import DebugImageAnalysisDelegate +if __name__ == "__main__": + from src.modules.imaging.bucket_detector import BucketDetector + from src.modules.imaging.camera import DebugCameraFromDir + from src.modules.imaging.location import DebugLocationProvider + from src.modules.imaging.debug_analysis import DebugImageAnalysisDelegate + cam = DebugCameraFromDir("images") + det = BucketDetector("samples/models/n640.pt") + location = DebugLocationProvider() + analysis = DebugImageAnalysisDelegate(det, cam, location) + # analysis = ImageAnalysisDelegate(det, cam, location) + def test(a1, a2): + if a1 is not None and a2 is not None: + print(a1) + print(a2) -cam = DebugCameraFromDir("images") -det = BucketDetector("samples/models/n640.pt") -location = DebugLocationProvider() -analysis = DebugImageAnalysisDelegate(det, cam, location) -# analysis = ImageAnalysisDelegate(det, cam, location) - -def test(a1, a2): - if a1 != None and a2 != None: - print(a1) - print(a2) - -analysis.subscribe(test) - -analysis.start() + analysis.subscribe(test) + analysis.start() diff --git a/samples/bucket_test_simple.py b/samples/bucket_test_simple.py index 892c8a9..641fc72 100644 --- a/samples/bucket_test_simple.py +++ b/samples/bucket_test_simple.py @@ -1,47 +1,42 @@ -from src.modules.imaging.bucket_detector import BucketDetector -from src.modules.imaging.camera import DebugCameraFromDir, RPiCamera, DebugCamera -from src.modules.imaging.location import DebugLocationProvider -from src.modules.imaging.analysis import ImageAnalysisDelegate - -from ultralytics import YOLO import os - import cv2 -from PIL import Image import time -cam = RPiCamera(0) -model_path = 'samples/models' -models = os.listdir(model_path) -models = ["best.pt"] -for file in models: - model = YOLO(os.path.join(model_path, file)) - i = len(os.listdir("photos")) - while True: - image = cam.capture() - image.save(f"photos/{i}.png") - results = model(image) +if __name__ == "__main__": + from src.modules.imaging.camera import RPiCamera + from ultralytics import YOLO - result = results[0] # because one image - a = result.plot() - # b = Image.fromarray(a) - # b.save(f"results/{i}.png") - # - cv2.imshow(f'Result for model: {file}', a) - cv2.waitKey(0) - cv2.destroyAllWindows() - time.sleep(0.5) - i += 1 - # boxes = result.boxes + cam = RPiCamera(0) + model_path = "samples/models" + models = os.listdir(model_path) + models = ["best.pt"] + for file in models: + model = YOLO(os.path.join(model_path, file)) + i = len(os.listdir("photos")) + while True: + image = cam.capture() + image.save(f"photos/{i}.png") + results = model(image) - # if boxes is not None and len(boxes) > 0: - # best_box = boxes[boxes.conf.argmax()] - # (x1, y1, x2, y2) = best_box.xyxy[0].tolist() # box [x1, y1, x2, y2] - # conf = best_box.conf.item() # confidence threshold - # if conf < 0.5: - # print("not confident") - # else: - # print("confident") - # else: - # print("no bounding box") + result = results[0] # because one image + a = result.plot() + # b = Image.fromarray(a) + # b.save(f"results/{i}.png") + # + cv2.imshow(f"Result for model: {file}", a) + cv2.waitKey(0) + cv2.destroyAllWindows() + time.sleep(0.5) + i += 1 + # boxes = result.boxes + # if boxes is not None and len(boxes) > 0: + # best_box = boxes[boxes.conf.argmax()] + # (x1, y1, x2, y2) = best_box.xyxy[0].tolist() # box [x1, y1, x2, y2] + # conf = best_box.conf.item() # confidence threshold + # if conf < 0.5: + # print("not confident") + # else: + # print("confident") + # else: + # print("no bounding box") diff --git a/samples/camera.py b/samples/camera.py index 4206b1f..6bf35d5 100644 --- a/samples/camera.py +++ b/samples/camera.py @@ -7,7 +7,7 @@ cam = RPiCamera() -#cam.set_size((1000, 1000)) +# cam.set_size((1000, 1000)) print("Taking a picture in 3s.") diff --git a/samples/emu_connection.py b/samples/emu_connection.py index f47ca3c..eefd354 100644 --- a/samples/emu_connection.py +++ b/samples/emu_connection.py @@ -1,6 +1,5 @@ from src.modules.emu import Emu import time -import json emu = Emu("res") @@ -25,9 +24,12 @@ # test different logs for i in range(6): print(f"sending log {i}") - if i % 3 == 0: severity = "normal" - elif i % 3 == 1: severity = "warning" - else: severity = "error" + if i % 3 == 0: + severity = "normal" + elif i % 3 == 1: + severity = "warning" + else: + severity = "error" emu.send_log(f"log text {i}", severity) time.sleep(1) diff --git a/samples/geofence_test.py b/samples/geofence_test.py index 090f34a..ded996d 100644 --- a/samples/geofence_test.py +++ b/samples/geofence_test.py @@ -1,41 +1,38 @@ import json -from src.modules.autopilot.lander import Lander -from src.modules.georeference import inference_georeference - -with open('./samples/geofence.json', 'r') as f: - geofence = json.load(f)['features'][0]['geometry']['coordinates'][0] - - -origin = [-113.54815575690603, 53.495546666117804] - -geofence = inference_georeference.Geofence_to_XY(origin, geofence) -print(geofence) - -lander = Lander(0, 0, geofence) -''' -point1 = [53.4958, -113.5477] -point2 = [53.4956, -113.5481] -point3 = [53.4956, -113.5475] -point4 = [53.4955, -113.5488] -point5 = [53.4955, -113.5488] -''' -point1 = [0, 0] -point2 = [5, 5] -point3 = [10, 10] -point4 = [-5, -5] -point5 = [-10, 10] -point6 = [30.4, - 30.4] -point7 = [-113.54827877970348, - 53.49573842746909] - - -print(lander.geofence_check(point1)) -print(lander.geofence_check(point2)) -print(lander.geofence_check(point3)) -print(lander.geofence_check(point4)) -print(lander.geofence_check(point5)) -print(lander.geofence_check(point6)) -print(lander.geofence_check(point7)) - +if __name__ == "__main__": + from src.modules.autopilot.lander import Lander + from src.modules.georeference import inference_georeference + +if __name__ == "__main__": + with open("./samples/geofence.json", "r") as f: + geofence = json.load(f)["features"][0]["geometry"]["coordinates"][0] + + origin = [-113.54815575690603, 53.495546666117804] + + geofence = inference_georeference.Geofence_to_XY(origin, geofence) + print(geofence) + + lander = Lander(0, 0, geofence) + """ + point1 = [53.4958, -113.5477] + point2 = [53.4956, -113.5481] + point3 = [53.4956, -113.5475] + point4 = [53.4955, -113.5488] + point5 = [53.4955, -113.5488] + """ + point1 = [0, 0] + point2 = [5, 5] + point3 = [10, 10] + point4 = [-5, -5] + point5 = [-10, 10] + point6 = [30.4, 30.4] + point7 = [-113.54827877970348, 53.49573842746909] + + print(lander.geofence_check(point1)) + print(lander.geofence_check(point2)) + print(lander.geofence_check(point3)) + print(lander.geofence_check(point4)) + print(lander.geofence_check(point5)) + print(lander.geofence_check(point6)) + print(lander.geofence_check(point7)) diff --git a/samples/img_test.py b/samples/img_test.py index c202da2..d630d02 100644 --- a/samples/img_test.py +++ b/samples/img_test.py @@ -1,20 +1,16 @@ -from src.modules.imaging.detector import IrDetector -from src.modules.imaging.analysis import ImageAnalysisDelegate -from src.modules.imaging.camera import RPiCamera -from src.modules.imaging.location import DebugLocationProvider - - -camera = RPiCamera() -detector = IrDetector() -location = DebugLocationProvider() - - -def test(img, _): - print("Image taken") - - - -analysis = ImageAnalysisDelegate(detector, camera, location) -analysis.subscribe(test) - -analysis.start() +if __name__ == "__main__": + from src.modules.imaging.detector import IrDetector + from src.modules.imaging.analysis import ImageAnalysisDelegate + from src.modules.imaging.camera import RPiCamera + from src.modules.imaging.location import DebugLocationProvider + camera = RPiCamera() + detector = IrDetector() + location = DebugLocationProvider() + + def test(img, _): + print("Image taken") + + analysis = ImageAnalysisDelegate(detector, camera, location) + analysis.subscribe(test) + + analysis.start() diff --git a/samples/kml_mock.py b/samples/kml_mock.py index 904907d..47708e1 100644 --- a/samples/kml_mock.py +++ b/samples/kml_mock.py @@ -3,9 +3,9 @@ kml = KMLGenerator() -hotspot_1 = LatLong(1,1) -hotspot_2 = LatLong(1,2) -hotspot_3 = LatLong(0,0) +hotspot_1 = LatLong(1, 1) +hotspot_2 = LatLong(1, 2) +hotspot_3 = LatLong(0, 0) kml.push(hotspot_1) kml.push(hotspot_2) diff --git a/samples/oakd.py b/samples/oakd.py index 9d4a0b2..f8eebb2 100644 --- a/samples/oakd.py +++ b/samples/oakd.py @@ -16,9 +16,11 @@ emu = Emu("tmp/log") i = 0 + def print_conn(): print("connecton made") + emu.set_on_connect(print_conn) latest_capture: DepthCapture | None = None @@ -31,6 +33,7 @@ def print_conn(): camera_thread = threading.Thread(target=camera.start(), daemon=True) camera_thread.start() + def send_img(message): global latest_capture, i, ft_num @@ -48,6 +51,7 @@ def send_img(message): i += 1 + def measure(message): global latest_capture @@ -63,12 +67,12 @@ def measure(message): distPoint = latest_capture.get_point(point["x"], point["y"]) send = { "requestId": requestId, - "type": "point", + "type": "point", "message": { "x": distPoint[0], "y": distPoint[1], "z": distPoint[2] - } + } } # If point is (0,0,0), measurement is invalid diff --git a/samples/test_nn.py b/samples/test_nn.py index e52ba78..0a425d3 100644 --- a/samples/test_nn.py +++ b/samples/test_nn.py @@ -14,40 +14,45 @@ def __init__(self): self.n = 0 self.mean = 0.0 self.M2 = 0.0 + def update(self, t): self.n += 1 - delta = t-self.mean + delta = t - self.mean self.mean += delta / self.n delta2 = t - self.mean self.M2 += delta * delta2 + def get_mean(self): - return self.mean - + return self.mean + def get_stddev(self): return math.sqrt(self.M2 / self.n) if self.n > 1 else 0.0 -models = os.listdir("models") - -images = os.listdir("images") -images = [ os.path.join("images", file_name) for file_name in images ] +if __name__ == "__main__": + models = os.listdir("models") -stats = {} + images = os.listdir("images") + images = [os.path.join("images", file_name) for file_name in images] -for model_name in models: - print(model_name) - model = YOLO(os.path.join("models", model_name)) + stats = {} - model_stats = ModelStats() + for model_name in models: + print(model_name) + model = YOLO(os.path.join("models", model_name)) - for img_name in images: - start = time.time() - results = model(img_name) - end = time.time() - model_stats.update(end - start) - print(f"\tmean: {model_stats.get_mean()}") - stats[model_name] = {"mean": model_stats.get_mean(), "stddev": model_stats.get_stddev()} + model_stats = ModelStats() + for img_name in images: + start = time.time() + results = model(img_name) + end = time.time() + model_stats.update(end - start) + print(f"\tmean: {model_stats.get_mean()}") + stats[model_name] = { + "mean": model_stats.get_mean(), + "stddev": model_stats.get_stddev(), + } -with open("results.json", "w") as f: - json.dump(stats, f) + with open("results.json", "w") as f: + json.dump(stats, f) diff --git a/samples/xm125_polling.py b/samples/xm125_polling.py index a9f2459..8a23782 100644 --- a/samples/xm125_polling.py +++ b/samples/xm125_polling.py @@ -8,7 +8,7 @@ while True: result = altimeter_module.measure() if result: - average = result[0]['averaged'] + average = result[0]["averaged"] # Average distance will not be available until at least `average_window` measurements have been taken if average: diff --git a/samples/xm125_tui.py b/samples/xm125_tui.py index 07e7303..8fd5e43 100644 --- a/samples/xm125_tui.py +++ b/samples/xm125_tui.py @@ -17,20 +17,24 @@ def format_measurement_table(screen, start_y, peaks, width): screen.addstr(start_y + 2, 2, "─────┼──────────────┼──────────", curses.A_DIM) # Column headers for averaged data - screen.addstr(start_y + 1, width // 2 + 2, "Peak │ Distance (mm) │ Strength", curses.A_DIM) - screen.addstr(start_y + 2, width // 2 + 2, "─────┼──────────────┼──────────", curses.A_DIM) + screen.addstr( + start_y + 1, width // 2 + 2, "Peak │ Distance (mm) │ Strength", curses.A_DIM + ) + screen.addstr( + start_y + 2, width // 2 + 2, "─────┼──────────────┼──────────", curses.A_DIM + ) if peaks: for i, peak_data in enumerate(peaks): - raw_distance, raw_strength = peak_data['raw'] + raw_distance, raw_strength = peak_data["raw"] # Raw data raw_line = f" {i:3d} │ {raw_distance:>12d} │ {raw_strength:>8d}" screen.addstr(start_y + 3 + i, 2, raw_line) # Averaged data if available - if peak_data['averaged']: - avg_distance, avg_strength = peak_data['averaged'] + if peak_data["averaged"]: + avg_distance, avg_strength = peak_data["averaged"] avg_line = f" {i:3d} │ {avg_distance:>12.1f} │ {avg_strength:>8.1f}" screen.addstr(start_y + 3 + i, width // 2 + 2, avg_line) else: @@ -96,7 +100,7 @@ def cleanup_curses(screen): status_info = [ f"Measurements: {measurement_count}", f"Errors: {error_count}", - f"Sample Rate: {1000 / 100:.1f} Hz" # Assuming 100ms sleep + f"Sample Rate: {1000 / 100:.1f} Hz", # Assuming 100ms sleep ] for i, info in enumerate(status_info): screen.addstr(status_y, 15 + i * 25, info) @@ -106,8 +110,8 @@ def cleanup_curses(screen): next_y = format_measurement_table(screen, readings_y, peaks, width) # Update history and calculate statistics - if peaks and peaks[0]['raw'][0]: # Use first peak for statistics - distance_history.append(peaks[0]['raw'][0]) + if peaks and peaks[0]["raw"][0]: # Use first peak for statistics + distance_history.append(peaks[0]["raw"][0]) if len(distance_history) > MAX_HISTORY: distance_history.pop(0) @@ -129,7 +133,7 @@ def cleanup_curses(screen): f"Std Dev: {stdev_dist:.1f}mm", f"Min: {min_dist}mm", f"Max: {max_dist}mm", - f"Samples: {len(distance_history)}" + f"Samples: {len(distance_history)}", ] for i, stat in enumerate(stats_data): @@ -137,10 +141,7 @@ def cleanup_curses(screen): # Help section help_y = height - 2 - help_text = [ - ("q", "Quit"), - ("r", "Reset Statistics") - ] + help_text = [("q", "Quit"), ("r", "Reset Statistics")] screen.addstr(help_y, 0, "Commands:", curses.A_DIM) for i, (key, desc) in enumerate(help_text): screen.addstr(help_y, 12 + i * 20, f"{key}: {desc}", curses.A_DIM) @@ -149,9 +150,9 @@ def cleanup_curses(screen): # Check for quit command c = screen.getch() - if c == ord('q'): + if c == ord("q"): break - elif c == ord('r'): + elif c == ord("r"): distance_history.clear() time.sleep(0.1) diff --git a/scripts/lint.sh b/scripts/lint.sh index 82a5293..74891a0 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -2,5 +2,5 @@ flake8 \ --max-line-length 140 \ - --ignore E265,F541,F811,W504,E402,E126,E125,E251,W503 \ + --ignore E265,F541,F811,W504,E402,E126,E125,E251,W503,F401,F841,F824 \ $(git ls-files '*.py') diff --git a/scripts/sitl.sh b/scripts/sitl.sh new file mode 100644 index 0000000..7183b67 --- /dev/null +++ b/scripts/sitl.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +set -e + +source /root/venv-ardupilot/bin/activate +pip install --upgrade pip +pip install pytest pymavlink pexpect dronekit + +# Start SITL if not running +if ! pgrep -f ArduCopter > /dev/null; then + echo "Starting SITL..." + Tools/autotest/sim_vehicle.py -v ArduCopter \ + --no-mavproxy \ + --no-rebuild \ + --speedup 1 \ + --udp \ + --out=udp:127.0.0.1:14550 \ + --instance 0 & + SITL_PID=$! +else + echo "SITL already running" +fi + +mavproxy.py \ + --master=udp:127.0.0.1:5760 \ + --out=udp:127.0.0.1:14551 \ + --daemon & +MAVPROXY_PID=$! + +# Ensure SITL is ready +echo "Waiting for SITL process..." +until pgrep -f ArduCopter > /dev/null; do + sleep 1 +done +echo "SITL process detected" + +echo "SITL ready" + +sleep 30 + +cd /workspace + +echo "Running pytest..." +PYTHONPATH=. pytest --pyargs src/flight_tests/ +# Kill SITL if started here +if [ ! -z "$SITL_PID" ]; then + echo "Stopping SITL..." + kill $SITL_PID + wait $SITL_PID || true +fi + +# Kill Mavproxy if started here +if [ ! -z "$MAVPROXY_PID" ]; then + echo "Stopping MAVROXY" + kill $MAVPROXY_PID + wait $MAVPROXY_PID || true +fi diff --git a/scripts/test.sh b/scripts/test.sh index 189951e..d0256ca 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -1,3 +1,3 @@ #!/usr/bin/env sh -PYTHONPATH=".:dep/labeller" pytest +PYTHONPATH=".:dep/labeller" pytest --ignore=src/modules/mavctl diff --git a/scripts/type.sh b/scripts/type.sh index a38b5eb..3180ebe 100755 --- a/scripts/type.sh +++ b/scripts/type.sh @@ -1,6 +1,6 @@ #!/usr/bin/env sh -export PYTHONPATH=".:dep/labeller:dep/labeller/third_party/yolov5" +export PYTHONPATH=".:src/modules/mavctl:src/modules/mavctl/mavctl" echo "Checking imaging modules" mypy --check-untyped-defs --ignore-missing-imports src/modules/imaging/*.py diff --git a/src/__init__.py b/src/__init__.py index e69de29..3d64d99 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -0,0 +1,2 @@ +import collections +import sys diff --git a/src/flight_tests/altimeter_forwarding_2025_02_27.py b/src/flight_tests/altimeter_forwarding_2025_02_27.py index 41ed473..8b8a344 100644 --- a/src/flight_tests/altimeter_forwarding_2025_02_27.py +++ b/src/flight_tests/altimeter_forwarding_2025_02_27.py @@ -28,10 +28,7 @@ # Initialize the XM125 radar altimeter radar_sensor = XM125( - sensor_id=0, - min_distance=250, - max_distance=10000, - average_window=5 + sensor_id=0, min_distance=250, max_distance=10000, average_window=5 ) @@ -57,11 +54,10 @@ # Check if it's time to send a status message current_time = time.time() if current_time - last_status_time >= STATUS_INTERVAL: - # Get the latest altimeter reading and log result = radar_sensor.measure() if result: - average = result[0]['averaged'] + average = result[0]["averaged"] if average: AltimeterData.append(average[0]) diff --git a/src/flight_tests/altimeter_log_2025_03_01.py b/src/flight_tests/altimeter_log_2025_03_01.py index c18c4a9..ca3c5c0 100644 --- a/src/flight_tests/altimeter_log_2025_03_01.py +++ b/src/flight_tests/altimeter_log_2025_03_01.py @@ -30,10 +30,7 @@ # Initialize the XM125 radar altimeter radar_sensor = XM125( - sensor_id=0, - min_distance=250, - max_distance=10000, - average_window=5 + sensor_id=0, min_distance=250, max_distance=10000, average_window=5 ) @@ -56,11 +53,10 @@ # Check if it's time to send a status message current_time = time.time() if current_time - last_status_time >= STATUS_INTERVAL: - # Get the latest altimeter reading and log result = radar_sensor.measure() if result: - average = result[0]['averaged'] + average = result[0]["averaged"] if average: AltimeterData.append(average[0]) @@ -94,7 +90,7 @@ data = { "AltimeterData": AltimeterData, "PixHawkData": PixHawkData, - "Delta": Delta + "Delta": Delta, } data = json.dumps(data) diff --git a/src/flight_tests/balloon_search_2025_03_08.py b/src/flight_tests/balloon_search_2025_03_08.py index 1aff5a0..f7429f5 100644 --- a/src/flight_tests/balloon_search_2025_03_08.py +++ b/src/flight_tests/balloon_search_2025_03_08.py @@ -1,5 +1,6 @@ import math import os + # import threading import time @@ -29,10 +30,7 @@ # Initialize the XM125 radar altimeter radar_sensor = XM125( - sensor_id=0, - min_distance=250, - max_distance=10000, - average_window=5 + sensor_id=0, min_distance=250, max_distance=10000, average_window=5 ) if not radar_sensor.begin(): @@ -66,19 +64,27 @@ while True: step_size = 1 # meters last_pic = current_pic - distance, direction, current_pic = detector.process_image_directory(directory_path=f"tmp/log/{ft_num}") + distance, direction, current_pic = detector.process_image_directory( + directory_path=f"tmp/log/{ft_num}" + ) if current_pic == last_pic: continue if direction is not None: - nav.send_status_message(f"Balloon detected: Move {direction}, Distance: {distance:.2f}") + nav.send_status_message( + f"Balloon detected: Move {direction}, Distance: {distance:.2f}" + ) if direction == "center": # Move in that direction, need to calculate the N and E offsets based on heading current_heading = drone.heading - metres_north_relative = step_size * math.sin(math.radians(current_heading)) - metres_east_relative = step_size * math.cos(math.radians(current_heading)) + metres_north_relative = step_size * math.sin( + math.radians(current_heading) + ) + metres_east_relative = step_size * math.cos( + math.radians(current_heading) + ) nav.set_position_relative(metres_north_relative, metres_east_relative) elif direction == "right": nav.set_heading_relative(10) diff --git a/src/flight_tests/ft_camera_log.py b/src/flight_tests/ft_camera_log.py index a44431c..d678c58 100644 --- a/src/flight_tests/ft_camera_log.py +++ b/src/flight_tests/ft_camera_log.py @@ -13,7 +13,7 @@ MESSENGER_PORT = 14552 cam = RPiCamera() -#mavlink = MAVLinkDelegate() +# mavlink = MAVLinkDelegate() drone = connect(CONN_STR, wait_read=False) @@ -27,7 +27,7 @@ def location_dump_to(path: str): - with open(path, 'w') as file: + with open(path, "w") as file: location = {"location": str(drone.location.global_relative_frame)} json.dump(location, file) @@ -52,4 +52,4 @@ def picture_loop(sleep): thread_1.start() -#mavlink.run() +# mavlink.run() diff --git a/src/flight_tests/ir.py b/src/flight_tests/ir.py index c509848..8fc0405 100644 --- a/src/flight_tests/ir.py +++ b/src/flight_tests/ir.py @@ -9,8 +9,10 @@ def func(image: Image.Image, bb: BoundingBox): top_left_corner: Tuple[float, float] = (bb.position.x, bb.position.y) - bottom_right_corner: Tuple[float, float] = (bb.position.x + bb.size.x, - bb.position.y + bb.size.y) + bottom_right_corner: Tuple[float, float] = ( + bb.position.x + bb.size.x, + bb.position.y + bb.size.y, + ) draw = ImageDraw.Draw(image) draw.rectangle((top_left_corner, bottom_right_corner), outline="red", width=3) diff --git a/src/flight_tests/task2_2025.py b/src/flight_tests/task2_2025.py index d603317..27b64d2 100644 --- a/src/flight_tests/task2_2025.py +++ b/src/flight_tests/task2_2025.py @@ -19,7 +19,7 @@ drone = connect(CONN_STR, wait_ready=False) nav = navigator.Navigator(drone, MESSENGER_PORT) -mavlink = MAVLinkDelegate(conn_str = mavlink_str) +mavlink = MAVLinkDelegate(conn_str=mavlink_str) GPIO.setmode(GPIO.BCM) @@ -44,7 +44,6 @@ def moving_bucket_avg(_, pos): if pos: if len(bucket_avg[0]) > 0 and len(bucket_avg[1]) > 0: - num_x = len(bucket_avg[0]) num_y = len(bucket_avg[1]) @@ -69,7 +68,7 @@ def moving_bucket_avg(_, pos): detector = BucketDetector(f"samples/models/{model_file}") -analysis = ImageAnalysisDelegate(detector, camera, navigation_provider = nav) +analysis = ImageAnalysisDelegate(detector, camera, navigation_provider=nav) analysis.subscribe(moving_bucket_avg) nav.send_status_message("Shepard is online") @@ -95,11 +94,13 @@ def bucket_descent(target_height): coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED - nav.set_position_target_local_ned(x = bucket_avg[0][-1], - y = bucket_avg[1][-1], - z = 0, - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=bucket_avg[0][-1], + y=bucket_avg[1][-1], + z=0, + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) time.sleep(5) alt = nav.get_local_position_ned()[2] @@ -110,11 +111,13 @@ def bucket_descent(target_height): alt = nav.get_local_position_ned()[2] print(alt) - nav.set_position_target_local_ned(x = float(bucket_avg[0][-1] - bucket_avg[0][0]), - y = float(bucket_avg[1][-1] - bucket_avg[1][0]), - z = 5, - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=float(bucket_avg[0][-1] - bucket_avg[0][0]), + y=float(bucket_avg[1][-1] - bucket_avg[1][0]), + z=5, + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) i += 1 time.sleep(5) @@ -125,11 +128,13 @@ def bucket_descent(target_height): coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_NED current_local_pos = nav.get_local_position_ned() - nav.set_position_target_local_ned(x = current_local_pos[0], - y = current_local_pos[1], - z = -target_height, - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=current_local_pos[0], + y=current_local_pos[1], + z=-target_height, + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) def ActivatePump(duration): @@ -137,30 +142,35 @@ def ActivatePump(duration): current_local_pos = nav.get_local_position_ned() # Runs the pump that is connected to GPIO 23 for a specified amount of time - nav.set_position_target_local_ned(x = current_local_pos[0], - y = current_local_pos[1], - z = current_local_pos[2], - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=current_local_pos[0], + y=current_local_pos[1], + z=current_local_pos[2], + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) GPIO.output(GPIO_PIN, GPIO.HIGH) # Make sure that the drone maintains its location above the bucket, hopefully avoiding drift due to wind or other sources # Adjusts the drones position every half second, and the duration is equivalent to the duration passed to ActivatePump() for _ in range(duration - 1): - - nav.set_position_target_local_ned(x = current_local_pos[0], - y = current_local_pos[1], - z = current_local_pos[2], - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=current_local_pos[0], + y=current_local_pos[1], + z=current_local_pos[2], + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) time.sleep(0.5) - nav.set_position_target_local_ned(x = current_local_pos[0], - y = current_local_pos[1], - z = current_local_pos[2], - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=current_local_pos[0], + y=current_local_pos[1], + z=current_local_pos[2], + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) time.sleep(0.5) GPIO.output(GPIO_PIN, GPIO.LOW) diff --git a/src/flight_tests/task2_2025_ir.py b/src/flight_tests/task2_2025_ir.py index f660f94..778283a 100644 --- a/src/flight_tests/task2_2025_ir.py +++ b/src/flight_tests/task2_2025_ir.py @@ -17,7 +17,7 @@ drone = connect(CONN_STR, wait_ready=False) nav = navigator.Navigator(drone, MESSENGER_PORT) -mavlink = MAVLinkDelegate(conn_str = CONN_STR) +mavlink = MAVLinkDelegate(conn_str=CONN_STR) time.sleep(2) @@ -79,53 +79,65 @@ def bucket_descent(): coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED - nav.set_position_target_local_ned(x = bucket_avg[0][-1], - y = bucket_avg[1][-1], - z = 0, - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=bucket_avg[0][-1], + y=bucket_avg[1][-1], + z=0, + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) time.sleep(5) - nav.set_position_target_local_ned(x = (bucket_avg[0][-1] - bucket_avg[0][0]), - y = (bucket_avg[1][-1] - bucket_avg[1][0]), - z = 10, - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=(bucket_avg[0][-1] - bucket_avg[0][0]), + y=(bucket_avg[1][-1] - bucket_avg[1][0]), + z=10, + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) time.sleep(5) - nav.set_position_target_local_ned(x = (bucket_avg[0][-1] - bucket_avg[0][0]), - y = (bucket_avg[1][-1] - bucket_avg[1][0]), - z = 5, - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=(bucket_avg[0][-1] - bucket_avg[0][0]), + y=(bucket_avg[1][-1] - bucket_avg[1][0]), + z=5, + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) time.sleep(5) - nav.set_position_target_local_ned(x = (bucket_avg[0][-1] - bucket_avg[0][0]), - y = (bucket_avg[1][-1] - bucket_avg[1][0]), - z = 5, - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=(bucket_avg[0][-1] - bucket_avg[0][0]), + y=(bucket_avg[1][-1] - bucket_avg[1][0]), + z=5, + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) time.sleep(5) - nav.set_position_target_local_ned(x = (bucket_avg[0][-1] - bucket_avg[0][0]), - y = (bucket_avg[1][-1] - bucket_avg[1][0]), - z = 5, - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=(bucket_avg[0][-1] - bucket_avg[0][0]), + y=(bucket_avg[1][-1] - bucket_avg[1][0]), + z=5, + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) time.sleep(5) - #Full commit + # Full commit - nav.set_position_target_local_ned(x = 0, - y = 0, - z = 5 - target_height, - type_mask = type_mask, - coordinate_frame = coordinate_frame) + nav.set_position_target_local_ned( + x=0, + y=0, + z=5 - target_height, + type_mask=type_mask, + coordinate_frame=coordinate_frame, + ) time.sleep(5) diff --git a/src/flight_tests/test_dummy.py b/src/flight_tests/test_dummy.py new file mode 100644 index 0000000..10cf3ad --- /dev/null +++ b/src/flight_tests/test_dummy.py @@ -0,0 +1,2 @@ +def test_dummy(): + pass diff --git a/src/modules/autopilot/altimeter.py b/src/modules/autopilot/altimeter.py index 836b266..cbce4e4 100644 --- a/src/modules/autopilot/altimeter.py +++ b/src/modules/autopilot/altimeter.py @@ -38,7 +38,9 @@ class Altimeter(ABC): SENSOR_TYPE_RADAR = 3 SENSOR_TYPE_UNKNOWN = 4 - def __init__(self, sensor_id: int = 0, min_distance_mm: int = 0, max_distance_mm: int = 0): + def __init__( + self, sensor_id: int = 0, min_distance_mm: int = 0, max_distance_mm: int = 0 + ): """ Initialize the altimeter. diff --git a/src/modules/autopilot/altimeter_mavlink.py b/src/modules/autopilot/altimeter_mavlink.py index fbc114b..6244967 100644 --- a/src/modules/autopilot/altimeter_mavlink.py +++ b/src/modules/autopilot/altimeter_mavlink.py @@ -13,7 +13,9 @@ class MavlinkAltimeterProvider: and send them to the Pixhawk via MAVLink. """ - def __init__(self, sensor: Altimeter, connection_string: str, update_rate_hz: float = 10.0): + def __init__( + self, sensor: Altimeter, connection_string: str, update_rate_hz: float = 10.0 + ): """ Initialize the MavlinkAltimeterProvider. @@ -42,7 +44,9 @@ def start(self): self._tstart = time.time() # Initialize MAVLink connection - self._mavlink_connection = mavutil.mavlink_connection(self.connection_string) + self._mavlink_connection = mavutil.mavlink_connection( + self.connection_string + ) # Wait for a heartbeat to ensure connection is established self._mavlink_connection.wait_heartbeat() print(f"MAVLink connection established on {self.connection_string}") diff --git a/src/modules/autopilot/altimeter_xm125.py b/src/modules/autopilot/altimeter_xm125.py index 68b0dce..14db98c 100644 --- a/src/modules/autopilot/altimeter_xm125.py +++ b/src/modules/autopilot/altimeter_xm125.py @@ -10,11 +10,13 @@ class SensorError(Exception): """Base exception class for sensor errors""" + pass class SensorState(Enum): """Enum to track sensor state""" + UNINITIALIZED = 0 INITIALIZED = 1 ERROR = 2 @@ -23,6 +25,7 @@ class SensorState(Enum): class SensorReflectorShape(Enum): """Enum to define reflector shape""" + GENERIC = 1 PLANAR = 2 @@ -35,11 +38,11 @@ class XM125(Altimeter): REG_DETECTOR_STATUS = 0x0003 REG_DISTANCE_RESULT = 0x0010 REG_PEAK0_DISTANCE = 0x0011 - REG_PEAK0_STRENGTH = 0x001b + REG_PEAK0_STRENGTH = 0x001B REG_START = 0x0040 REG_END = 0x0041 REG_COMMAND = 0x0100 - REG_REFLECTOR_SHAPE = 0x004b + REG_REFLECTOR_SHAPE = 0x004B # Command values CMD_APPLY_CONFIG_AND_CALIBRATE = 1 @@ -52,7 +55,7 @@ class XM125(Altimeter): # Status masks DETECTOR_STATUS_BUSY_MASK = 0x80000000 - DISTANCE_RESULT_NUM_DISTANCES_MASK = 0x0000000f + DISTANCE_RESULT_NUM_DISTANCES_MASK = 0x0000000F DISTANCE_RESULT_NEAR_START_EDGE = 0x00000100 DISTANCE_RESULT_CALIBRATION_NEEDED = 0x00000200 DISTANCE_RESULT_MEASURE_DISTANCE_ERROR = 0x00000400 @@ -62,7 +65,15 @@ class XM125(Altimeter): RETRY_DELAY = 0.5 # seconds ERROR_TIMEOUT = 5.0 # seconds - def __init__(self, sensor_id=0, min_distance=250, max_distance=10000, bus=1, address=0x52, average_window=5): + def __init__( + self, + sensor_id=0, + min_distance=250, + max_distance=10000, + bus=1, + address=0x52, + average_window=5, + ): super().__init__(sensor_id, min_distance, max_distance) self.bus = smbus2.SMBus(bus) self.address = address @@ -80,27 +91,33 @@ def mavlink_sensor_type(self) -> int: def get_distance_mm(self) -> Optional[float]: """Get the latest distance measurement in millimeters""" - if self._latest_measurement and self._latest_measurement[0]['averaged']: - return self._latest_measurement[0]['averaged'][0] + if self._latest_measurement and self._latest_measurement[0]["averaged"]: + return self._latest_measurement[0]["averaged"][0] return None def _read_register(self, reg_addr) -> Optional[int]: """Read a 32-bit register value with error handling.""" try: - write = smbus2.i2c_msg.write(self.address, [(reg_addr >> 8) & 0xFF, reg_addr & 0xFF]) + write = smbus2.i2c_msg.write( + self.address, [(reg_addr >> 8) & 0xFF, reg_addr & 0xFF] + ) read = smbus2.i2c_msg.read(self.address, 4) if DEBUG: - print(f"\nREAD OPERATION:") + print("\nREAD OPERATION:") print(f" Register: 0x{reg_addr:04x}") - print(f" Sending address bytes: MSB=0x{(reg_addr >> 8) & 0xFF:02x}, LSB=0x{reg_addr & 0xFF:02x}") + print( + f" Sending address bytes: MSB=0x{(reg_addr >> 8) & 0xFF:02x}, LSB=0x{reg_addr & 0xFF:02x}" + ) self.bus.i2c_rdwr(write, read) data = list(read) value = (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3] if DEBUG: - print(f" Received bytes: [0x{data[0]:02x}, 0x{data[1]:02x}, 0x{data[2]:02x}, 0x{data[3]:02x}]") + print( + f" Received bytes: [0x{data[0]:02x}, 0x{data[1]:02x}, 0x{data[2]:02x}, 0x{data[3]:02x}]" + ) print(f" Decoded value: 0x{value:08x} ({value})") return value @@ -117,11 +134,11 @@ def _write_register(self, reg_addr: int, value: int) -> bool: (value >> 24) & 0xFF, # Data to slave [31:24] (value >> 16) & 0xFF, # Data to slave [23:16] (value >> 8) & 0xFF, # Data to slave [15:8] - value & 0xFF # Data to slave [7:0] + value & 0xFF, # Data to slave [7:0] ] if DEBUG: - print(f"\nWRITE OPERATION:") + print("\nWRITE OPERATION:") print(f" Register: 0x{reg_addr:04x}") print(f" Value to write: 0x{value:08x} ({value})") print(f" Sending bytes: {[f'0x{b:02x}' for b in data]}") @@ -189,7 +206,7 @@ def _wait_not_busy(self): def begin(self) -> bool: """Initialize the sensor with given start and end distances in mm.""" if DEBUG: - print(f"\nInitializing sensor:") + print("\nInitializing sensor:") print(f" Start distance: {self.min_distance_mm}mm") print(f" End distance: {self.max_distance_mm}mm") print(f" Reflector shape: {self.REFLECTOR_SHAPE}") @@ -214,7 +231,9 @@ def begin(self) -> bool: print("Applying configuration and calibrating...") # Apply configuration and calibrate - if not self._write_register(self.REG_COMMAND, self.CMD_APPLY_CONFIG_AND_CALIBRATE): + if not self._write_register( + self.REG_COMMAND, self.CMD_APPLY_CONFIG_AND_CALIBRATE + ): if DEBUG: print("Failed to apply configuration and calibrate") return False @@ -224,7 +243,11 @@ def begin(self) -> bool: # Check status status = self._read_register(self.REG_DETECTOR_STATUS) if DEBUG: - print(f"Final status: 0x{status:08x}" if status is not None else "Failed to read final status") + print( + f"Final status: 0x{status:08x}" + if status is not None + else "Failed to read final status" + ) success = status is not None and status >= 0 if success: @@ -293,7 +316,7 @@ def measure(self) -> List[Dict[str, Any]]: self._handle_error("Measurement error") return [] - num_distances = (result & self.DISTANCE_RESULT_NUM_DISTANCES_MASK) + num_distances = result & self.DISTANCE_RESULT_NUM_DISTANCES_MASK peaks_with_average = [] for i in range(num_distances): @@ -309,9 +332,13 @@ def measure(self) -> List[Dict[str, Any]]: self.strength_avg.add(strength) peak_data = { - 'raw': (distance, strength), - 'averaged': (self.distance_avg.get_average(), - self.strength_avg.get_average()) if self.distance_avg.is_valid() else None + "raw": (distance, strength), + "averaged": ( + self.distance_avg.get_average(), + self.strength_avg.get_average(), + ) + if self.distance_avg.is_valid() + else None, } peaks_with_average.append(peak_data) @@ -343,13 +370,17 @@ def main(): if peaks: for i, peak_data in enumerate(peaks): - raw_distance, raw_strength = peak_data['raw'] + raw_distance, raw_strength = peak_data["raw"] print(f"\nPeak {i}:") - print(f" Raw: Distance = {raw_distance}mm, Strength = {raw_strength}") - - if peak_data['averaged']: - avg_distance, avg_strength = peak_data['averaged'] - print(f" Avg: Distance = {avg_distance:.1f}mm, Strength = {avg_strength:.1f}") + print( + f" Raw: Distance = {raw_distance}mm, Strength = {raw_strength}" + ) + + if peak_data["averaged"]: + avg_distance, avg_strength = peak_data["averaged"] + print( + f" Avg: Distance = {avg_distance:.1f}mm, Strength = {avg_strength:.1f}" + ) else: print("No peaks detected") diff --git a/src/modules/autopilot/lander.py b/src/modules/autopilot/lander.py index 9a3d40b..20f6a75 100644 --- a/src/modules/autopilot/lander.py +++ b/src/modules/autopilot/lander.py @@ -18,8 +18,7 @@ class Lander: def __init__(self, nav, max_velocity, geofence): self.__spiral_route = [] # Private attribute self.__bounding_box_pos = [] - self.__buffer = [[], - []] + self.__buffer = [[], []] # self.landing_spot = landing_spot self.nav = nav self.i = 10 @@ -31,7 +30,9 @@ def __init__(self, nav, max_velocity, geofence): self.leave_frame = False self.geofence = geofence - self.null_radius = 10 # Radius in METERS of bounding box detection being ignored + self.null_radius = ( + 10 # Radius in METERS of bounding box detection being ignored + ) @property def route(self): @@ -165,10 +166,10 @@ def executeSearch(self, altitude): # Make it so that it is plug and play for this morning def executeSearch(self, altitude): - ''' + """ (location.north, location.east, location.down) to always face inwards, always face the origin - ''' + """ type_mask = self.nav.generate_typemask([0, 1]) i = 0 origin = self.nav.get_local_position_ned() @@ -180,13 +181,15 @@ def executeSearch(self, altitude): new_x, new_y = self.bounding_box_pos[0], self.bounding_box_pos[1] if len(self.bounding_box_log) == 0: self.bounding_box_log.append((new_x, new_y)) - self.bounding_box_log_og.append((new_x + current_local_pos[0], new_y + current_local_pos[1])) + self.bounding_box_log_og.append( + (new_x + current_local_pos[0], new_y + current_local_pos[1]) + ) print(self.bounding_box_log) for bounding_box in self.bounding_box_log: delta_x = bounding_box[0] - new_x - current_local_pos[0] delta_y = bounding_box[1] - new_y - current_local_pos[1] - if math.sqrt((delta_x ** 2) + (delta_y ** 2)) >= self.null_radius: + if math.sqrt((delta_x**2) + (delta_y**2)) >= self.null_radius: self.boundingBoxAction() time.sleep(2 / (self.max_velocity)) break @@ -194,11 +197,17 @@ def executeSearch(self, altitude): else: # NOTE: THESE ARE ABSOLUTE COORDINATES # Spiral search is in absolute coordinates in which it adds the offset to the origin - self.nav.set_position_target_local_ned(x = self.__spiral_route[i][0] + origin[0] - (self.__spiral_route[4][0] / 2), - y = self.__spiral_route[i][1] + origin[1] - (self.__spiral_route[9][1] / 2), - z = -altitude, - type_mask=type_mask, - coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_NED) + self.nav.set_position_target_local_ned( + x=self.__spiral_route[i][0] + + origin[0] + - (self.__spiral_route[4][0] / 2), + y=self.__spiral_route[i][1] + + origin[1] + - (self.__spiral_route[9][1] / 2), + z=-altitude, + type_mask=type_mask, + coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_NED, + ) i += 1 @@ -218,49 +227,69 @@ def boundingBoxAction(self): time.sleep(1) x, y = self.bounding_box_pos[0], self.bounding_box_pos[1] - if self.geofence_check((x + 2, y)) and self.geofence_check((x, y + 2)) and self.geofence_check((x + 2, y + 2)): - self.nav.set_position_target_local_ned(x = x, - y = y, - type_mask=type_mask, - coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED) - time.sleep((math.sqrt(x ** 2 + y ** 2) / (self.max_velocity))) - self.nav.set_position_target_local_ned(x = 2, - y = 0, - type_mask=type_mask, - coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED) + if ( + self.geofence_check((x + 2, y)) + and self.geofence_check((x, y + 2)) + and self.geofence_check((x + 2, y + 2)) + ): + self.nav.set_position_target_local_ned( + x=x, + y=y, + type_mask=type_mask, + coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED, + ) + time.sleep((math.sqrt(x**2 + y**2) / (self.max_velocity))) + self.nav.set_position_target_local_ned( + x=2, + y=0, + type_mask=type_mask, + coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED, + ) time.sleep((3 / (self.max_velocity))) - self.nav.set_position_target_local_ned(x = 0, - y = 2, - type_mask=type_mask, - coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED) + self.nav.set_position_target_local_ned( + x=0, + y=2, + type_mask=type_mask, + coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED, + ) time.sleep(3 / (self.max_velocity)) - self.nav.set_position_target_local_ned(x = -2, - y = 0, - type_mask=type_mask, - coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED) + self.nav.set_position_target_local_ned( + x=-2, + y=0, + type_mask=type_mask, + coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED, + ) time.sleep((3 / (self.max_velocity))) - self.nav.set_position_target_local_ned(x = 0, - y = -2, - type_mask=type_mask, - coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED) + self.nav.set_position_target_local_ned( + x=0, + y=-2, + type_mask=type_mask, + coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED, + ) time.sleep((3 / (self.max_velocity))) - self.nav.set_position_target_local_ned(x = -x, - y = -y, - type_mask=type_mask, - coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED) - time.sleep((math.sqrt(x ** 2 + y ** 2) / (self.max_velocity))) + self.nav.set_position_target_local_ned( + x=-x, + y=-y, + type_mask=type_mask, + coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED, + ) + time.sleep((math.sqrt(x**2 + y**2) / (self.max_velocity))) else: - self.nav.set_position_target_local_ned(x = x, - y = y, - type_mask=type_mask, - coordinate_frame = mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED) - time.sleep((math.sqrt(x ** 2 + y ** 2) / (self.max_velocity))) + self.nav.set_position_target_local_ned( + x=x, + y=y, + type_mask=type_mask, + coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_OFFSET_NED, + ) + time.sleep((math.sqrt(x**2 + y**2) / (self.max_velocity))) self.bounding_box_detected = False self.bounding_box_log.append((x, y)) - self.bounding_box_log_og.append((x + current_local_pos[0], y + current_local_pos[1])) + self.bounding_box_log_og.append( + (x + current_local_pos[0], y + current_local_pos[1]) + ) def detectBoundingBox(self, _, bounding_box_pos): if bounding_box_pos: @@ -268,13 +297,12 @@ def detectBoundingBox(self, _, bounding_box_pos): self.bounding_box_detected = True self.bounding_box_pos = bounding_box_pos else: - if self.leave_frame: self.leave_frame = False def geofence_check(self, point): # Adapted from code provided by Aiden - #lat, lon, alt = self.nav.get_global_position() + # lat, lon, alt = self.nav.get_global_position() lat, lon = point[1], point[0] hitcount = 0 for i in range(len(self.geofence) - 1): @@ -285,7 +313,7 @@ def geofence_check(self, point): m = (B[1] - A[1]) / (B[0] - A[0]) # Handling the case of delta_lon == 0 - if abs(m) == float('inf'): + if abs(m) == float("inf"): m = (B[1] - A[1]) / (B[0] - A[0] + 0.00000001) b = A[0] - m * A[0] @@ -301,7 +329,7 @@ def enable_precision_land(self, Navigator): # NOTE: CHANGE THE CAMERA TYPE DURING ACTUAL USE camera = imaging.camera.DebugCamera("./res/test-image.jpeg") - analysis = imaging.analysis.ImageAnalysisDetector(camera = camera, nav = Navigator) + analysis = imaging.analysis.ImageAnalysisDetector(camera=camera, nav=Navigator) analysis.subscribe(self._precision_land) analysis.run() @@ -319,10 +347,18 @@ def _precision_land(self, im, lon, lat, x, y): x = sum(self.__buffer[0]) / len(self._buffer[0]) y = sum(self.__buffer[1] / len(self.__buffer[1])) - if len(self.__buffer[0]) >= buffer_size and len(self.__buffer[1]) >= buffer_size: + if ( + len(self.__buffer[0]) >= buffer_size + and len(self.__buffer[1]) >= buffer_size + ): type_mask = self.nav.generate_typemask([0, 1, 2]) - self.nav.set_postion_target_local_NED(x = self.__buffer[0][-1], y = self.__buffer[1][-1], z = -(self.i), type_mask = type_mask) + self.nav.set_postion_target_local_NED( + x=self.__buffer[0][-1], + y=self.__buffer[1][-1], + z=-(self.i), + type_mask=type_mask, + ) self.i -= 1 # Maintain the size of the buffer @@ -340,7 +376,7 @@ def main(): # add code to break the loop when landing spot is found LandingSpotFinder1.goNext(Navigator1, i) time.sleep(5) - ''' + """ if i == 0: self.nav.set_heading(0) time.sleep(5) @@ -349,7 +385,7 @@ def main(): else: self.nav.set_heading_relative(0) - ''' + """ if __name__ == "__main__": diff --git a/src/modules/autopilot/messenger.py b/src/modules/autopilot/messenger.py index 4ce21af..fed7a64 100644 --- a/src/modules/autopilot/messenger.py +++ b/src/modules/autopilot/messenger.py @@ -12,9 +12,8 @@ class Messenger: def __init__(self, port): self.__master = mavutil.mavlink_connection( - device=f"udp:127.0.0.1:{port}", - source_system=1, - source_component=1) + device=f"udp:127.0.0.1:{port}", source_system=1, source_component=1 + ) def send(self, message, prefix="SHEPARD"): """ @@ -27,5 +26,6 @@ def send(self, message, prefix="SHEPARD"): message = f"{prefix}: {message}" mav_message = dialect.MAVLink_statustext_message( - severity=dialect.MAV_SEVERITY_INFO, text=message.encode("utf-8")) + severity=dialect.MAV_SEVERITY_INFO, text=message.encode("utf-8") + ) self.__master.mav.send(mav_message) diff --git a/src/modules/autopilot/navigator.py b/src/modules/autopilot/navigator.py index e0ce9b9..7b60a78 100644 --- a/src/modules/autopilot/navigator.py +++ b/src/modules/autopilot/navigator.py @@ -62,12 +62,14 @@ def set_position(self, lat, lon): self.__message(f"Moving to {lat}, {lon}") target_location = dronekit.LocationGlobalRelative( - lat, lon, self.vehicle.location.global_relative_frame.alt) + lat, lon, self.vehicle.location.global_relative_frame.alt + ) self.vehicle.simple_goto(target_location) while self.vehicle.mode.name == "GUIDED": remaining_distance = self.__get_distance_metres( - self.vehicle.location.global_relative_frame, target_location) + self.vehicle.location.global_relative_frame, target_location + ) self.__message(f"Distance to target: {remaining_distance} m") if remaining_distance <= self.POSITION_TOLERANCE: self.__message("Reached target") @@ -86,14 +88,14 @@ def set_position_relative(self, d_north, d_east): self.__message(f"Moving {d_north} m north and {d_east} m east") current_location = self.vehicle.location.global_relative_frame - target_location = self.__get_location_metres(current_location, d_north, - d_east) + target_location = self.__get_location_metres(current_location, d_north, d_east) self.vehicle.simple_goto(target_location) while self.vehicle.mode.name == "GUIDED": remaining_distance = self.__get_distance_metres( - self.vehicle.location.global_relative_frame, target_location) + self.vehicle.location.global_relative_frame, target_location + ) self.__message(f"Distance to target: {remaining_distance} m") if remaining_distance <= self.POSITION_TOLERANCE: self.__message("Reached target") @@ -156,8 +158,8 @@ def set_altitude(self, altitude): while self.vehicle.mode.name == "GUIDED": remaining_distance = abs( - self.vehicle.location.global_relative_frame.alt - - target_altitude.alt) + self.vehicle.location.global_relative_frame.alt - target_altitude.alt + ) self.__message(f"Distance to target: {remaining_distance} m") if remaining_distance <= self.POSITION_TOLERANCE: self.__message("Reached target") @@ -172,8 +174,7 @@ def set_altitude_relative(self, altitude): :return: None """ - self.__message( - f"Setting altitude to {altitude} m relative to current altitude") + self.__message(f"Setting altitude to {altitude} m relative to current altitude") target_altitude = dronekit.LocationGlobalRelative( self.vehicle.location.global_relative_frame.lat, @@ -184,20 +185,23 @@ def set_altitude_relative(self, altitude): while self.vehicle.mode.name == "GUIDED": remaining_distance = self.__get_distance_metres( - self.vehicle.location.global_relative_frame, target_altitude) + self.vehicle.location.global_relative_frame, target_altitude + ) self.__message(f"Distance to target: {remaining_distance} m") if remaining_distance <= self.POSITION_TOLERANCE: self.__message("Reached target") break time.sleep(2) - def set_altitude_position(self, - lat, - lon, - alt, - battery=None, - voltage_hard_cutoff=22.4, - hard_cutoff_enable=False): + def set_altitude_position( + self, + lat, + lon, + alt, + battery=None, + voltage_hard_cutoff=22.4, + hard_cutoff_enable=False, + ): """ Sets the altitude and the position in absolute terms @@ -209,8 +213,7 @@ def set_altitude_position(self, """ self.__message(f"Moving to lat: {lat} lon: {lon} alt: {alt}") - target_altitude_position = dronekit.LocationGlobalRelative( - lat, lon, alt) + target_altitude_position = dronekit.LocationGlobalRelative(lat, lon, alt) if hard_cutoff_enable: if self.sufficient_battery(battery, voltage_hard_cutoff): @@ -223,8 +226,8 @@ def set_altitude_position(self, while self.vehicle.mode.name == "GUIDED": remaining_distance = self.__get_distance_metres( - self.vehicle.location.global_relative_frame, - target_altitude_position) + self.vehicle.location.global_relative_frame, target_altitude_position + ) self.__message(f"Distance to target: {remaining_distance} m") if remaining_distance <= self.POSITION_TOLERANCE: self.__message("Reached target") @@ -246,15 +249,15 @@ def set_altitude_position_relative(self, d_north, d_east, alt): ) current_location = self.vehicle.location.global_relative_frame - target_location = self.__get_location_metres(current_location, d_north, - d_east) + target_location = self.__get_location_metres(current_location, d_north, d_east) target_location.alt += alt self.vehicle.simple_goto(target_location) while self.vehicle.mode.name == "GUIDED": remaining_distance = self.__get_distance_metres( - self.vehicle.location.global_relative_frame, target_location) + self.vehicle.location.global_relative_frame, target_location + ) self.__message(f"Distance to target: {remaining_distance} m") if remaining_distance <= self.POSITION_TOLERANCE: self.__message("Reached target") @@ -282,12 +285,12 @@ def precision_landing(self, lat, long, alt): msg = self.vehicle.message_factory.mav_cmd_nav_land( abort_land_alt, # minimum altitude if landing is aborted - 2, # Enable precision landing mode - 0, # blank - 0, # Desired Yaw Angle - lat, # Latitude of target - long, # Longitude of the target - alt # Altitude of the ground in the current reference frame + 2, # Enable precision landing mode + 0, # blank + 0, # Desired Yaw Angle + lat, # Latitude of target + long, # Longitude of the target + alt, # Altitude of the ground in the current reference frame ) self.vehicle.send_mavlink(msg) @@ -341,18 +344,21 @@ def __get_location_metres(self, original_location, d_north, d_east): earth_radius = 6378137.0 # Radius of "spherical" earth # Coordinate offsets in radians d_lat = d_north / earth_radius - d_lon = d_east / (earth_radius * - math.cos(math.pi * original_location.lat / 180)) + d_lon = d_east / ( + earth_radius * math.cos(math.pi * original_location.lat / 180) + ) # New position in decimal degrees new_lat = original_location.lat + (d_lat * 180 / math.pi) new_lon = original_location.lon + (d_lon * 180 / math.pi) if type(original_location) is dronekit.LocationGlobal: - target_location = dronekit.LocationGlobal(new_lat, new_lon, - original_location.alt) + target_location = dronekit.LocationGlobal( + new_lat, new_lon, original_location.alt + ) elif type(original_location) is dronekit.LocationGlobalRelative: target_location = dronekit.LocationGlobalRelative( - new_lat, new_lon, original_location.alt) + new_lat, new_lon, original_location.alt + ) else: raise Exception("Invalid Location object passed") @@ -409,11 +415,11 @@ def time_left(string_land_time): land_time = datetime.strptime(string_land_time, "%H:%M:%S") - seconds_now = (time_now.hour * 360) + (time_now.minute * - 60) + (time_now.second) + seconds_now = (time_now.hour * 360) + (time_now.minute * 60) + (time_now.second) - seconds_land = (land_time.hour * 360) + (land_time.minute * - 60) + (land_time.second) + seconds_land = ( + (land_time.hour * 360) + (land_time.minute * 60) + (land_time.second) + ) return seconds_land - seconds_now @@ -429,10 +435,10 @@ def optimum_speed(self, time_left, waypoints): self.__message("Calculating optimum horizontal speed") total_distance = self.__get_distance_metres( - self.vehicle.location.global_relative_frame, waypoints[0]) + self.vehicle.location.global_relative_frame, waypoints[0] + ) for i in range(1, len(waypoints)): - total_distance += self.__get_distance_metres( - waypoints[i - 1], waypoints[i]) + total_distance += self.__get_distance_metres(waypoints[i - 1], waypoints[i]) speed_required = total_distance / time_left self.__message( @@ -474,24 +480,32 @@ def generate_typemask(self, keeps): # Generates typemask based on which values to be included mask = 0 for bit in keeps: - mask |= (0 << bit) + mask |= 0 << bit return mask def set_position_target_local_ned( - self, - time_boot_ms=0, - coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_NED, - type_mask=0x07FF, - x=0, y=0, z=0, - vx=0, vy=0, vz=0, - afx=0, afy=0, afz=0, - yaw=0, yaw_rate=0): + self, + time_boot_ms=0, + coordinate_frame=mavutil.mavlink.MAV_FRAME_LOCAL_NED, + type_mask=0x07FF, + x=0, + y=0, + z=0, + vx=0, + vy=0, + vz=0, + afx=0, + afy=0, + afz=0, + yaw=0, + yaw_rate=0, + ): msg = self.vehicle.message_factory.set_position_target_local_ned_encode( - time_boot_ms, # Time since system boot - 0, # Target System ID - 0, # Target Component ID - coordinate_frame, # Coordinate Frame - type_mask, # Typemask of POSITION_TARGET_TYPEMASK + time_boot_ms, # Time since system boot + 0, # Target System ID + 0, # Target Component ID + coordinate_frame, # Coordinate Frame + type_mask, # Typemask of POSITION_TARGET_TYPEMASK x, y, z, @@ -502,12 +516,12 @@ def set_position_target_local_ned( afy, afz, yaw, - yaw_rate + yaw_rate, ) self.vehicle.send_mavlink(msg) - ''' + """ def cancel_command(self, command_id=mavutil.mavlink.SET_POSITION_TARGET_LOCAL_NED): msg = self.vehicle.message_factory.command_long_encode( 0, @@ -519,4 +533,4 @@ def cancel_command(self, command_id=mavutil.mavlink.SET_POSITION_TARGET_LOCAL_NE ) self.vehicle.send_mavlink(msg) - ''' + """ diff --git a/src/modules/emu/__init__.py b/src/modules/emu/__init__.py index 953fd9b..2149f6b 100644 --- a/src/modules/emu/__init__.py +++ b/src/modules/emu/__init__.py @@ -1 +1 @@ -from .emu import * +from .emu import Emu as Emu diff --git a/src/modules/emu/emu.py b/src/modules/emu/emu.py index b808e72..4533b88 100644 --- a/src/modules/emu/emu.py +++ b/src/modules/emu/emu.py @@ -4,17 +4,17 @@ import asyncio from typing import Callable, List -from aiohttp import web from aiohttp import web import aiohttp import json -class Emu(): +class Emu: """ class representation of a connection to Emu """ + def __init__(self, img_dir: str): self.img_dir = img_dir @@ -29,7 +29,9 @@ def __init__(self, img_dir: str): self._is_connected = False def start_comms(self): - self._comms_thread = threading.Thread(target=self._start_comms_loop, daemon=True) + self._comms_thread = threading.Thread( + target=self._start_comms_loop, daemon=True + ) self._comms_thread.start() def send_image(self, path: str): @@ -41,23 +43,16 @@ def send_image(self, path: str): print(path) img_url = "/images/" + path print(img_url) - content = { - "type": "img", - "value": img_url - } + content = {"type": "img", "value": img_url} self._send_queue.put(json.dumps(content)) - def send_log(self, message: str, severity: str="normal"): + def send_log(self, message: str, severity: str = "normal"): """ sends a log message to Emu message: string of flog severity: "normal", "warning", "error" """ - content = { - "type": "log", - "message": message, - "severity": severity - } + content = {"type": "log", "message": message, "severity": severity} self._send_queue.put(json.dumps(content)) def send_msg(self, message: str): @@ -75,8 +70,9 @@ def _start_comms_loop(self): """ print("start_comms loop") self.app = web.Application() - self.app.add_routes([web.static('/images', self.img_dir), - web.get('/ws', self.handle_websocket)]) + self.app.add_routes( + [web.static("/images", self.img_dir), web.get("/ws", self.handle_websocket)] + ) web.run_app(self.app, handle_signals=False) @@ -87,7 +83,7 @@ async def producer_handler(self, ws): """ handles sending messages to the client """ - event_loop = asyncio.get_running_loop() + asyncio.get_running_loop() while not ws.closed: message = await asyncio.to_thread(self._send_queue.get) @@ -101,7 +97,7 @@ async def consumer_handler(self, ws): elif msg.type == aiohttp.WSMsgType.ERROR: print("WebSocket error:", ws.exception()) - + async def handle_websocket(self, request): ws = web.WebSocketResponse() await ws.prepare(request) @@ -120,7 +116,7 @@ async def handle_websocket(self, request): for task in pending: task.cancel() - print('websocket connection closed') + print("websocket connection closed") self._is_connected = False - + return ws diff --git a/src/modules/georeference/inference_georeference.py b/src/modules/georeference/inference_georeference.py index d7d1a95..38e4b99 100644 --- a/src/modules/georeference/inference_georeference.py +++ b/src/modules/georeference/inference_georeference.py @@ -29,7 +29,7 @@ def LonLat_To_XY(lon, lat, zone=12): Input is in degrees, return is in meters """ - P = pyproj.Proj(proj='utm', zone=zone, ellps='WGS84', preserve_units=True) + P = pyproj.Proj(proj="utm", zone=zone, ellps="WGS84", preserve_units=True) return P(lon, lat) @@ -41,7 +41,7 @@ def XY_To_LonLat(x, y, zone=12): Input is in meters, return is in degrees """ - P = pyproj.Proj(proj='utm', zone=zone, ellps='WGS84', preserve_units=True) + P = pyproj.Proj(proj="utm", zone=zone, ellps="WGS84", preserve_units=True) return P(x, y, inverse=True) @@ -62,7 +62,6 @@ def Geofence_to_XY(origin, geofence): origin_lat_rad = radians(origin_lat) for point in geofence: - point_lon_rad = radians(point[0]) point_lat_rad = radians(point[1]) @@ -80,7 +79,7 @@ def Geofence_to_XY(origin, geofence): return None -''' +""" def meters_to_LonLat(origin, points): R = 635900 @@ -107,11 +106,11 @@ def meters_to_LonLat(origin, points): new_points.append((delta_lat + origin_lat, delta_lon + origin_lon)) return new_points -''' +""" def meters_to_LonLat(origin, points): - g = Geod(ellps='clrk66') + g = Geod(ellps="clrk66") new_points = [] for point in points: az = atan(point[1] / point[0]) @@ -123,8 +122,9 @@ def meters_to_LonLat(origin, points): return new_points -def pixel_to_rel_position(camera_attributes: 'CameraAttributes', - inference: 'Inference', fovh, fovv) -> np.array: +def pixel_to_rel_position( + camera_attributes: "CameraAttributes", inference: "Inference", fovh, fovv +) -> np.array: """ Calculates the unit vector from an angled camera to an object at x, y pixel coordinates x and y are the normalized pixel coordinates between 0 and 1 @@ -133,33 +133,33 @@ def pixel_to_rel_position(camera_attributes: 'CameraAttributes', direction_vector = np.zeros(2) - #calculating image height and width in meters + # calculating image height and width in meters height = 2 * camera_attributes.focal_length * tan(fovv / 2) width = 2 * camera_attributes.focal_length * tan(fovh / 2) - #pixels to meters conversion + # pixels to meters conversion y_m = inference.y * height x_m = inference.x * width - if (y_m > height / 2): + if y_m > height / 2: y_m = height - y_m theta_y = camera_attributes.angle - atan( - (height / 2 - y_m) / camera_attributes.focal_length) - elif (y_m <= height / 2): - theta_y = atan( - (height / 2 - y_m) / - camera_attributes.focal_length) + camera_attributes.angle - - if (x_m > width / 2): + (height / 2 - y_m) / camera_attributes.focal_length + ) + elif y_m <= height / 2: + theta_y = ( + atan((height / 2 - y_m) / camera_attributes.focal_length) + + camera_attributes.angle + ) + + if x_m > width / 2: x_m = width - x_m - theta_x = -1 * (atan( - (width / 2 - x_m) / camera_attributes.focal_length)) - elif (x_m <= width / 2): + theta_x = -1 * (atan((width / 2 - x_m) / camera_attributes.focal_length)) + elif x_m <= width / 2: theta_x = atan((width / 2 - x_m) / camera_attributes.focal_length) y_comp = inference.relative_alt * tan(theta_y) - x_comp = (inference.relative_alt / - cos(camera_attributes.angle)) * tan(theta_x) + x_comp = (inference.relative_alt / cos(camera_attributes.angle)) * tan(theta_x) offset = calculate_object_offsets() @@ -172,13 +172,14 @@ def pixel_to_rel_position(camera_attributes: 'CameraAttributes', return direction_vector -#TODO get measurements to calculate offset due to shifted position of camera from the gps +# TODO get measurements to calculate offset due to shifted position of camera from the gps def calculate_object_offsets() -> np.array: return np.array([0, 0]) -def get_object_location(camera_attributes: 'CameraAttributes', - inference: 'Inference') -> tuple: +def get_object_location( + camera_attributes: "CameraAttributes", inference: "Inference" +) -> tuple: """ This calculates the location of the inference provided and returns the longitude, latitude in degrees @@ -195,6 +196,6 @@ def get_object_location(camera_attributes: 'CameraAttributes', ) print("dir_vector", dir_vector) - #lon, lat = XY_To_LonLat(dir_vector[0], dir_vector[1]) + # lon, lat = XY_To_LonLat(dir_vector[0], dir_vector[1]) return (dir_vector[0], dir_vector[1]) diff --git a/src/modules/imaging/LED.py b/src/modules/imaging/LED.py index 16d72a3..bec95a3 100644 --- a/src/modules/imaging/LED.py +++ b/src/modules/imaging/LED.py @@ -11,22 +11,23 @@ def __init__(self, mavlink: MAVLinkDelegate, RGB=[0, 0, 0]): def change_color(self, RGB: list): self.RGB = RGB - #print current color of LED - if (self.RGB[0] == 255 and self.RGB[1] == 0 and self.RGB[2] == 0): + # print current color of LED + if self.RGB[0] == 255 and self.RGB[1] == 0 and self.RGB[2] == 0: print("RED COLOR") - elif (self.RGB[0] == 0 and self.RGB[1] == 255 and self.RGB[2] == 0): + elif self.RGB[0] == 0 and self.RGB[1] == 255 and self.RGB[2] == 0: print("GREEN COLOR") - elif (self.RGB[0] == 0 and self.RGB[1] == 0 and self.RGB[2] == 255): + elif self.RGB[0] == 0 and self.RGB[1] == 0 and self.RGB[2] == 255: print("BLUE COLOR") else: print(f"RGB values: {self.RGB}") self.send_message() def send_message(self): - message = dialect.MAVLink_debug_vect_message(name=bytes( - "LED_STATUS", 'utf-8'), - time_usec=0, - x=self.RGB[0], - y=self.RGB[1], - z=self.RGB[2]) + message = dialect.MAVLink_debug_vect_message( + name=bytes("LED_STATUS", "utf-8"), + time_usec=0, + x=self.RGB[0], + y=self.RGB[1], + z=self.RGB[2], + ) self.mavlinkDelegate.send(message) diff --git a/src/modules/imaging/analysis.py b/src/modules/imaging/analysis.py index 94ad15a..dff9519 100644 --- a/src/modules/imaging/analysis.py +++ b/src/modules/imaging/analysis.py @@ -1,14 +1,17 @@ -from typing import Callable, Optional, List, Callable, Any +from typing import Optional, List, Callable, Any, Tuple import threading +import time + # from multiprocessing import Process from .detector import BaseDetector, BoundingBox from .camera import CameraProvider from .debug import ImageAnalysisDebugger from ..georeference.inference_georeference import get_object_location from .location import LocationProvider -from ..autopilot.navigator import Navigator +# from ..autopilot.navigator import Navigator from PIL import Image +import typing class CameraAttributes: @@ -43,25 +46,31 @@ class ImageAnalysisDelegate: TODO: geolocate the landing pad using the drone's location. """ - def __init__(self, - detector: BaseDetector, - camera: CameraProvider, - location_provider: LocationProvider = None, - navigation_provider: Navigator = None, - debugger: Optional[ImageAnalysisDebugger] = None): + def __init__( + self, + detector: BaseDetector, + camera: CameraProvider, + location_provider: Optional[LocationProvider] = None, + navigation_provider: 'Optional[typing.Any]' = None, + debugger: Optional[ImageAnalysisDebugger] = None, + ): self.detector = detector self.camera = camera self.debugger = debugger if location_provider is None and navigation_provider is None: - raise ValueError("Either location_provider or navigation_provider must be provided.") + raise ValueError( + "Either location_provider or navigation_provider must be provided." + ) self.location_provider = location_provider self.navigation_provider = navigation_provider - self.subscribers: List[Callable[[Image.Image, float, float], Any]] = [] + self.subscribers: List[ + Callable[[Image.Image, Optional[Tuple[float, float]]], Any] + ] = [] self.camera_attributes = CameraAttributes() - self.thread = None + self.thread: Optional[threading.Thread] = None self.loop = True def get_inference(self, bounding_box: BoundingBox) -> Inference: @@ -88,7 +97,8 @@ def start(self): def stop(self): self.loop = False - self.thread.join() + if self.thread is not None: + self.thread.join() def _analyze_image(self): """ @@ -108,8 +118,7 @@ def _analyze_image(self): if bounding_box: inference = self.get_inference(bounding_box) if inference: - x, y = get_object_location(self.camera_attributes, - inference) + x, y = get_object_location(self.camera_attributes, inference) subscriber(im, (x, y)) else: subscriber(im, None) @@ -121,6 +130,7 @@ def _analysis_loop(self): """ while self.loop: self._analyze_image() + time.sleep(0.1) def subscribe(self, callback: Callable): """ diff --git a/src/modules/imaging/analysis_view.py b/src/modules/imaging/analysis_view.py index b043d4f..760702a 100644 --- a/src/modules/imaging/analysis_view.py +++ b/src/modules/imaging/analysis_view.py @@ -19,8 +19,9 @@ def send_BoundingBox(self, bbox: BoundingBox) -> None: values[i] = 0.0 dbg_box = dialect.MAVLink_debug_float_array_message( - name=bytes("dbg_box", 'utf-8'), + name=bytes("dbg_box", "utf-8"), time_usec=0, # timestamp array_id=0, - data=values) + data=values, + ) self.mavlink.send(dbg_box) diff --git a/src/modules/imaging/battery.py b/src/modules/imaging/battery.py index 17e9001..164bc19 100644 --- a/src/modules/imaging/battery.py +++ b/src/modules/imaging/battery.py @@ -53,11 +53,13 @@ def __init__(self, mavlink_delegate: MAVLinkDelegate): param4=0, param5=0, param6=0, - param7=1)) # param7: send messaged to requester + param7=1, + ) + ) # param7: send messaged to requester def _process_message(self, message): # This callback processes incoming MAVLink messages and updates the internal state - if message.get_type() == 'SYS_STATUS': + if message.get_type() == "SYS_STATUS": # message.voltage_battery is an int in mV self._voltage = float(message.voltage_battery) / 1e3 diff --git a/src/modules/imaging/bucket_detector.py b/src/modules/imaging/bucket_detector.py index 7aa3b8e..874958b 100644 --- a/src/modules/imaging/bucket_detector.py +++ b/src/modules/imaging/bucket_detector.py @@ -2,19 +2,18 @@ from PIL import Image -from .detector import Vec2, BoundingBox, BaseDetector +from .detector import Vec2, BoundingBox, BaseDetector from ultralytics import YOLO class BucketDetector(BaseDetector): - def __init__(self, model_path): print(f"model: {model_path}") self.model = YOLO(model_path) def predict(self, image: Image.Image) -> Optional[BoundingBox]: - results = self.model(image, verbose = False) + results = self.model(image, verbose=False) result = results[0] # because one image diff --git a/src/modules/imaging/camera.py b/src/modules/imaging/camera.py index 3880002..816ff52 100644 --- a/src/modules/imaging/camera.py +++ b/src/modules/imaging/camera.py @@ -1,4 +1,4 @@ -from typing import Tuple +from typing import Tuple, Optional, Any import pathlib from PIL import Image @@ -35,13 +35,14 @@ def capture_to(self, path: str | pathlib.Path): """ self.capture().save(path) - def caputure_as_ndarry(self) -> np.ndarray: + def capture_as_ndarray(self) -> np.ndarray: """ Captures a single image returns it's numpy.ndarray representation. Will have shape (height, width, colors). """ return np.array(self.capture()) + @dataclass class DepthCapture: rgb: np.ndarray @@ -81,6 +82,8 @@ class OakdCamera(CameraProvider): def __init__(self, fps: int = 30): self._init_pipeline(fps) + self.device: Optional[dai.Device] = None + self.queue: Optional[dai.DataOutputQueue] = None def _init_pipeline(self, fps: int): """Initialize the Depth AI pipeline (will be run on the OAK-D)""" @@ -129,9 +132,14 @@ def capture_with_depth(self) -> DepthCapture: NOTE: .start() must have been called first. If it has not, this will raise Exception.""" if not self.device or self.device.isClosed(): - raise Exception("No oakD connection, perhaps you forgot to call the .start() function") + raise Exception( + "No oakD connection, perhaps you forgot to call the .start() function" + ) + + if self.queue is None: + raise Exception("No queue available") - msg = self.queue.get() + msg: Any = self.queue.get() rgbFrame = msg["rgb"] cv_frame = rgbFrame.getCvFrame() rgb_frame = cv2.cvtColor(cv_frame, cv2.COLOR_BGR2RGB) @@ -145,22 +153,24 @@ def capture_with_depth(self) -> DepthCapture: return capture def capture(self) -> Image.Image: - capture = self.capture_with_depth + capture = self.capture_with_depth() img = Image.fromarray(capture.rgb, "RGB") return img - def start(self): """Start the depth-perception process on the OAK-D""" print("Starting OAK-D Connection") self.device = dai.Device(self.pipeline) - self.queue = self.device.getOutputQueue("out", maxSize=1, blocking=False) + if self.device: + self.queue = self.device.getOutputQueue("out", 1, False) # type: ignore def stop(self): """Stop the depth-perception process""" - self.device.close() + if self.device: + self.device.close() self.queue = None + class DebugCamera(CameraProvider): """ Debug camera source which always returns the same image loaded from @@ -186,13 +196,15 @@ class DebugCameraFromDir(CameraProvider): Debug camera that returns an image from directory 'image_dir' containing only images """ + def __init__(self, image_dir: str | pathlib.Path): import os # used to get images in folder + self.image_dir = image_dir self.imgs = os.listdir(image_dir) self.imgs = [os.path.join(image_dir, file) for file in self.imgs] if len(self.imgs) == 0: - raise ValueError('no files in directory') + raise ValueError("no files in directory") self.index = 0 # set size at first based on first image @@ -210,7 +222,7 @@ def capture(self) -> Image.Image: self.index = (self.index + 1) % len(self.imgs) return Image.open(filename).resize(self.size) - + class GazeboCamera(CameraProvider): """ @@ -219,7 +231,7 @@ class GazeboCamera(CameraProvider): def __init__(self): self.port = 5600 - + gst_pipeline = ( "udpsrc address=127.0.0.1 port=5600 ! " "application/x-rtp, encoding-name=H264 ! " @@ -228,12 +240,12 @@ def __init__(self): "videoconvert ! " "appsink" ) - self.size = (640, 480) + self.size = (640, 480) self.cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): print("Failed to open UDP Stream") - exit() + raise RuntimeError("Failed to open UDP Stream") def set_size(self, size: Tuple[int, int]): self.size = size @@ -256,7 +268,7 @@ def show_images(self): break cv2.imshow("Gazebo Video Stream", frame) - if cv2.waitKey(1) & 0xFF == ord('q'): + if cv2.waitKey(1) & 0xFF == ord("q"): break self.cap.release() cv2.destroyAllWindows() @@ -296,17 +308,17 @@ class RPiCamera(CameraProvider): def __init__(self, cam_num: int): from picamera2 import Picamera2 + self.camera = Picamera2(cam_num) self.size = (640, 480) self.configure_camera() self.camera.start() - print(self.camera.capture_metadata()['ScalerCrop']) - print(self.camera.camera_controls['ScalerCrop']) + print(self.camera.capture_metadata()["ScalerCrop"]) + print(self.camera.camera_controls["ScalerCrop"]) def configure_camera(self): # Configuring camera properties - config = self.camera.create_preview_configuration( - main={"size": self.size}) + config = self.camera.create_preview_configuration(main={"size": self.size}) self.camera.configure(config) def set_size(self, size: Tuple[int, int]): diff --git a/src/modules/imaging/debug.py b/src/modules/imaging/debug.py index 7b0e363..ed1fd7f 100644 --- a/src/modules/imaging/debug.py +++ b/src/modules/imaging/debug.py @@ -1,6 +1,12 @@ -from typing import Optional, Tuple -from PIL import Image, ImageDraw, ImageTk -import tkinter as tk +from typing import Optional, Tuple, Any +from PIL import Image, ImageDraw + +try: + from PIL import ImageTk + import tkinter as tk +except ImportError: + ImageTk: Any = None # type: ignore + tk: Any = None # type: ignore from .detector import BoundingBox @@ -15,32 +21,35 @@ class ImageAnalysisDebugger: def __init__(self): self.image: Optional[Image.Image] = None - self.root = tk.Tk() + self.root: Any = None + if tk: + self.root = tk.Tk() self.is_visible = False def show(self): """ Start displaying the debugger window. """ + if not tk or not ImageTk: + raise RuntimeError("Tkinter/ImageTk not found. Cannot show debugger.") + if not self.image: raise RuntimeError("No image set. Cannot show without an image") self.root.deiconify() self.is_visible = True img = ImageTk.PhotoImage(self.image) - self.root.geometry('%dx%d' % (self.image.size[0], self.image.size[1])) + self.root.geometry("%dx%d" % (self.image.size[0], self.image.size[1])) label_image = tk.Label(self.root, image=img) - label_image.place(x=0, - y=0, - width=self.image.size[0], - height=self.image.size[1]) + label_image.place(x=0, y=0, width=self.image.size[0], height=self.image.size[1]) self.root.update() def hide(self): """ Stop displaying the debugger window. """ - self.root.withdraw() + if self.root: + self.root.withdraw() self.is_visible = False def visible(self) -> bool: @@ -68,8 +77,10 @@ def set_bounding_box(self, bb: BoundingBox): image = self.image top_left_corner: Tuple[float, float] = (bb.position.x, bb.position.y) - bottom_right_corner: Tuple[float, float] = (bb.position.x + bb.size.x, - bb.position.y + bb.size.y) + bottom_right_corner: Tuple[float, float] = ( + bb.position.x + bb.size.x, + bb.position.y + bb.size.y, + ) draw = ImageDraw.Draw(image) draw.rectangle((top_left_corner, bottom_right_corner)) diff --git a/src/modules/imaging/debug_analysis.py b/src/modules/imaging/debug_analysis.py index c67a2e1..3d81d38 100644 --- a/src/modules/imaging/debug_analysis.py +++ b/src/modules/imaging/debug_analysis.py @@ -1,6 +1,8 @@ -from typing import Callable, Optional, List, Callable, Any +from typing import Optional, List, Callable, Any, Tuple import threading +import time + # from multiprocessing import Process from .detector import BaseDetector, BoundingBox from .camera import CameraProvider @@ -29,18 +31,22 @@ class DebugImageAnalysisDelegate: TODO: geolocate the landing pad using the drone's location. """ - def __init__(self, - detector: BaseDetector, - camera: CameraProvider, - location_provider: LocationProvider, - debugger: Optional[ImageAnalysisDebugger] = None, - ): + def __init__( + self, + detector: BaseDetector, + camera: CameraProvider, + location_provider: LocationProvider, + debugger: Optional[ImageAnalysisDebugger] = None, + ): import os + self.detector = detector self.camera = camera self.debugger = debugger self.location_provider = location_provider - self.subscribers: List[Callable[[Image.Image, float, float], Any]] = [] + self.subscribers: List[ + Callable[[Image.Image, Optional[Tuple[float, float]]], Any] + ] = [] self.camera_attributes = CameraAttributes() # log pictures taken @@ -59,6 +65,8 @@ def __init__(self, # image number self.i = 0 + self.loop = True + self.thread: Optional[threading.Thread] = None def get_inference(self, bounding_box: BoundingBox) -> Inference: inference = Inference(bounding_box, self.location_provider.altitude()) @@ -68,12 +76,18 @@ def start(self): """ Will start the image analysis process in another thread. """ - thread = threading.Thread(target=self._analysis_loop) + self.loop = True + self.thread = threading.Thread(target=self._analysis_loop) # process = Process(target=self._analysis_loop) - thread.start() + self.thread.start() # process.start() # Use `threading` to start `self._analysis_loop` in another thread. + def stop(self): + self.loop = False + if self.thread is not None: + self.thread.join() + def _analyze_image(self): """ Actually performs the image analysis once. Only useful for testing, @@ -87,8 +101,12 @@ def _analyze_image(self): if bounding_box: draw = ImageDraw.Draw(im) - bb = (bounding_box.position.x, bounding_box.position.y, - bounding_box.size.x, bounding_box.size.y) + bb = ( + bounding_box.position.x, + bounding_box.position.y, + bounding_box.size.x, + bounding_box.size.y, + ) draw.rectangle(bb) im.save(os.path.join(self.bb_img_path, f"{self.i}.png")) @@ -104,8 +122,7 @@ def _analyze_image(self): if bounding_box: inference = self.get_inference(bounding_box) if inference: - x, y = get_object_location(self.camera_attributes, - inference) + x, y = get_object_location(self.camera_attributes, inference) subscriber(im, (x, y)) else: subscriber(im, None) @@ -115,8 +132,9 @@ def _analysis_loop(self): Indefinitely run image analysis. This should be run in another thread; use `start()` to do so. """ - while True: + while self.loop: self._analyze_image() + time.sleep(0.1) def subscribe(self, callback: Callable): """ diff --git a/src/modules/imaging/detector.py b/src/modules/imaging/detector.py index 65dd8fc..dc3ad13 100644 --- a/src/modules/imaging/detector.py +++ b/src/modules/imaging/detector.py @@ -1,28 +1,31 @@ from functools import lru_cache from typing import Optional +import math from PIL import Image import numpy as np import cv2 -from cv2 import aruco +from dataclasses import dataclass +from functools import cached_property @dataclass class Vec2: """2-component vector with float elements.""" + x: float y: float - def __add__(self, other: 'Vec2') -> 'Vec2': + def __add__(self, other: "Vec2") -> "Vec2": return Vec2(self.x + other.x, self.y + other.y) - def __sub__(self, other: 'Vec2') -> 'Vec2': + def __sub__(self, other: "Vec2") -> "Vec2": return Vec2(self.x - other.x, self.y - other.y) - def __rmul__(self, scalar: float) -> 'Vec2': + def __rmul__(self, scalar: float) -> "Vec2": return Vec2(self.x * scalar, self.y * scalar) - def __rtruediv__(self, scalar: float) -> 'Vec2': + def __rtruediv__(self, scalar: float) -> "Vec2": return Vec2(self.x / scalar, self.y / scalar) @cached_property @@ -30,7 +33,7 @@ def norm(self) -> float: """Return the euclidean norm of the vector""" return math.sqrt(self.x**2 + self.y**2) - def normalize(self) -> 'Vec2': + def normalize(self) -> "Vec2": """Reduce the norm to 1 while preserving direction.""" magnitude = self.norm if magnitude != 0: @@ -39,52 +42,50 @@ def normalize(self) -> 'Vec2': return Vec2(0, 0) @staticmethod - def dot(v1: 'Vec2', v2: 'Vec2') -> float: + def dot(v1: "Vec2", v2: "Vec2") -> float: """Compute the standard inner product between v1 and v2.""" return v1.x * v2.x + v1.y * v2.y @staticmethod - def min(v1: 'Vec2', v2: 'Vec2') -> 'Vec2': + def min(v1: "Vec2", v2: "Vec2") -> "Vec2": """Compute component-wise min of v1 and v2""" return Vec2(min(v1.x, v2.x), min(v1.y, v2.y)) @staticmethod - def max(v1: 'Vec2', v2: 'Vec2') -> 'Vec2': + def max(v1: "Vec2", v2: "Vec2") -> "Vec2": """Compute component-wise max of v1 and v2""" return Vec2(max(v1.x, v2.x), max(v1.y, v2.y)) class BoundingBox: + def __init__(self, position: Vec2, size: Vec2): + self.position = position + self.size = size -def __init__(self, position: Vec2, size: Vec2): - self.position = position - self.size = size - -@lru_cache(maxsize=2) -def intersection(self, other: 'BoundingBox') -> float: - top_left = Vec2.max(self.position, other.position) - bottom_right = Vec2.min(self.position + self.size, - other.position + other.size) + @lru_cache(maxsize=2) + def intersection(self, other: "BoundingBox") -> float: + top_left = Vec2.max(self.position, other.position) + bottom_right = Vec2.min(self.position + self.size, other.position + other.size) - size = bottom_right - top_left + size = bottom_right - top_left - intersection = size.x * size.y - return max(intersection, 0) + intersection = size.x * size.y + return max(intersection, 0) -def union(self, other: 'BoundingBox') -> float: - intersection = self.intersection(other) - if intersection == 0: - return 0 + def union(self, other: "BoundingBox") -> float: + intersection = self.intersection(other) + if intersection == 0: + return 0 - union = self.size.x * self.size.y + other.size.x * other.size.y - intersection - return union + union = self.size.x * self.size.y + other.size.x * other.size.y - intersection + return union -def intersection_over_union(self, pred: 'BoundingBox') -> Optional[float]: - intersection = self.intersection(pred) - if intersection == 0: - return 0 - iou = intersection / self.union(pred) - return iou + def intersection_over_union(self, pred: "BoundingBox") -> Optional[float]: + intersection = self.intersection(pred) + if intersection == 0: + return 0 + iou = intersection / self.union(pred) + return iou class BaseDetector: @@ -93,42 +94,47 @@ def predict(self, image: Image.Image) -> Optional[BoundingBox]: class IrDetector(BaseDetector): - def predict(self, image: Image.Image) -> Optional[BoundingBox]: img = np.array(image) gray_img = cv2.cvtColor(img, cv2.COLOR_RGBA2GRAY) - max_val = np.max(gray_img) # returns maximum value of brightness + max_val = int(np.max(gray_img)) # returns maximum value of brightness if max_val < 200: return None # lower threshold for intensity _, thresh = cv2.threshold(gray_img, max_val - 10, 255, cv2.THRESH_BINARY) - contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + contours, _ = cv2.findContours( + thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE + ) # cv2.drawContours(thresh, contours, -1, (0, 255, 0), 2) if len(contours) == 0: return None + x, y, w, h = 0, 0, 0, 0 for cnt in contours: x, y, w, h = cv2.boundingRect(cnt) - return BoundingBox(Vec2(x, y), Vec2(w, h)) + if w > 0 and h > 0: + return BoundingBox(Vec2(float(x), float(y)), Vec2(float(w), float(h))) + return None -class ArucoDetector(): +class ArucoDetector: def predict(self, image: Image.Image) -> Optional[BoundingBox]: - img = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) + img = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50) - params = cv2.aruco.DetectorParemeters() + params = cv2.aruco.DetectorParameters() + + corners, ids, rejected = cv2.aruco.detectMarkers( # type: ignore + img, aruco_dict, parameters=params + ) - corners, ids, rejected = cv2.aruco.detectMarkers(img, aruco_dict, parameters=params) - if ids: for c in zip(corners, ids): - pts = c[0] x_min = pts[:, 0].min() @@ -138,8 +144,9 @@ def predict(self, image: Image.Image) -> Optional[BoundingBox]: x = (x_min + x_max) / 2 y = (y_min + y_max) / 2 - w = (x_max - x_min) - h = (y_max - y_min) - + w = x_max - x_min + h = y_max - y_min + return BoundingBox(Vec2(x, y), Vec2(w, h)) + return None diff --git a/src/modules/imaging/location.py b/src/modules/imaging/location.py index ff6757b..b1004d8 100644 --- a/src/modules/imaging/location.py +++ b/src/modules/imaging/location.py @@ -143,22 +143,22 @@ def debug_change_location(self, **kwargs): roll -- Roll angle yaw -- Yaw angle """ - if 'lat' in kwargs or 'lng' in kwargs: - lat = kwargs.get('lat', self._current_location.lat) - lng = kwargs.get('lng', self._current_location.lng) + if "lat" in kwargs or "lng" in kwargs: + lat = kwargs.get("lat", self._current_location.lat) + lng = kwargs.get("lng", self._current_location.lng) self._current_location = LatLng(lat, lng) - if 'heading' in kwargs: - self._current_heading = Heading(kwargs['heading']) + if "heading" in kwargs: + self._current_heading = Heading(kwargs["heading"]) - if 'altitude' in kwargs: - self._current_altitude = kwargs['altitude'] + if "altitude" in kwargs: + self._current_altitude = kwargs["altitude"] - if any(k in kwargs for k in ['pitch', 'roll', 'yaw']): - pitch = kwargs.get('pitch', self._current_orientation.pitch) - roll = kwargs.get('roll', self._current_orientation.roll) - yaw = kwargs.get('yaw', self._current_orientation.yaw) + if any(k in kwargs for k in ["pitch", "roll", "yaw"]): + pitch = kwargs.get("pitch", self._current_orientation.pitch) + roll = kwargs.get("roll", self._current_orientation.roll) + yaw = kwargs.get("yaw", self._current_orientation.yaw) self._current_orientation = Rotation(pitch, roll, yaw) @@ -189,7 +189,9 @@ def __init__(self, mavlink_delegate: MAVLinkDelegate): param4=0, param5=0, param6=0, - param7=1)) # param7: send messaged to requester + param7=1, + ) + ) # param7: send messaged to requester self.mavlink_delegate.send( dialect.MAVLink_command_long_message( target_system=1, @@ -202,18 +204,22 @@ def __init__(self, mavlink_delegate: MAVLinkDelegate): param4=0, param5=0, param6=0, - param7=1)) # param7: send messaged to requester + param7=1, + ) + ) # param7: send messaged to requester def _process_message(self, message): # This callback processes incoming MAVLink messages and updates the internal state - if message.get_type() == 'GLOBAL_POSITION_INT': + if message.get_type() == "GLOBAL_POSITION_INT": self._location = LatLng(message.lat / 1e7, message.lon / 1e7) self._altitude = message.alt / 1000.0 # Altitude in meters self._heading = Heading(message.hdg / 1e7) # Heading in degrees - elif message.get_type() == 'ATTITUDE': - self._orientation = Rotation(pitch=math.degrees(message.pitch), - roll=math.degrees(message.roll), - yaw=math.degrees(message.yaw)) + elif message.get_type() == "ATTITUDE": + self._orientation = Rotation( + pitch=math.degrees(message.pitch), + roll=math.degrees(message.roll), + yaw=math.degrees(message.yaw), + ) def location(self) -> LatLng: if self._location is not None: diff --git a/src/modules/imaging/mavlink.py b/src/modules/imaging/mavlink.py index 20bb979..a123162 100644 --- a/src/modules/imaging/mavlink.py +++ b/src/modules/imaging/mavlink.py @@ -11,9 +11,9 @@ class MAVLinkDelegate: """ def __init__(self, conn_str: str = "tcp:127.0.0.1:14550"): - self._conn = mavutil.mavlink_connection(device=conn_str, - source_system=255, - source_component=42) + self._conn = mavutil.mavlink_connection( + device=conn_str, source_system=255, source_component=42 + ) self._listeners: List[Callable] = [] @@ -73,7 +73,6 @@ def run(self): class MessagePrinter: - def __init__(self, mavlink_delegate: MAVLinkDelegate): self.mavlink_delegate = mavlink_delegate self.mavlink_delegate.subscribe(self._process_message) diff --git a/src/modules/imaging/video_stream.py b/src/modules/imaging/video_stream.py index 3c1d381..cc1e78f 100644 --- a/src/modules/imaging/video_stream.py +++ b/src/modules/imaging/video_stream.py @@ -1,4 +1,5 @@ from src.modules.imaging.camera import WebcamCamera + # from src.modules.imaging.camera import RPiCamera import time import struct @@ -13,7 +14,7 @@ # cam = RPiCamera(0) client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -client_socket.connect(('127.0.0.1', 8080)) +client_socket.connect(("127.0.0.1", 8080)) while True: img = cam.capture() diff --git a/src/modules/landingspot/landingspot.py b/src/modules/landingspot/landingspot.py index 4b3163a..318b7f7 100644 --- a/src/modules/landingspot/landingspot.py +++ b/src/modules/landingspot/landingspot.py @@ -7,7 +7,6 @@ def landing_spot(a, number_of_loops=20): route = [] x, y = 0, 0 for i in range(2, number_of_loops, 2): - for j in range(4): if j == 0: y = y - a diff --git a/src/modules/mavctl b/src/modules/mavctl index 6f50073..3ebf782 160000 --- a/src/modules/mavctl +++ b/src/modules/mavctl @@ -1 +1 @@ -Subproject commit 6f500736148ae802dc38d7e3585adac106879862 +Subproject commit 3ebf782aaf560bf629d26c31ec18f02bdd72b0a5 diff --git a/src/sim_scripts/gazebo_camera_test.py b/src/sim_scripts/gazebo_camera_test.py index 4828b56..6a492a9 100644 --- a/src/sim_scripts/gazebo_camera_test.py +++ b/src/sim_scripts/gazebo_camera_test.py @@ -1,13 +1,12 @@ -from src.modules.imaging.camera import GazeboCamera import os +if __name__ == "__main__": + from src.modules.imaging.camera import GazeboCamera + cam = GazeboCamera() -cam = GazeboCamera() + os.makedirs("tmp/log", exist_ok=True) + dirs = os.listdir("tmp/log") + ft_num = len(dirs) + os.makedirs(f"tmp/log/{ft_num}") # no exist_ok bc. this folder should be new -os.makedirs("tmp/log", exist_ok=True) -dirs = os.listdir("tmp/log") -ft_num = len(dirs) -os.makedirs(f"tmp/log/{ft_num}") # no exist_ok bc. this folder should be new - - -cam.capture_to(f"tmp/log/{ft_num}/gazebocamera.png") + cam.capture_to(f"tmp/log/{ft_num}/gazebocamera.png") diff --git a/test/test_analysis.py b/test/test_analysis.py index 080f1b8..bc050f8 100644 --- a/test/test_analysis.py +++ b/test/test_analysis.py @@ -12,24 +12,22 @@ from src.modules.imaging.camera import DebugCamera from src.modules.imaging.location import DebugLocationProvider from src.modules.imaging.debug import ImageAnalysisDebugger -from dep.labeller.benchmarks.detector import LandingPadDetector, BoundingBox -from dep.labeller.loader.label import Vec2 +from src.modules.imaging.detector import BaseDetector, BoundingBox, Vec2 +detected: Optional[Vec2] = None -class DebugLandingPadDetector(LandingPadDetector): - def __init__(self, - vector: Optional[Vec2] = None, - bb: Optional[BoundingBox] = None): +class DebugDetector(BaseDetector): + def __init__(self, vector: Optional[Vec2] = None, bb: Optional[BoundingBox] = None): self.bounding_box = bb - def predict(self, _image: Image.Image) -> Optional[BoundingBox]: + def predict(self, image: Image.Image) -> Optional[BoundingBox]: return self.bounding_box def test_analysis_subscriber(): camera = DebugCamera("res/test-image.jpeg") - detector = DebugLandingPadDetector() + detector = DebugDetector() location_provider = DebugLocationProvider() location_provider.set_altitude(1.0) analysis = ImageAnalysisDelegate(detector, camera, location_provider) @@ -37,9 +35,12 @@ def test_analysis_subscriber(): global detected detected = None - def _callback(_image, lon, lat): + def _callback(_image, lon_lat): global detected - detected = Vec2(lon, lat) + + detected = None + if lon_lat: + detected = Vec2(lon_lat[0], lon_lat[1]) analysis.subscribe(_callback) @@ -48,20 +49,20 @@ def _callback(_image, lon, lat): assert detected is None detector.bounding_box = BoundingBox(Vec2(20, 20), Vec2(50, 50)) analysis._analyze_image() - assert (detected - - Vec2(-115.48873916832288, 5.483286467459389e-06)).norm < 0.01 + assert detected is not None + result = detected - Vec2(0.4158184416499504, -0.574961758930409) + assert result.norm < 0.01 class MockImageAnlaysisDebugger(ImageAnalysisDebugger): - def __init__(self): - self.image: Optional[Image.Image] = None - self.bounding_box: Optional[BoundingBox] = None + self.image: Image.Image | None = None + self.bounding_box: BoundingBox | None = None self.is_visible = False def show(self): - #if not self.image: - #raise RuntimeError("No image set. Cannot show without an image") + # if not self.image: + # raise RuntimeError("No image set. Cannot show without an image") self.is_visible = True def hide(self): @@ -82,12 +83,11 @@ def set_bounding_box(self, bb: BoundingBox): def test_analysis_debugger(): camera = DebugCamera("res/test-image.jpeg") - detector = DebugLandingPadDetector() + detector = DebugDetector() debug = MockImageAnlaysisDebugger() location_provider = DebugLocationProvider() location_provider.set_altitude(1.0) - analysis = ImageAnalysisDelegate(detector, camera, location_provider, - debug) + analysis = ImageAnalysisDelegate(detector, camera, location_provider, debugger=debug) def run_analysis(): detector.bounding_box = BoundingBox(Vec2(0, 0), Vec2(100, 100)) diff --git a/test/test_battery.py b/test/test_battery.py index a87805e..df79195 100644 --- a/test/test_battery.py +++ b/test/test_battery.py @@ -1,40 +1,37 @@ import threading import time -from dronekit import connect +# from dronekit import connect -from src.modules.autopilot import navigator -from src.modules.autopilot import lander +# from src.modules.autopilot import navigator from src.modules.imaging.mavlink import MAVLinkDelegate from src.modules.imaging.battery import MAVLinkBatteryStatusProvider CONN_STR = "udp:127.0.0.1:14551" MESSENGER_PORT = 14552 -drone = connect(CONN_STR, wait_ready=False) +# drone = connect(CONN_STR, wait_ready=False) -nav = navigator.Navigator(drone, MESSENGER_PORT) -lander = lander.Lander() +# nav = navigator.Navigator(drone, MESSENGER_PORT) -nav.send_status_message("Shepard is online") +# nav.send_status_message("Shepard is online") -mavlink = MAVLinkDelegate() -battery = MAVLinkBatteryStatusProvider(mavlink) +if __name__ == "__main__": + mavlink = MAVLinkDelegate() + battery = MAVLinkBatteryStatusProvider(mavlink) + def wait_for_voltage(): + while True: + try: + voltage = battery.voltage() + print("voltage: ", voltage) + except ValueError: + pass + # print("no data yet") + time.sleep(0.5) -def wait_for_voltage(): - while True: - try: - voltage = battery.voltage() - print("voltage: ", voltage) - except ValueError: - pass - # print("no data yet") - time.sleep(0.5) + # threading.Thread(daemon=True, target=mavlink.run).start() - -threading.Thread(daemon=True, target=mavlink.run).start() - -while True: - nav.send_status_message(nav.sufficient_battery(battery)) - time.sleep(1) + # while True: + # nav.send_status_message(nav.sufficient_battery(battery)) + # time.sleep(1) diff --git a/test/test_camera.py b/test/test_camera.py index 86a1f70..93d4904 100644 --- a/test/test_camera.py +++ b/test/test_camera.py @@ -20,13 +20,13 @@ def test_debug_camera(tmp_path): assert im.height == 400 # Can capture the image as an ndarray - im_np = cam.caputure_as_ndarry() + im_np = cam.capture_as_ndarray() assert im_np.shape == (400, 600, 3) assert im_np.sum() == 121089435 # Stupid simple checksum # Can save the image to a path im_path = tmp_path / "copy.jpeg" - cam.caputure_to(im_path) + cam.capture_to(im_path) im_md5 = md5sum(im_path) # Manually save a copy of 'test-image.jpeg'. diff --git a/test/test_location.py b/test/test_location.py index c21b328..b45e312 100644 --- a/test/test_location.py +++ b/test/test_location.py @@ -1,7 +1,11 @@ import math -from src.modules.imaging.location import (DebugLocationProvider, - MAVLinkLocationProvider, LatLng, - Heading, Rotation) +from src.modules.imaging.location import ( + DebugLocationProvider, + MAVLinkLocationProvider, + LatLng, + Heading, + Rotation, +) from src.modules.imaging.mavlink import MAVLinkDelegateMock from pymavlink.dialects.v20 import all as dialect @@ -10,8 +14,7 @@ def test_get_location(): loc = DebugLocationProvider() loc.debug_change_location(lat=10.0, lng=20.0) - assert loc.location() == LatLng( - 10.0, 20.0), "Location did not match expected value" + assert loc.location() == LatLng(10.0, 20.0), "Location did not match expected value" def test_get_location_partial(): @@ -19,16 +22,14 @@ def test_get_location_partial(): loc.debug_change_location(lat=10.0, lng=20.0) loc.debug_change_location(lat=15.0) # Missing lng - assert loc.location() == LatLng( - 15.0, 20.0), "Location did not match expected value" + assert loc.location() == LatLng(15.0, 20.0), "Location did not match expected value" def test_get_heading(): loc = DebugLocationProvider() loc.debug_change_location(heading=45.0) - assert loc.heading() == Heading( - 45.0), "Heading did not match expected value" + assert loc.heading() == Heading(45.0), "Heading did not match expected value" def test_get_altitude(): @@ -42,8 +43,9 @@ def test_get_orientation(): loc = DebugLocationProvider() loc.debug_change_location(pitch=10.0, roll=20.0, yaw=30.0) - assert loc.orientation() == Rotation( - 10.0, 20.0, 30.0), "Orientation did not match expected value" + assert loc.orientation() == Rotation(10.0, 20.0, 30.0), ( + "Orientation did not match expected value" + ) def test_get_orientation_partial(): @@ -51,8 +53,9 @@ def test_get_orientation_partial(): loc.debug_change_location(pitch=10.0, roll=20.0, yaw=30.0) loc.debug_change_location(pitch=12.0) - assert loc.orientation() == Rotation( - 12.0, 20.0, 30.0), "Orientation did not match expected value" + assert loc.orientation() == Rotation(12.0, 20.0, 30.0), ( + "Orientation did not match expected value" + ) def test_position_changes(): @@ -60,13 +63,15 @@ def test_position_changes(): # Initial change loc.debug_change_location(lat=10.0, lng=20.0) - assert loc.location() == LatLng( - 10.0, 20.0), "Initial location did not match expected value" + assert loc.location() == LatLng(10.0, 20.0), ( + "Initial location did not match expected value" + ) # Change to new location loc.debug_change_location(lat=40.0, lng=50.0) - assert loc.location() == LatLng( - 40.0, 50.0), "New location did not match expected value" + assert loc.location() == LatLng(40.0, 50.0), ( + "New location did not match expected value" + ) def test_get_MAVLink_location(): @@ -83,11 +88,12 @@ def test_get_MAVLink_location(): vx=0, vy=0, vz=0, # Ground X, Y, Z Speed - hdg=0 # Heading + hdg=0, # Heading ) mavlink.send(initial_message) - assert loc_mavlink.location() == LatLng( - 10.0, 20.0), "Initial location did not match expected value" + assert loc_mavlink.location() == LatLng(10.0, 20.0), ( + "Initial location did not match expected value" + ) # Create a mock GLOBAL_POSITION_INT message for the new location new_message = dialect.MAVLink_global_position_int_message( @@ -99,11 +105,12 @@ def test_get_MAVLink_location(): vx=0, vy=0, vz=0, # Ground X, Y, Z Speed - hdg=0 # Heading + hdg=0, # Heading ) mavlink.send(new_message) - assert loc_mavlink.location() == LatLng( - 40.0, 50.0), "New location did not match expected value" + assert loc_mavlink.location() == LatLng(40.0, 50.0), ( + "New location did not match expected value" + ) def test_get_MAVLink_heading(): @@ -116,18 +123,16 @@ def test_get_MAVLink_heading(): lat=0, lon=0, alt=2000, # Altitude in mm above MSL - relative_alt= - 0, # Relative altitude in mm above the ground (not used in this test) + relative_alt=0, # Relative altitude in mm above the ground (not used in this test) vx=0, vy=0, vz=0, # Ground X, Y, Z Speed (not used in this test) - hdg=150 * 1e7 # Heading in degE7 + hdg=150 * 1e7, # Heading in degE7 ) mavlink.send(heading_message) # Check if the MAVLinkLocationProvider reports the heading correctly - assert loc_mavlink.heading() == Heading( - 150), "Heading did not match expected value" + assert loc_mavlink.heading() == Heading(150), "Heading did not match expected value" def test_get_MAVLink_altitude(): @@ -144,13 +149,14 @@ def test_get_MAVLink_altitude(): vx=0, vy=0, vz=0, # Ground X, Y, Z Speed - hdg=0 # Heading + hdg=0, # Heading ) mavlink.send(altitude_message) # Check if the MAVLinkLocationProvider reports the altitude correctly - assert loc_mavlink.altitude( - ) == 15.0, "Altitude did not match expected value in meters" + assert loc_mavlink.altitude() == 15.0, ( + "Altitude did not match expected value in meters" + ) def test_get_MAVLink_orientation(): @@ -165,15 +171,18 @@ def test_get_MAVLink_orientation(): yaw=0.3, # Yaw in radians rollspeed=0, # Roll speed (not used in this test) pitchspeed=0, # Pitch speed (not used in this test) - yawspeed=0 # Yaw speed (not used in this test) + yawspeed=0, # Yaw speed (not used in this test) ) mavlink.send(orientation_message) # Check if the MAVLinkLocationProvider reports the orientation correctly - expected_orientation = Rotation(math.degrees(0.1), math.degrees(0.2), - math.degrees(0.3)) + expected_orientation = Rotation( + math.degrees(0.1), math.degrees(0.2), math.degrees(0.3) + ) actual_orientation = loc_mavlink.orientation() - assert actual_orientation == expected_orientation, "Orientation did not match expected value" + assert actual_orientation == expected_orientation, ( + "Orientation did not match expected value" + ) def test_get_MAVLink_position_changes(): @@ -190,12 +199,14 @@ def test_get_MAVLink_position_changes(): vx=0, vy=0, vz=0, - hdg=0.0) + hdg=0.0, + ) mavlink.send(initial_position_message) # Verify initial position - assert loc_mavlink.location() == LatLng( - 10.0, 20.0), "Initial location did not match expected value" + assert loc_mavlink.location() == LatLng(10.0, 20.0), ( + "Initial location did not match expected value" + ) # Send another mock GLOBAL_POSITION_INT message for new position new_position_message = dialect.MAVLink_global_position_int_message( @@ -207,16 +218,20 @@ def test_get_MAVLink_position_changes(): vx=0, vy=0, vz=0, - hdg=45.0 * 1e7) + hdg=45.0 * 1e7, + ) mavlink.send(new_position_message) # Verify new position - assert loc_mavlink.location() == LatLng( - 40.0, 50.0), "New location did not match expected value" - assert loc_mavlink.heading() == Heading( - 45.0), "New heading did not match expected value" - assert loc_mavlink.altitude( - ) == 2.0, "New altitude did not match expected value in meters" + assert loc_mavlink.location() == LatLng(40.0, 50.0), ( + "New location did not match expected value" + ) + assert loc_mavlink.heading() == Heading(45.0), ( + "New heading did not match expected value" + ) + assert loc_mavlink.altitude() == 2.0, ( + "New altitude did not match expected value in meters" + ) def test_get_MAVLink_position_not_init():