I encountered an issue when using the crate.
When I instantiate a producer and a "consumer-producer" (i.e. an instance that passes messages along), where the latter is slower than the former, the consumer-producer gets stuck and is unable to recover.
A consumer that processes incoming messages at half the rate at which they are produced, and needs to forward them as well, breaks down very quickly.
I do expect messages to be dropped when they are not handled quickly enough, and that is in fact what happens when the consumer-producer acts as a plain consumer, i.e. when it does not publish anything itself. That case results in the following logs:
foo/bar => hello 2163 world
Received = Outgoing(PubAck(2164))
foo/bar => hello 2164 world
Received = Outgoing(PubAck(2165))
################################# MISSED 409
foo/bar => hello 2573 world
Received = Outgoing(PubAck(2574))
foo/bar => hello 2574 world
Handling the outgoing publish of the consumer-producer on a separate task does not resolve the issue (see the sketch below).
Changing the QoS to AtMostOnce does not resolve the issue either.
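For clarity, the "separate task" attempt looked roughly like the following, a minimal sketch assuming the AsyncClient handle is cloned into the spawned task; it replaces the publish call inside the match arm of the consumer-producer shown further down:

    let publisher = client.clone();
    tokio::spawn(async move {
        // The spawned task awaits channel capacity on its own, but in my
        // test the overall behavior was the same: things still got stuck.
        publisher
            .publish("/foo/baz", QoS::AtLeastOnce, false, idx.to_string())
            .await
            .unwrap();
    });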
Is this intended behavior?
Here are the scripts I used to demonstrate the behavior.
Consumer-producer:
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, Publish, QoS};
use std::thread::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mqttoptions = MqttOptions::new("rumqtt-consumer-producer", "localhost", 1883);
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
    client.subscribe("foo/bar", QoS::AtLeastOnce).await.unwrap();

    let mut prev = 0;
    while let Ok(notification) = eventloop.poll().await {
        match notification {
            Event::Incoming(Packet::Publish(Publish { topic, payload, .. })) => {
                // Payload has the form "hello <idx> world"; extract the
                // index to detect gaps in the stream.
                let msg = String::from_utf8(payload.as_ref().to_vec()).unwrap();
                let idx: i64 = msg.split(' ').nth(1).unwrap().parse().unwrap();
                if idx - prev > 1 {
                    println!("################################# MISSED {}", idx - prev);
                }
                prev = idx;
                println!("{topic} => {msg}");
                // Deliberately blocking sleep: simulates a handler that is
                // half as fast as the producer (2 ms here vs. 1 ms there).
                sleep(Duration::from_millis(2));
                // Forward the message. This is where the loop eventually
                // blocks for good once the request channel is full.
                client
                    .publish("/foo/baz", QoS::AtLeastOnce, false, idx.to_string())
                    .await
                    .unwrap();
            }
            other => println!("Received = {other:?}"),
        }
    }
}
Producer:
use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;
use tokio::{task, time};

#[tokio::main]
async fn main() {
    let mut mqttoptions = MqttOptions::new("rumqtt-producer", "localhost", 1883);
    mqttoptions.set_clean_session(false);
    mqttoptions.set_keep_alive(Duration::from_secs(5));
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100);

    task::spawn(async move {
        // Publish one message per millisecond, twice the rate at which
        // the consumer-producer handles them.
        for i in 0.. {
            let msg = format!("hello {i} world");
            println!("{msg}");
            client
                .publish("foo/bar", QoS::AtLeastOnce, false, msg)
                .await
                .unwrap();
            time::sleep(Duration::from_millis(1)).await;
        }
    });

    // Keep the event loop running; the notifications themselves are not
    // interesting here.
    while let Ok(_notification) = eventloop.poll().await {
        // println!("Received = {:?}", notification);
    }
}
"Logs" of consumer-producer:
Received = Incoming(ConnAck(ConnAck { session_present: false, code: Success }))
Received = Outgoing(Subscribe(1))
Received = Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtLeastOnce)] }))
foo/bar => hello 0 world
Received = Outgoing(PubAck(1))
Received = Outgoing(Publish(2))
foo/bar => hello 1 world
Received = Outgoing(PubAck(2))
Received = Incoming(PubAck(PubAck { pkid: 2 }))
Received = Outgoing(Publish(3))
foo/bar => hello 2 world
Received = Outgoing(PubAck(3))
Received = Incoming(PubAck(PubAck { pkid: 3 }))
foo/bar => hello 3 world
Received = Outgoing(PubAck(4))
foo/bar => hello 4 world
Received = Outgoing(PubAck(5))
foo/bar => hello 5 world
Received = Outgoing(PubAck(6))
foo/bar => hello 6 world
Received = Outgoing(PubAck(7))
Received = Outgoing(Publish(4))
foo/bar => hello 7 world
Received = Outgoing(PubAck(8))
Received = Incoming(PubAck(PubAck { pkid: 4 }))
foo/bar => hello 8 world
Received = Outgoing(PubAck(9))
foo/bar => hello 9 world
Received = Outgoing(PubAck(10))
foo/bar => hello 10 world
Received = Outgoing(PubAck(11))
foo/bar => hello 11 world
Received = Outgoing(PubAck(12))
foo/bar => hello 12 world
Received = Outgoing(PubAck(13))
foo/bar => hello 13 world
After this, the process just stops, and a minute or so later the client is disconnected by my local mosquitto broker:
jan 20 15:57:32 NAC41597 mosquitto[734973]: 1768921052: New client connected from 127.0.0.1:47626 as rumqtt-producer (p2, c0, k5).
jan 20 15:57:33 NAC41597 mosquitto[734973]: 1768921053: Client rumqtt-producer closed its connection.
jan 20 15:59:04 NAC41597 mosquitto[734973]: 1768921144: Client rumqtt-consumer-producer has exceeded timeout, disconnecting.
The producer was already past message number 500 at that point; sharing those prints does not add much value, I think.
It seems that the event loop prioritizes incoming events from the broker over publishing outgoing events. This leads to a deadlock in which client.publish() blocks indefinitely because the internal request channel is full, a situation that any application processing events more slowly than they are produced will eventually reach.
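My mental model of the stall (an analogy with a plain bounded channel, not a claim about rumqttc internals): the application task is both the sender into the bounded request queue and, via eventloop.poll(), the only thing that drains it. Once publish().await blocks on a full queue, poll() is no longer called and nothing can free up space. A self-contained sketch:

use std::time::Duration;
use tokio::{sync::mpsc, time::timeout};

#[tokio::main]
async fn main() {
    // A tiny bounded channel stands in for the request channel.
    let (tx, mut rx) = mpsc::channel::<u32>(1);
    tx.send(1).await.unwrap(); // fills the channel

    // The second send can only complete if rx.recv() runs, but the only
    // receiver lives in this same task, which is stuck awaiting the send.
    match timeout(Duration::from_secs(1), tx.send(2)).await {
        Ok(res) => println!("sent: {res:?}"),
        Err(_) => println!("send stalled: the sender is also the drainer"),
    }
    let _ = rx.recv().await;
}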
We would rather rely on the broker's ability to drop messages on contention than have a specific client end up deadlocked.
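If dropping on the client side is acceptable, try_publish looks like a possible stopgap: as far as I can tell it returns an error instead of blocking when the request channel is full. An untested sketch of the forwarding call (the error handling is illustrative):

    // Drop-on-contention variant of the forward: fail fast instead of
    // awaiting channel capacity, so the loop keeps polling the event loop.
    if let Err(e) = client.try_publish("/foo/baz", QoS::AtLeastOnce, false, idx.to_string()) {
        println!("dropped outgoing publish: {e:?}");
    }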
Is it possible to influence the way work is prioritized in the event loop?
If there is any more information we can collect, please let us know!