Skip to content

Commit 47bb728

Browse files
snichmespuun
authored andcommitted
Make MQTT max_packet_size configurable
Default to 256 MiB as before. Co-authored-by: Jon Börjesson <[email protected]>
1 parent c2d4af3 commit 47bb728

File tree

4 files changed

+13
-12
lines changed

4 files changed

+13
-12
lines changed

src/lavinmq/config.cr

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ module LavinMQ
7777
property default_user : String = ENV.fetch("LAVINMQ_DEFAULT_USER", "guest")
7878
property default_password : String = ENV.fetch("LAVINMQ_DEFAULT_PASSWORD", DEFAULT_PASSWORD_HASH) # Hashed password for default user
7979
property max_consumers_per_channel = 0
80+
property mqtt_max_packet_size = 268_435_455_u32 # bytes
8081
@@instance : Config = self.new
8182

8283
def self.instance : LavinMQ::Config
@@ -367,6 +368,7 @@ module LavinMQ
367368
when "max_inflight_messages" then @max_inflight_messages = v.to_u16
368369
when "default_vhost" then @default_mqtt_vhost = v
369370
when "permission_check_enabled" then @mqtt_permission_check_enabled = true?(v)
371+
when "max_packet_size" then @mqtt_max_packet_size = v.to_u32
370372
else
371373
STDERR.puts "WARNING: Unrecognized configuration 'mqtt/#{config}'"
372374
end

src/lavinmq/mqtt/broker.cr

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@ module LavinMQ
3737
true
3838
end
3939

40-
def add_client(socket, connection_info, user, packet)
40+
def add_client(io, connection_info, user, packet)
4141
if prev_client = @clients[packet.client_id]?
4242
prev_client.close("New client #{connection_info.remote_address} (username=#{packet.username}) connected as #{packet.client_id}")
4343
end
44-
client = MQTT::Client.new(socket,
44+
client = MQTT::Client.new(io,
4545
connection_info,
4646
user,
4747
self,

src/lavinmq/mqtt/client.cr

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@ module LavinMQ
2626
@broker.vhost
2727
end
2828

29-
def initialize(@socket : ::IO,
29+
def initialize(@io : MQTT::IO,
3030
@connection_info : ConnectionInfo,
3131
@user : Auth::User,
3232
@broker : MQTT::Broker,
3333
@client_id : String,
3434
@clean_session : Bool = false,
3535
@keepalive : UInt16 = 30,
3636
@will : MQTT::Will? = nil)
37-
@io = MQTT::IO.new(@socket)
3837
@lock = Mutex.new
3938
@waitgroup = WaitGroup.new(1)
4039
@name = "#{@connection_info.remote_address} -> #{@connection_info.local_address}"
@@ -50,7 +49,7 @@ module LavinMQ
5049

5150
private def read_loop
5251
received_bytes = 0_u32
53-
socket = @socket
52+
socket = @io.io
5453
if socket.responds_to?(:"read_timeout=")
5554
# 50% grace period according to [MQTT-3.1.2-24]
5655
socket.read_timeout = @keepalive.zero? ? nil : (@keepalive * 1.5).seconds
@@ -88,7 +87,7 @@ module LavinMQ
8887
end
8988

9089
def read_and_handle_packet
91-
packet : MQTT::Packet = MQTT::Packet.from_io(@io)
90+
packet = @io.read_packet
9291
@log.trace { "Received packet: #{packet.inspect}" }
9392
@recv_oct_count.add(packet.bytesize, :relaxed)
9493

@@ -106,8 +105,8 @@ module LavinMQ
106105

107106
def send(packet)
108107
@lock.synchronize do
109-
packet.to_io(@io)
110-
@socket.flush
108+
@io.write_packet(packet)
109+
@io.flush
111110
@send_oct_count.add(packet.bytesize, :relaxed)
112111
end
113112
case packet
@@ -227,7 +226,7 @@ module LavinMQ
227226
end
228227

229228
private def close_socket
230-
socket = @socket
229+
socket = @io
231230
if socket.responds_to?(:"write_timeout=")
232231
socket.write_timeout = 1.seconds
233232
end

src/lavinmq/mqtt/connection_factory.cr

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@ module LavinMQ
2020
metadata = ::Log::Metadata.build({address: connection_info.remote_address.to_s})
2121
logger = Logger.new(Log, metadata)
2222
begin
23-
io = MQTT::IO.new(socket)
24-
if packet = Packet.from_io(socket).as?(Connect)
23+
io = MQTT::IO.new(socket, @config.mqtt_max_packet_size)
24+
if packet = io.read_packet.as?(Connect)
2525
logger.trace { "recv #{packet.inspect}" }
2626
if user_and_broker = authenticate(io, packet)
2727
user, broker = user_and_broker
2828
packet = assign_client_id(packet) if packet.client_id.empty?
2929
session_present = broker.session_present?(packet.client_id, packet.clean_session?)
3030
connack io, session_present, Connack::ReturnCode::Accepted
31-
return broker.add_client(socket, connection_info, user, packet)
31+
return broker.add_client(io, connection_info, user, packet)
3232
else
3333
logger.warn { "Authentication failure for user \"#{packet.username}\"" }
3434
connack io, false, Connack::ReturnCode::NotAuthorized

0 commit comments

Comments
 (0)