Consumer-producer freezes under heavy load #1025

@Thordijk95

I encountered an issue when using the crate.

When I instantiate a producer and a "consumer-producer" (i.e. an instance that passes messages along), and the latter is slower than the former, the consumer-producer gets stuck and is unable to recover.

A consumer that processes incoming messages at half the rate at which they are produced, and needs to forward them as well, breaks down very quickly.

I do expect messages to be dropped when they are not handled quickly enough, and that is actually what happens when the consumer-producer acts only as a consumer and no longer publishes. That results in the following logs:

foo/bar => hello 2163 world
Received = Outgoing(PubAck(2164))
foo/bar => hello 2164 world
Received = Outgoing(PubAck(2165))
################################# MISSED 409
foo/bar => hello 2573 world
Received = Outgoing(PubAck(2574))
foo/bar => hello 2574 world

Handling the outgoing publish of the consumer-producer on a separate task does not resolve the issue.
Changing the QoS to AtMostOnce does not resolve the issue either.

Is this intended behavior?

I will provide the scripts I used to demonstrate the behavior.

Consumer-producer:

use rumqttc::{AsyncClient, Event, MqttOptions, Packet, Publish, QoS};
use std::thread::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mqttoptions = MqttOptions::new("rumqtt-consumer-producer", "localhost", 1883);

    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
    client.subscribe("foo/bar", QoS::AtLeastOnce).await.unwrap();

    let mut prev = 0;

    while let Ok(notification) = eventloop.poll().await {
        match notification {
            Event::Incoming(Packet::Publish(Publish {
                dup,
                qos,
                retain,
                topic,
                pkid,
                payload,
            })) => {
                let msg = String::from_utf8(payload.as_ref().to_vec()).unwrap();
                let idx: i64 = msg.split(" ").nth(1).unwrap().parse().unwrap();
                if idx - prev > 1 {
                    println!("################################# MISSED {}", idx - prev);
                }
                prev = idx;
                println!("{topic} => {msg}");
                sleep(Duration::from_millis(2));
                client
                    .publish("/foo/baz", QoS::AtLeastOnce, false, idx.to_string())
                    .await
                    .unwrap();
            }
            other => println!("Received = {other:?}"),
        }
    }
}
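
The gap check used in the loop above can be exercised on its own. This is a minimal sketch using only the standard library; `parse_index` and `reported_gap` are hypothetical helper names introduced here for illustration and are not part of rumqttc. Note that the printed value is the difference between two consecutive indices, so the number of messages actually skipped is one less.

```rust
/// Extracts the counter from a payload shaped like "hello <n> world".
/// Hypothetical helper; returns None if the payload has another shape.
fn parse_index(msg: &str) -> Option<i64> {
    msg.split(' ').nth(1)?.parse().ok()
}

/// Mirrors the check in the consumer loop: yields the index difference
/// when it exceeds 1, which is the value printed after "MISSED".
fn reported_gap(prev: i64, idx: i64) -> Option<i64> {
    let diff = idx - prev;
    if diff > 1 {
        Some(diff)
    } else {
        None
    }
}

fn main() {
    // Payloads taken from the log excerpt shown earlier.
    let prev = parse_index("hello 2164 world").unwrap();
    let idx = parse_index("hello 2573 world").unwrap();
    println!("{:?}", reported_gap(prev, idx)); // Some(409)
}
```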


Producer:

use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::{task, time};

#[tokio::main]
async fn main() {
    let mut mqttoptions = MqttOptions::new("rumqtt-producer", "localhost", 1883);
    mqttoptions.set_clean_session(false);
    mqttoptions.set_keep_alive(Duration::from_secs(5));

    let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 100);

    task::spawn(async move {
        for i in 0.. {
            let msg = format!("hello {i} world");
            println!("{msg}");
            client
                .publish("foo/bar", QoS::AtLeastOnce, false, msg)
                .await
                .unwrap();
            time::sleep(Duration::from_millis(1)).await;
        }
    });

    while let Ok(notification) = eventloop.poll().await {
        // println!("Received = {:?}", notification);
    }
}

"Logs" of consumer-producer:

  Received = Incoming(ConnAck(ConnAck { session_present: false, code: Success }))
  Received = Outgoing(Subscribe(1))
  Received = Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtLeastOnce)] }))
  foo/bar => hello 0 world
  Received = Outgoing(PubAck(1))
  Received = Outgoing(Publish(2))
  foo/bar => hello 1 world
  Received = Outgoing(PubAck(2))
  Received = Incoming(PubAck(PubAck { pkid: 2 }))
  Received = Outgoing(Publish(3))
  foo/bar => hello 2 world
  Received = Outgoing(PubAck(3))
  Received = Incoming(PubAck(PubAck { pkid: 3 }))
  foo/bar => hello 3 world
  Received = Outgoing(PubAck(4))
  foo/bar => hello 4 world
  Received = Outgoing(PubAck(5))
  foo/bar => hello 5 world
  Received = Outgoing(PubAck(6))
  foo/bar => hello 6 world
  Received = Outgoing(PubAck(7))
  Received = Outgoing(Publish(4))
  foo/bar => hello 7 world
  Received = Outgoing(PubAck(8))
  Received = Incoming(PubAck(PubAck { pkid: 4 }))
  foo/bar => hello 8 world
  Received = Outgoing(PubAck(9))
  foo/bar => hello 9 world
  Received = Outgoing(PubAck(10))
  foo/bar => hello 10 world
  Received = Outgoing(PubAck(11))
  foo/bar => hello 11 world
  Received = Outgoing(PubAck(12))
  foo/bar => hello 12 world
  Received = Outgoing(PubAck(13))
  foo/bar => hello 13 world

After this the process just stops, and a minute or so later the client is disconnected by my local Mosquitto broker:

      jan 20 15:57:32 NAC41597 mosquitto[734973]: 1768921052: New client connected from 127.0.0.1:47626 as rumqtt-producer (p2, c0, k5).
      jan 20 15:57:33 NAC41597 mosquitto[734973]: 1768921053: Client rumqtt-producer closed its connection.
      jan 20 15:59:04 NAC41597 mosquitto[734973]: 1768921144: Client rumqtt-consumer-producer has exceeded timeout, disconnecting.

The producer was already past message number 500 at that point; I don't think sharing those prints adds much value.

It seems that the event loop prioritizes incoming events from the broker over publishing outgoing events. This leads to a deadlock in which client.publish() blocks indefinitely because the internal channel is full, a situation that any application processing events more slowly than they are produced will eventually reach.
We would like to rely on the broker's ability to drop messages under contention, rather than have a specific client reach a deadlock.
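
The suspected mechanism can be modeled without rumqttc. The sketch below is an assumption about what is happening, not a confirmed diagnosis: a bounded `std::sync::mpsc::sync_channel` stands in for the client's request channel, and `first_blocked_publish` is a hypothetical helper that reports which publish call would be the first to find the queue full. With the capacity of 10 from the consumer-producer script and the three `Outgoing(Publish(..))` events visible in the logs, the model stalls at message 13, which is the last line printed before the freeze.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Models a bounded request queue (a stand-in, not rumqttc's real channel).
/// `drained` requests are taken off the queue early on; after that nothing
/// drains it. Returns the 0-based index of the first publish that finds the
/// queue full, i.e. the call that would block in the real client.
fn first_blocked_publish(capacity: usize, drained: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let mut taken = 0;
    let mut i = 0;
    loop {
        match tx.try_send(i) {
            Ok(()) => {
                // Simulate the event loop forwarding a few requests early on.
                if taken < drained && rx.try_recv().is_ok() {
                    taken += 1;
                }
            }
            // A blocking send would wait here forever if nothing drains.
            Err(TrySendError::Full(_)) => return i,
            Err(TrySendError::Disconnected(_)) => unreachable!("rx is alive"),
        }
        i += 1;
    }
}

fn main() {
    // Capacity 10 as in AsyncClient::new(mqttoptions, 10); 3 drained
    // requests as in the three Outgoing(Publish(..)) log lines.
    println!("first blocked publish: {}", first_blocked_publish(10, 3));
}
```

If this reading is right, the stall comes from waiting on a full queue while nothing drains it. rumqttc's `AsyncClient` also has `try_publish`, which returns an error instead of waiting when the request channel is full; whether dropping the message on that error is acceptable depends on the application.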

Is it possible to influence the way work is prioritized in the event loop?

If there is any more information we can collect, please let us know!
