Skip to content

Commit 145905e

Browse files
author
alireza787b
committed
examples: add vision odometry example
Add a practical example showing how to send visual odometry data to PX4 via the Mocap plugin's Odometry message. Demonstrates connecting to PX4 SITL, configuring EKF2 for vision fusion (EKF2_EV_CTRL, EKF2_HGT_REF), sending ODOMETRY at 30 Hz, and monitoring health to verify fusion is active.
1 parent 4fab80e commit 145905e

File tree

1 file changed

+146
-0
lines changed

1 file changed

+146
-0
lines changed

examples/vision_odometry.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
#!/usr/bin/env python3
2+
3+
"""
4+
Send visual odometry to PX4 and verify EKF2 fusion.
5+
6+
Demonstrates how to feed external position estimates (from a VIO pipeline,
7+
motion capture system, or visual odometry) to PX4 using the Mocap plugin's
8+
set_odometry() API.
9+
10+
What this example does:
11+
1. Connects to PX4 and configures EKF2 for external vision fusion
12+
2. Sends simulated ODOMETRY messages at 30 Hz (a slow circle)
13+
3. Reads back PX4's position estimate to verify fusion is working
14+
4. Prints both SENT and RECEIVED positions side-by-side
15+
16+
What this example does NOT do:
17+
- It does not arm or fly the drone
18+
- The simulated positions are fake (in a real setup, they come from
19+
a camera/VIO pipeline like T265, ORB-SLAM, OpenCV VO, etc.)
20+
21+
Prerequisites:
22+
- PX4 simulator running (e.g. make px4_sitl gz_x500)
23+
- pip install mavsdk
24+
25+
Usage:
26+
python3 vision_odometry.py
27+
28+
After running, you should see PX4's estimated position converge to match
29+
the sent odometry within a few seconds. This confirms EKF2 is fusing
30+
your external vision data.
31+
"""
32+
33+
import asyncio
34+
import math
35+
import time
36+
37+
from mavsdk import System
38+
from mavsdk.mocap import (
39+
Odometry,
40+
PositionBody,
41+
Quaternion,
42+
SpeedBody,
43+
AngularVelocityBody,
44+
Covariance,
45+
)
46+
47+
# How long to run (seconds) and send rate (Hz)
48+
DURATION_S = 20
49+
SEND_RATE_HZ = 30
50+
51+
52+
async def run():
53+
drone = System()
54+
await drone.connect(system_address="udp://:14540")
55+
56+
print("Waiting for drone to connect...")
57+
async for state in drone.core.connection_state():
58+
if state.is_connected:
59+
print("-- Connected to drone!")
60+
break
61+
62+
# Configure EKF2 to fuse external vision for position + velocity + yaw
63+
print("Configuring EKF2 for vision fusion...")
64+
await drone.param.set_param_int("EKF2_EV_CTRL", 15) # pos_xy + pos_z + vel + yaw
65+
await drone.param.set_param_int("EKF2_HGT_REF", 3) # vision as height reference
66+
print(" EKF2_EV_CTRL = 15 (fuse pos_xy + pos_z + vel + yaw)")
67+
print(" EKF2_HGT_REF = 3 (vision height reference)")
68+
69+
# Run all tasks in parallel
70+
print()
71+
print("Sending odometry and monitoring EKF2 output...")
72+
print("-" * 70)
73+
await asyncio.gather(
74+
send_odometry(drone),
75+
print_ekf2_position(drone),
76+
wait_for_fusion(drone),
77+
)
78+
79+
80+
async def wait_for_fusion(drone):
81+
"""Wait until EKF2 reports local position is OK (fusion active)."""
82+
async for health in drone.telemetry.health():
83+
if health.is_local_position_ok:
84+
print("[HEALTH] EKF2 fusion active -- local position OK!")
85+
return
86+
87+
88+
async def print_ekf2_position(drone):
89+
"""Read back PX4's EKF2 estimated position to verify fusion."""
90+
start = time.monotonic()
91+
async for pos_vel in drone.telemetry.position_velocity_ned():
92+
elapsed = time.monotonic() - start
93+
if elapsed > DURATION_S:
94+
break
95+
n = pos_vel.position.north_m
96+
e = pos_vel.position.east_m
97+
d = pos_vel.position.down_m
98+
print(f" [EKF2 ] t={elapsed:5.1f}s pos=({n:+7.2f}, {e:+7.2f}, {d:+7.2f})")
99+
await asyncio.sleep(3.0)
100+
101+
102+
async def send_odometry(drone):
103+
"""Send simulated vision odometry at SEND_RATE_HZ for DURATION_S."""
104+
nan_cov = Covariance([float("nan")]) # unknown covariance
105+
start_time = time.monotonic()
106+
total_frames = DURATION_S * SEND_RATE_HZ
107+
108+
for i in range(total_frames):
109+
elapsed = time.monotonic() - start_time
110+
111+
# Simulated trajectory: slow circle, 0.5 m radius, ~12.6 s period
112+
x = 0.5 * math.cos(elapsed * 0.5)
113+
y = 0.5 * math.sin(elapsed * 0.5)
114+
z = 0.0 # NED: 0 means same altitude as start
115+
116+
vx = -0.25 * math.sin(elapsed * 0.5)
117+
vy = 0.25 * math.cos(elapsed * 0.5)
118+
119+
odom = Odometry(
120+
time_usec=0, # 0 = let autopilot timestamp it
121+
frame_id=Odometry.MavFrame.MOCAP_NED,
122+
position_body=PositionBody(x, y, z),
123+
q=Quaternion(1.0, 0.0, 0.0, 0.0), # identity = no rotation
124+
speed_body=SpeedBody(vx, vy, 0.0),
125+
angular_velocity_body=AngularVelocityBody(0.0, 0.0, 0.0),
126+
pose_covariance=nan_cov,
127+
velocity_covariance=nan_cov,
128+
)
129+
await drone.mocap.set_odometry(odom)
130+
131+
# Print sent position every 3 seconds
132+
if i % (SEND_RATE_HZ * 3) == 0:
133+
print(f" [SENT ] t={elapsed:5.1f}s pos=({x:+7.2f}, {y:+7.2f}, {z:+7.2f})")
134+
135+
await asyncio.sleep(1.0 / SEND_RATE_HZ)
136+
137+
print()
138+
print(
139+
"-- Done. Sent {} frames over {:.0f}s at {} Hz".format(
140+
total_frames, DURATION_S, SEND_RATE_HZ
141+
)
142+
)
143+
144+
145+
if __name__ == "__main__":
146+
asyncio.run(run())

0 commit comments

Comments
 (0)