Skip to content

Commit 2f48606

Browse files
author
alireza787b
committed
examples: add vision odometry example
Add a practical example showing how to send visual odometry data to PX4 via the Mocap plugin's Odometry message. Demonstrates connecting to PX4 SITL, configuring EKF2 for vision fusion (EKF2_EV_CTRL, EKF2_HGT_REF), sending ODOMETRY at 30 Hz, and monitoring health to verify fusion is active.
1 parent ca52bff commit 2f48606

File tree

1 file changed

+104
-0
lines changed

1 file changed

+104
-0
lines changed

examples/vision_odometry.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
#!/usr/bin/env python3
2+
3+
"""
4+
Send visual odometry data to PX4 via MAVSDK's Mocap plugin.
5+
6+
This example demonstrates how to send ODOMETRY messages to PX4 for
7+
vision-based navigation (VIO, visual odometry, motion capture, etc.).
8+
9+
It connects to PX4 SITL, configures EKF2 for external vision fusion,
10+
sends simulated odometry at 30 Hz, and monitors health to verify fusion.
11+
12+
Prerequisites:
13+
- PX4 SITL running (e.g. make px4_sitl gz_x500)
14+
- pip install mavsdk
15+
"""
16+
17+
import asyncio
18+
import math
19+
import time
20+
21+
from mavsdk import System
22+
from mavsdk.mocap import (
23+
Odometry,
24+
PositionBody,
25+
Quaternion,
26+
SpeedBody,
27+
AngularVelocityBody,
28+
Covariance,
29+
)
30+
31+
32+
async def run():
33+
drone = System()
34+
await drone.connect(system_address="udp://:14540")
35+
36+
print("Waiting for drone to connect...")
37+
async for state in drone.core.connection_state():
38+
if state.is_connected:
39+
print("-- Connected to drone!")
40+
break
41+
42+
# Configure EKF2 for vision fusion
43+
print("Configuring EKF2 for vision fusion...")
44+
await drone.param.set_param_int("EKF2_EV_CTRL", 15)
45+
await drone.param.set_param_int("EKF2_HGT_REF", 3)
46+
print("-- EKF2 configured (EV_CTRL=15, HGT_REF=3)")
47+
48+
# Start sending odometry and monitoring health in parallel
49+
await asyncio.gather(
50+
send_odometry(drone),
51+
monitor_health(drone),
52+
)
53+
54+
55+
async def monitor_health(drone):
56+
"""Print health status until vision fusion is active."""
57+
async for health in drone.telemetry.health():
58+
status = "OK" if health.is_local_position_ok else "WAITING"
59+
print(f" Local position: {status}")
60+
if health.is_local_position_ok:
61+
print(" --> EKF2 is fusing vision data!")
62+
break
63+
64+
65+
async def send_odometry(drone):
66+
"""Send simulated vision odometry at 30 Hz for 20 seconds."""
67+
nan_cov = Covariance([float("nan")])
68+
start_time = time.monotonic()
69+
70+
print("Sending vision odometry at 30 Hz...")
71+
for i in range(600):
72+
elapsed = time.monotonic() - start_time
73+
74+
# Simulated position: gentle circle at 0.5 m radius
75+
x = 0.5 * math.cos(elapsed * 0.5)
76+
y = 0.5 * math.sin(elapsed * 0.5)
77+
z = 0.0
78+
79+
# Simulated velocity
80+
vx = -0.25 * math.sin(elapsed * 0.5)
81+
vy = 0.25 * math.cos(elapsed * 0.5)
82+
83+
odom = Odometry(
84+
time_usec=0,
85+
frame_id=Odometry.MavFrame.MOCAP_NED,
86+
position_body=PositionBody(x, y, z),
87+
q=Quaternion(1.0, 0.0, 0.0, 0.0),
88+
speed_body=SpeedBody(vx, vy, 0.0),
89+
angular_velocity_body=AngularVelocityBody(0.0, 0.0, 0.0),
90+
pose_covariance=nan_cov,
91+
velocity_covariance=nan_cov,
92+
)
93+
await drone.mocap.set_odometry(odom)
94+
95+
if i % 90 == 0:
96+
print(f" t={elapsed:.1f}s pos=({x:.2f}, {y:.2f}, {z:.2f})")
97+
98+
await asyncio.sleep(1.0 / 30)
99+
100+
print("-- Done sending odometry")
101+
102+
103+
if __name__ == "__main__":
104+
asyncio.run(run())

0 commit comments

Comments
 (0)