@@ -307,3 +307,70 @@ func TestRPCServer_WebSocketEndpointRegistered(t *testing.T) {
307307 require .Nil (t , resp .Error , "health must succeed via RPCServer WebSocket" )
308308 require .NotNil (t , resp .Result )
309309}
310+
311+ // TestRPCServer_OnDisconnectCleansUpSubscriptions verifies that when a
312+ // WebSocket client subscribes to events and then disconnects, the
313+ // OnDisconnect callback fires and removes all subscriptions for that
314+ // client from the EventBus. Without this, disconnected clients leak
315+ // subscriptions.
316+ func TestRPCServer_OnDisconnectCleansUpSubscriptions (t * testing.T ) {
317+ // Use a shared EventBus for both the RPC environment and the server
318+ // so OnDisconnect's UnsubscribeAll targets the same bus that
319+ // Subscribe writes to.
320+ eb := newTestEventBus (t )
321+ rpcCfg := cmtcfg .DefaultRPCConfig ()
322+ core .SetEnvironment (& core.Environment {
323+ Adapter : & adapter.Adapter {EventBus : eb },
324+ Logger : cmtlog .NewNopLogger (),
325+ RPCConfig : * rpcCfg ,
326+ })
327+
328+ l , err := net .Listen ("tcp" , "127.0.0.1:0" )
329+ require .NoError (t , err )
330+ port := l .Addr ().(* net.TCPAddr ).Port
331+ require .NoError (t , l .Close ())
332+
333+ rpcCfg .ListenAddress = fmt .Sprintf ("tcp://127.0.0.1:%d" , port )
334+ srv := NewRPCServer (rpcCfg , log .NewNopLogger (), eb )
335+ require .NoError (t , srv .Start ())
336+ t .Cleanup (func () { _ = srv .Stop () })
337+
338+ require .Eventually (t , func () bool {
339+ c , dialErr := net .DialTimeout ("tcp" , fmt .Sprintf ("127.0.0.1:%d" , port ), 100 * time .Millisecond )
340+ if dialErr != nil {
341+ return false
342+ }
343+ _ = c .Close ()
344+ return true
345+ }, 3 * time .Second , 50 * time .Millisecond )
346+
347+ wsURL := fmt .Sprintf ("ws://127.0.0.1:%d/websocket" , port )
348+ wsConn , _ , err := websocket .DefaultDialer .Dial (wsURL , nil )
349+ require .NoError (t , err )
350+
351+ // Subscribe to an event query.
352+ subscribeReq := `{"jsonrpc":"2.0","id":1,"method":"subscribe","params":{"query":"tm.event='NewBlock'"}}`
353+ require .NoError (t , wsConn .WriteMessage (websocket .TextMessage , []byte (subscribeReq )))
354+
355+ // Read the subscribe response.
356+ require .NoError (t , wsConn .SetReadDeadline (time .Now ().Add (3 * time .Second )))
357+ _ , msg , err := wsConn .ReadMessage ()
358+ require .NoError (t , err )
359+ var subResp wsResponse
360+ require .NoError (t , json .Unmarshal (msg , & subResp ))
361+ require .Nil (t , subResp .Error , "subscribe must succeed, got: %+v" , subResp .Error )
362+
363+ // Verify the EventBus has at least one client subscription.
364+ require .Equal (t , 1 , eb .NumClients (), "expected 1 client on EventBus after subscribe" )
365+
366+ // Close the WebSocket connection to trigger OnDisconnect.
367+ closeMsg := websocket .FormatCloseMessage (websocket .CloseNormalClosure , "" )
368+ require .NoError (t , wsConn .WriteControl (websocket .CloseMessage , closeMsg , time .Now ().Add (time .Second )))
369+ _ = wsConn .Close ()
370+
371+ // OnDisconnect fires asynchronously when the server detects the closed
372+ // connection. Poll until the client count drops to zero.
373+ require .Eventually (t , func () bool {
374+ return eb .NumClients () == 0
375+ }, 3 * time .Second , 50 * time .Millisecond , "OnDisconnect must unsubscribe all client subscriptions" )
376+ }
0 commit comments