Bug: Kombu consumer fails when aio-pika publisher sends integer headers #2354

@lucius0958725

Description

When a message published with aio-pika carries certain integer values in its headers, a Kombu consumer fails to parse it.
The consumer crashes before the message handler is invoked.

Reproduction:

import asyncio
import threading

import aio_pika
from kombu import Connection, Consumer, Exchange, Queue

RABBIT_URL = "amqp://guest:guest@127.0.0.1:5672/"
QUEUE_NAME = "test_queue"
EXCHANGE_NAME = "test_exchange"
ROUTING_KEY = "test_key"

HEADER_VALUE: int = 128


def kombu_consumer():
    def process_message(body, message):
        print("Kombu received:", body)
        print("Kombu received header:", message.headers)
        message.ack()

    with Connection(RABBIT_URL) as conn:
        exchange = Exchange(EXCHANGE_NAME, type="direct", durable=False)
        queue = Queue(QUEUE_NAME, exchange=exchange, routing_key=ROUTING_KEY, durable=False)

        with Consumer(conn, queues=[queue], callbacks=[process_message], accept=["json"]):
            print("Kombu consumer started, waiting for messages...")
            try:
                while True:
                    conn.drain_events()
            except KeyboardInterrupt:
                pass


async def aio_pika_publisher():
    connection = await aio_pika.connect_robust(RABBIT_URL)
    async with connection:
        channel = await connection.channel()
        exchange = await channel.declare_exchange(
            EXCHANGE_NAME, aio_pika.ExchangeType.DIRECT, durable=False
        )
        queue = await channel.declare_queue(QUEUE_NAME, durable=False)
        await queue.bind(exchange, ROUTING_KEY)

        message = aio_pika.Message(
            body=b'{"hello": "world"}',
            headers={"key": HEADER_VALUE},
            content_type="application/json",
        )
        print("aio-pika publishing...")
        await exchange.publish(message, routing_key=ROUTING_KEY)
        print("aio-pika published message")


if __name__ == "__main__":
    # Run Kombu consumer in a thread
    consumer_thread = threading.Thread(target=kombu_consumer, daemon=True)
    consumer_thread.start()

    # Give Kombu a moment to connect
    asyncio.run(asyncio.sleep(1))

    # Run aio-pika publisher
    asyncio.run(aio_pika_publisher())

    consumer_thread.join()

Failing Ranges

The failure occurs only for specific integer values in headers:

128–511
640–1023
1152–1535
1664–2047
2176–2559
2688–3072
3200–3583
3712–4095
And more...
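
The listed ranges look periodic: each starts 512 after the previous one. As a rough sketch, the snippet below compares the ranges exactly as listed with a candidate rule, `value % 512 >= 128`. That rule is only inferred from the list and has not been confirmed against Kombu, py-amqp, or aio-pika; the helper names are hypothetical.

```python
# Ranges exactly as listed in this report (inclusive bounds).
REPORTED_RANGES = [
    (128, 511), (640, 1023), (1152, 1535), (1664, 2047),
    (2176, 2559), (2688, 3072), (3200, 3583), (3712, 4095),
]


def matches_apparent_pattern(value):
    """Candidate rule inferred from the list (an assumption, not a confirmed cause)."""
    return value % 512 >= 128


def in_reported_range(value):
    """True if the value falls inside one of the ranges listed in the report."""
    return any(low <= value <= high for low, high in REPORTED_RANGES)


def disagreements(limit=4096):
    """Values below `limit` where the candidate rule and the reported list differ."""
    return [
        value
        for value in range(limit)
        if matches_apparent_pattern(value) != in_reported_range(value)
    ]


if __name__ == "__main__":
    print(disagreements())
```

Below 4096 the candidate rule and the listed ranges disagree only at 3072, so the upper bound of the sixth range may be off by one. That value would need to be re-tested to be sure.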

Example Errors

IndexError: index out of range

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte

UnexpectedFrame: Received frame 3 while expecting type: 2

Expected

Either integer headers should be supported, or a single, clear error should be raised instead of the inconsistent crashes shown above.

I am also opening an issue in aio-pika, in case it is something on their end.
Thanks!
