Skip to content

Commit 08514ff

Browse files
carlhoerbergclaude
andauthored
Fix segfault race condition when closing auto-delete queues (#1529)
## Summary - Use WaitGroup to track active deliver loop fibers and wait for them to complete before closing the message store - Prevents race condition where MFile could be unmapped while a consumer was still delivering a message (causing segfault in `properties.cr:224` during `to_io()`) - Reorder Consumer#close to close notification channels before calling rm_consumer to prevent deadlock during auto_delete queue cleanup ## Background When a heartbeat timeout triggers client cleanup on an auto-delete queue, the following race can occur: 1. Consumer deliver loop is writing message headers to socket (which yields) 2. Client cleanup triggers queue auto-delete → queue close → message store close → MFile unmap 3. Consumer deliver loop resumes and tries to continue writing from unmapped memory → SEGFAULT The fix uses Crystal's WaitGroup to ensure all deliver loops have exited before the message store is closed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude <[email protected]>
1 parent 75bcc5e commit 08514ff

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

src/lavinmq/amqp/consumer.cr

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,18 @@ module LavinMQ
3131
@flow = @channel.flow?
3232
@metadata = @channel.@metadata.extend({consumer: @tag})
3333
@log = Logger.new(Log, @metadata)
34-
spawn deliver_loop, name: "Consumer deliver loop"
34+
fiber_name = "Consumer vhost=#{@queue.vhost.name} queue=#{@queue.name}"
35+
@queue.deliver_loop_wg.spawn(name: fiber_name) { deliver_loop }
3536
@flow_change = BoolChannel.new(@flow)
3637
@has_capacity = BoolChannel.new(true)
3738
end
3839

3940
def close
4041
@closed = true
41-
@queue.rm_consumer(self)
4242
@notify_closed.close
4343
@has_capacity.close
4444
@flow_change.close
45+
@queue.rm_consumer(self)
4546
end
4647

4748
@notify_closed = ::Channel(Nil).new

src/lavinmq/amqp/queue/queue.cr

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ module LavinMQ::AMQP
9090

9191
@msg_store_lock = Mutex.new(:reentrant)
9292
@msg_store : MessageStore
93+
getter deliver_loop_wg = WaitGroup.new
9394

9495
getter paused = BoolChannel.new(false)
9596

@@ -414,7 +415,8 @@ module LavinMQ::AMQP
414415
@consumers.each &.cancel
415416
@consumers.clear
416417
end
417-
Fiber.yield # Allow all consumers to cancel before closing mmap:s
418+
Fiber.yield # Let deliver_loop fibers start and react to closed channels
419+
@deliver_loop_wg.wait # Wait for all deliver loops to exit before closing mmap:s
418420
@msg_store_lock.synchronize do
419421
@msg_store.close
420422
end

0 commit comments

Comments
 (0)