Skip to content

Commit 49f935f

Browse files
committed
Use AMQ::Protocol::Stream for frame size limitations
1 parent 47bb728 commit 49f935f

File tree

7 files changed

+88
-86
lines changed

7 files changed

+88
-86
lines changed

shard.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ shards:
66

77
amq-protocol:
88
git: https://github.com/cloudamqp/amq-protocol.cr.git
9-
version: 1.1.16
9+
version: 1.1.19
1010

1111
amqp-client:
1212
git: https://github.com/cloudamqp/amqp-client.cr.git
@@ -18,7 +18,7 @@ shards:
1818

1919
mqtt-protocol:
2020
git: https://github.com/84codes/mqtt-protocol.cr.git
21-
version: 0.2.0+git.commit.5cf504b32313b3b63eecde22e2a136b9b9935b8b
21+
version: 0.3.0
2222

2323
systemd:
2424
git: https://github.com/84codes/systemd.cr.git

spec/connection_spec.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ describe LavinMQ::Server do
143143
ch = conn.channel
144144
frame_max = LavinMQ::Config.instance.frame_max
145145
bytes = frame_max + 1
146-
expect_raises(AMQP::Client::Connection::ClosedException, /frame size #{bytes} exceeded max #{frame_max}/) do
146+
expect_raises(AMQP::Client::Connection::ClosedException, /Frame size #{bytes} exceeds max frame size #{frame_max}/) do
147147
conn.unsafe_write AMQ::Protocol::Frame::Basic::Publish.new(ch.id, 0_u16, "amq.direct", "test", false, false)
148148
conn.unsafe_write AMQ::Protocol::Frame::Header.new(ch.id, 60_u16, 0_u16, bytes.to_u64, AMQ::Protocol::Properties.new)
149149
conn.unsafe_write AMQ::Protocol::Frame::BytesBody.new(ch.id, bytes, Slice.new(bytes.to_i32, 0_u8))

src/lavinmq/amqp/client.cr

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ module LavinMQ
117117

118118
private def read_loop
119119
received_bytes = 0_u32
120-
socket = @socket
120+
stream = AMQ::Protocol::Stream.new(@socket, @max_frame_size)
121121
loop do
122-
AMQP::Frame.from_io(socket) do |frame|
122+
stream.next_frame do |frame|
123123
{% unless flag?(:release) %}
124124
@log.trace { "Received #{frame.inspect}" }
125125
{% end %}
@@ -155,6 +155,10 @@ module LavinMQ
155155
end
156156
rescue IO::TimeoutError
157157
send_heartbeat || break
158+
rescue ex : AMQ::Protocol::Error::FrameSizeError
159+
@log.error { ex.inspect }
160+
send_frame_error(ex.message)
161+
break
158162
rescue ex : AMQ::Protocol::Error::NotImplemented
159163
@log.error { ex.inspect }
160164
send_not_implemented(ex)

src/lavinmq/amqp/connection_factory.cr

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,19 @@ module LavinMQ
2020
metadata = ::Log::Metadata.build({address: connection_info.remote_address.to_s})
2121
logger = Logger.new(Log, metadata)
2222
if confirm_header(socket, logger)
23-
if start_ok = start(socket, logger)
24-
if user = authenticate(socket, connection_info.remote_address, start_ok, logger)
25-
if tune_ok = tune(socket, logger)
26-
if vhost = open(socket, user, logger)
23+
stream = AMQ::Protocol::Stream.new(socket)
24+
if start_ok = start(stream, logger)
25+
if user = authenticate(stream, connection_info.remote_address, start_ok, logger)
26+
if tune_ok = tune(stream, logger)
27+
if vhost = open(stream, user, logger)
2728
socket.read_timeout = heartbeat_timeout(tune_ok)
2829
return LavinMQ::AMQP::Client.new(socket, connection_info, vhost, user, tune_ok, start_ok)
2930
end
3031
end
3132
end
3233
end
3334
end
34-
rescue ex : IO::TimeoutError | IO::Error | OpenSSL::SSL::Error | AMQ::Protocol::Error::FrameDecode
35+
rescue ex : IO::TimeoutError | IO::Error | OpenSSL::SSL::Error | AMQ::Protocol::Error
3536
Log.warn { "#{ex} when #{connection_info.remote_address} tried to establish connection" }
3637
nil
3738
rescue ex
@@ -81,7 +82,7 @@ module LavinMQ
8182
start = AMQP::Frame::Connection::Start.new(server_properties: SERVER_PROPERTIES)
8283
socket.write_bytes start, ::IO::ByteFormat::NetworkEndian
8384
socket.flush
84-
start_ok = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::StartOk) }
85+
start_ok = socket.next_frame.as(AMQP::Frame::Connection::StartOk)
8586
if start_ok.bytesize > 4096
8687
log.warn { "StartOk frame was #{start_ok.bytesize} bytes, max allowed is 4096 bytes" }
8788
return
@@ -137,35 +138,33 @@ module LavinMQ
137138
frame_max: frame_max,
138139
heartbeat: Config.instance.heartbeat), IO::ByteFormat::NetworkEndian
139140
socket.flush
140-
tune_ok = AMQP::Frame.from_io(socket) do |frame|
141-
case frame
142-
when AMQP::Frame::Connection::TuneOk
143-
if LOW_FRAME_MAX_RANGE.includes? frame.frame_max
144-
log.warn { "Suggested Frame max (#{frame.frame_max}) too low, closing connection" }
145-
reply_text = "failed to negotiate connection parameters: negotiated frame_max = #{frame.frame_max} is lower than the minimum allowed value (#{MIN_FRAME_MAX})"
146-
return close_connection(socket, ConnectionReplyCode::NOT_ALLOWED, reply_text, frame)
147-
end
148-
if too_high?(frame.frame_max, Config.instance.frame_max)
149-
log.warn { "Suggested Frame max (#{frame.frame_max}) too high, closing connection" }
150-
reply_text = "failed to negotiate connection parameters: negotiated frame_max = #{frame.frame_max} is higher than the maximum allowed value (#{Config.instance.frame_max})"
151-
return close_connection(socket, ConnectionReplyCode::NOT_ALLOWED, reply_text, frame)
152-
end
153-
if too_high?(frame.channel_max, Config.instance.channel_max)
154-
log.warn { "Suggested Channel max (#{frame.channel_max}) too high, closing connection" }
155-
reply_text = "failed to negotiate connection parameters: negotiated channel_max = #{frame.channel_max} is higher than the maximum allowed value (#{Config.instance.channel_max})"
156-
return close_connection(socket, ConnectionReplyCode::NOT_ALLOWED, reply_text, frame)
157-
end
158-
frame
159-
else
160-
log.warn { "Expected TuneOk Frame got #{frame.inspect}" }
161-
return
141+
frame = socket.next_frame
142+
case frame
143+
when AMQP::Frame::Connection::TuneOk
144+
if LOW_FRAME_MAX_RANGE.includes? frame.frame_max
145+
log.warn { "Suggested Frame max (#{frame.frame_max}) too low, closing connection" }
146+
reply_text = "failed to negotiate connection parameters: negotiated frame_max = #{frame.frame_max} is lower than the minimum allowed value (#{MIN_FRAME_MAX})"
147+
return close_connection(socket, ConnectionReplyCode::NOT_ALLOWED, reply_text, frame)
162148
end
149+
if too_high?(frame.frame_max, Config.instance.frame_max)
150+
log.warn { "Suggested Frame max (#{frame.frame_max}) too high, closing connection" }
151+
reply_text = "failed to negotiate connection parameters: negotiated frame_max = #{frame.frame_max} is higher than the maximum allowed value (#{Config.instance.frame_max})"
152+
return close_connection(socket, ConnectionReplyCode::NOT_ALLOWED, reply_text, frame)
153+
end
154+
if too_high?(frame.channel_max, Config.instance.channel_max)
155+
log.warn { "Suggested Channel max (#{frame.channel_max}) too high, closing connection" }
156+
reply_text = "failed to negotiate connection parameters: negotiated channel_max = #{frame.channel_max} is higher than the maximum allowed value (#{Config.instance.channel_max})"
157+
return close_connection(socket, ConnectionReplyCode::NOT_ALLOWED, reply_text, frame)
158+
end
159+
frame
160+
else
161+
log.warn { "Expected TuneOk Frame got #{frame.inspect}" }
162+
return
163163
end
164-
tune_ok
165164
end
166165

167166
def open(socket, user, log)
168-
open = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::Open) }
167+
open = socket.next_frame.as(AMQP::Frame::Connection::Open)
169168
vhost_name = open.vhost.empty? ? "/" : open.vhost
170169
if vhost = @vhosts[vhost_name]?
171170
if user.permissions[vhost_name]?

src/lavinmq/definitions_generator.cr

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,9 @@ class LavinMQCtl
187187
queues = Array(Frame::Method::Queue::Declare).new
188188
File.open(File.join(vhost_dir, "definitions.amqp")) do |defs|
189189
_schema = defs.read_bytes Int32
190+
stream = AMQ::Protocol::Stream.new(defs, format: IO::ByteFormat::SystemEndian)
190191
loop do
191-
frame = Frame.from_io(defs, IO::ByteFormat::SystemEndian) { |f| f }
192-
case frame
192+
case frame = stream.next_frame
193193
when Frame::Method::Queue::Declare
194194
queues << frame
195195
when Frame::Method::Queue::Delete
@@ -206,9 +206,9 @@ class LavinMQCtl
206206
exchanges = Array(Frame::Method::Exchange::Declare).new
207207
File.open(File.join(vhost_dir, "definitions.amqp")) do |defs|
208208
_schema = defs.read_bytes Int32
209+
stream = AMQ::Protocol::Stream.new(defs, format: IO::ByteFormat::SystemEndian)
209210
loop do
210-
frame = Frame.from_io(defs, IO::ByteFormat::SystemEndian) { |f| f }
211-
case frame
211+
case frame = stream.next_frame
212212
when Frame::Method::Exchange::Declare
213213
exchanges << frame
214214
when Frame::Method::Exchange::Delete
@@ -225,9 +225,9 @@ class LavinMQCtl
225225
bindings = Array(Frame::Method::Queue::Bind).new
226226
File.open(File.join(vhost_dir, "definitions.amqp")) do |defs|
227227
_schema = defs.read_bytes Int32
228+
stream = AMQ::Protocol::Stream.new(defs, format: IO::ByteFormat::SystemEndian)
228229
loop do
229-
frame = Frame.from_io(defs, IO::ByteFormat::SystemEndian) { |f| f }
230-
case frame
230+
case frame = stream.next_frame
231231
when Frame::Method::Queue::Bind
232232
bindings << frame
233233
when Frame::Method::Queue::Unbind
@@ -244,9 +244,9 @@ class LavinMQCtl
244244
bindings = Array(Frame::Method::Exchange::Bind).new
245245
File.open(File.join(vhost_dir, "definitions.amqp")) do |defs|
246246
_schema = defs.read_bytes Int32
247+
stream = AMQ::Protocol::Stream.new(defs, format: IO::ByteFormat::SystemEndian)
247248
loop do
248-
frame = Frame.from_io(defs, IO::ByteFormat::SystemEndian) { |f| f }
249-
case frame
249+
case frame = stream.next_frame
250250
when Frame::Method::Exchange::Bind
251251
bindings << frame
252252
when Frame::Method::Exchange::Unbind

src/lavinmq/schema.cr

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,13 @@ module LavinMQ
109109
version = io.read_bytes Int32
110110
raise UnsupportedSchemaVersion.new(version, io.path) unless version == 1
111111

112+
stream = AMQ::Protocol::Stream.new(io, format: IO::ByteFormat::SystemEndian)
112113
loop do
113-
AMQP::Frame.from_io(io, IO::ByteFormat::SystemEndian) do |frame|
114-
case frame
115-
when AMQP::Frame::Queue::Declare
116-
queues.push frame.queue_name
117-
when AMQP::Frame::Queue::Delete
118-
queues.delete frame.queue_name
119-
end
114+
case frame = stream.next_frame
115+
when AMQP::Frame::Queue::Declare
116+
queues.push frame.queue_name
117+
when AMQP::Frame::Queue::Delete
118+
queues.delete frame.queue_name
120119
end
121120
rescue IO::EOFError
122121
break

src/lavinmq/vhost.cr

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -495,44 +495,44 @@ module LavinMQ
495495
@definitions_lock.synchronize do
496496
@log.debug { "Verifying schema" }
497497
SchemaVersion.verify(io, :definition)
498+
stream = AMQ::Protocol::Stream.new(io, format: IO::ByteFormat::SystemEndian)
498499
loop do
499-
AMQP::Frame.from_io(io, IO::ByteFormat::SystemEndian) do |f|
500-
@log.trace { "Reading frame #{f.inspect}" }
501-
case f
502-
when AMQP::Frame::Exchange::Declare
503-
exchanges[f.exchange_name] = f
504-
when AMQP::Frame::Exchange::Delete
505-
exchanges.delete f.exchange_name
506-
exchange_bindings.delete f.exchange_name
507-
should_compact = true
508-
when AMQP::Frame::Exchange::Bind
509-
exchange_bindings[f.destination] << f
510-
when AMQP::Frame::Exchange::Unbind
511-
exchange_bindings[f.destination].reject! do |b|
512-
b.source == f.source &&
513-
b.routing_key == f.routing_key &&
514-
b.arguments == f.arguments
515-
end
516-
should_compact = true
517-
when AMQP::Frame::Queue::Declare
518-
queues[f.queue_name] = f
519-
when AMQP::Frame::Queue::Delete
520-
queues.delete f.queue_name
521-
queue_bindings.delete f.queue_name
522-
should_compact = true
523-
when AMQP::Frame::Queue::Bind
524-
queue_bindings[f.queue_name] << f
525-
when AMQP::Frame::Queue::Unbind
526-
queue_bindings[f.queue_name].reject! do |b|
527-
b.exchange_name == f.exchange_name &&
528-
b.routing_key == f.routing_key &&
529-
b.arguments == f.arguments
530-
end
531-
should_compact = true
532-
else
533-
raise "Cannot apply frame #{f.class} in vhost #{@name}"
500+
f = stream.next_frame
501+
@log.trace { "Reading frame #{f.inspect}" }
502+
case f
503+
when AMQP::Frame::Exchange::Declare
504+
exchanges[f.exchange_name] = f
505+
when AMQP::Frame::Exchange::Delete
506+
exchanges.delete f.exchange_name
507+
exchange_bindings.delete f.exchange_name
508+
should_compact = true
509+
when AMQP::Frame::Exchange::Bind
510+
exchange_bindings[f.destination] << f
511+
when AMQP::Frame::Exchange::Unbind
512+
exchange_bindings[f.destination].reject! do |b|
513+
b.source == f.source &&
514+
b.routing_key == f.routing_key &&
515+
b.arguments == f.arguments
534516
end
535-
end # Frame.from_io
517+
should_compact = true
518+
when AMQP::Frame::Queue::Declare
519+
queues[f.queue_name] = f
520+
when AMQP::Frame::Queue::Delete
521+
queues.delete f.queue_name
522+
queue_bindings.delete f.queue_name
523+
should_compact = true
524+
when AMQP::Frame::Queue::Bind
525+
queue_bindings[f.queue_name] << f
526+
when AMQP::Frame::Queue::Unbind
527+
queue_bindings[f.queue_name].reject! do |b|
528+
b.exchange_name == f.exchange_name &&
529+
b.routing_key == f.routing_key &&
530+
b.arguments == f.arguments
531+
end
532+
should_compact = true
533+
else
534+
raise "Cannot apply frame #{f.class} in vhost #{@name}"
535+
end
536536
rescue ex : IO::EOFError
537537
break
538538
end # loop

0 commit comments

Comments
 (0)