Description
When a message published with aio-pika contains an integer header, a Kombu consumer fails to parse it.
The consumer crashes before the handler is invoked.
Reproduction:
import asyncio
import threading

import aio_pika
from kombu import Connection, Consumer, Exchange, Queue

RABBIT_URL = "amqp://guest:guest@127.0.0.1:5672/"
QUEUE_NAME = "test_queue"
EXCHANGE_NAME = "test_exchange"
ROUTING_KEY = "test_key"
HEADER_VALUE: int = 128


def kombu_consumer():
    def process_message(body, message):
        print("Kombu received:", body)
        print("Kombu received header:", message.headers)
        message.ack()

    with Connection(RABBIT_URL) as conn:
        exchange = Exchange(EXCHANGE_NAME, type="direct", durable=False)
        queue = Queue(QUEUE_NAME, exchange=exchange, routing_key=ROUTING_KEY, durable=False)
        with Consumer(conn, queues=[queue], callbacks=[process_message], accept=["json"]):
            print("Kombu consumer started, waiting for messages...")
            try:
                while True:
                    conn.drain_events()
            except KeyboardInterrupt:
                pass


async def aio_pika_publisher():
    connection = await aio_pika.connect_robust(RABBIT_URL)
    async with connection:
        channel = await connection.channel()
        exchange = await channel.declare_exchange(
            EXCHANGE_NAME, aio_pika.ExchangeType.DIRECT, durable=False
        )
        queue = await channel.declare_queue(QUEUE_NAME, durable=False)
        await queue.bind(exchange, ROUTING_KEY)
        message = aio_pika.Message(
            body=b'{"hello": "world"}',
            headers={"key": HEADER_VALUE},
            content_type="application/json",
        )
        print("aio-pika publishing...")
        await exchange.publish(message, routing_key=ROUTING_KEY)
        print("aio-pika published message")


if __name__ == "__main__":
    # Run Kombu consumer in a thread
    consumer_thread = threading.Thread(target=kombu_consumer, daemon=True)
    consumer_thread.start()
    # Give Kombu a moment to connect
    asyncio.run(asyncio.sleep(1))
    # Run aio-pika publisher
    asyncio.run(aio_pika_publisher())
    consumer_thread.join()
Failing Ranges
The failure occurs only for specific integer values in headers:
128–511
640–1023
1152–1535
1664–2047
2176–2559
2688–3072
3200–3583
3712–4095
And more...
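The listed ranges appear to follow a regular pattern: each one starts at 128 plus a multiple of 512 and covers offsets 128 through 511 of that block, so a value seems to fail when `value % 512 >= 128`. The sketch below encodes that observation, which may help when picking header values to test. The helper name `matches_listed_pattern` is hypothetical, and the rule is only inferred from the ranges listed here; it is not a confirmed root cause. The listed upper bound 3072 does not fit the rule (3072 % 512 is 0), so that endpoint may be worth re-checking.

```python
def matches_listed_pattern(value: int) -> bool:
    """Return True if value fits the pattern inferred from the listed ranges.

    Assumption: the failing ranges repeat every 512 values, and within each
    block of 512 the offsets 128..511 fail. This is an inference from the
    reported ranges, not a verified explanation of the bug.
    """
    return value % 512 >= 128


# Boundary values taken from the first two listed ranges.
for v in (127, 128, 511, 512, 639, 640, 1023, 1024):
    print(v, matches_listed_pattern(v))
```

Setting `HEADER_VALUE` in the reproduction script to a value on either side of one of these boundaries (for example 127 and 128) would be one way to confirm or refute the inferred rule.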
Example Errors
IndexError: index out of range
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte
UnexpectedFrame: Received frame 3 while expecting type: 2
Expected
Either integer headers should be supported, or a consistent, clear error should be raised instead of crashes.
I am also opening an issue in aio-pika, in case it is something on their end.
Thanks!