@@ -31,8 +31,9 @@ public class SSENotificationV2ServiceImpl implements SSENotificationV2Service, M
3131 private final RedisTemplate <String , Object > redisTemplate ;
3232 private final RedisMessageListenerContainer redisMessageListenerContainer ;
3333 private final ObjectMapper objectMapper ;
34- private final Map <UUID , SseEmitter > connections = new ConcurrentHashMap <>();
35- private final Map <UUID , ScheduledFuture <?>> heartbeatTasks = new ConcurrentHashMap <>();
34+
35+ private final Map <UUID , ConcurrentHashMap <String , SseEmitter >> connections = new ConcurrentHashMap <>();
36+ private final Map <UUID , ConcurrentHashMap <String , ScheduledFuture <?>>> heartbeatTasks = new ConcurrentHashMap <>();
3637 private final Map <UUID , Object > userLocks = new ConcurrentHashMap <>();
3738 private final ScheduledExecutorService heartbeatExecutor ;
3839 private static final String NOTIFICATION_CHANNEL = "sse-notifications" ;
@@ -62,61 +63,41 @@ private void init() {
6263 }
6364
6465 @ Override
65- public SseEmitter createConnection (UUID userId ) {
66+ public SseEmitter createConnection (UUID userId , String connectionId ) {
6667 Object lock = userLocks .computeIfAbsent (userId , id -> new Object ());
6768 synchronized (lock ) {
68- cleanupUserState (userId );
69+ connections .computeIfAbsent (userId , id -> new ConcurrentHashMap <>());
70+ heartbeatTasks .computeIfAbsent (userId , id -> new ConcurrentHashMap <>());
6971
7072 SseEmitter emitter = new SseEmitter (3 * 60 * 1000L );
71- connections .put (userId , emitter );
72-
73- emitter .onCompletion (() -> {
74- Object cbLock = userLocks .get (userId );
75- if (cbLock != null ) {
76- synchronized (cbLock ) {
77- if (connections .get (userId ) == emitter ) {
78- cleanupUserState (userId );
79- onlineStatusService .setUserOffline (userId );
80- }
81- }
82- }
83- });
73+ connections .get (userId ).put (connectionId , emitter );
74+
75+ emitter .onCompletion (() -> cleanupConnection (userId , connectionId , emitter ));
8476 emitter .onTimeout (() -> {
85- log .debug ("SSE 연결 타임아웃: userId={}" , userId );
86- Object cbLock = userLocks .get (userId );
87- if (cbLock != null ) {
88- synchronized (cbLock ) {
89- if (connections .get (userId ) == emitter ) {
90- cleanupUserState (userId );
91- onlineStatusService .setUserOffline (userId );
92- }
93- }
94- }
77+ log .debug ("SSE 연결 타임아웃: userId={}, connectionId={}" , userId , connectionId );
78+ cleanupConnection (userId , connectionId , emitter );
9579 });
9680 emitter .onError (e -> {
97- log .warn ("SSE 연결 오류: userId={}, error={}" , userId , e .getMessage ());
98- Object cbLock = userLocks .get (userId );
99- if (cbLock != null ) {
100- synchronized (cbLock ) {
101- if (connections .get (userId ) == emitter ) {
102- cleanupUserState (userId );
103- onlineStatusService .setUserOffline (userId );
104- }
105- }
106- }
81+ log .warn ("SSE 연결 오류: userId={}, connectionId={}, error={}" , userId , connectionId , e .getMessage ());
82+ cleanupConnection (userId , connectionId , emitter );
10783 });
10884
10985 try {
11086 emitter .send (SseEmitter .event ()
11187 .name ("connected" )
112- .data (Map .of ("status" , "connected" , "timestamp" , LocalDateTime .now ())));
113- scheduleHeartbeat (userId , emitter );
88+ .data (Map .of ("status" , "connected" , "connectionId" , connectionId , " timestamp" , LocalDateTime .now ())));
89+ scheduleHeartbeat (userId , connectionId , emitter );
11490 } catch (IOException e ) {
115- cleanupUserState (userId );
91+ cleanupSingleConnection (userId , connectionId );
11692 throw new CustomException (ErrorCode .NOTIFICATION_SSE_FIRST_MESSAGE_FAILED );
11793 }
11894
119- onlineStatusService .setUserOnline (userId );
95+ if (connections .get (userId ).size () == 1 ) {
96+ onlineStatusService .setUserOnline (userId );
97+ }
98+
99+ log .debug ("SSE 연결 추가: userId={}, connectionId={}, 총 연결 수={}" ,
100+ userId , connectionId , connections .get (userId ).size ());
120101 return emitter ;
121102 }
122103 }
@@ -129,24 +110,89 @@ public void disconnectUser(UUID userId) {
129110 return ;
130111 }
131112 synchronized (lock ) {
132- cleanupUserState (userId );
113+ cleanupAllConnections (userId );
133114 onlineStatusService .setUserOffline (userId );
134115 }
135116 }
136117
137- private void cleanupUserState (UUID userId ) {
138- ScheduledFuture <?> task = heartbeatTasks .remove (userId );
139- if (task != null ) {
140- task .cancel (true );
118+ @ Override
119+ public void disconnectConnection (UUID userId , String connectionId ) {
120+ Object lock = userLocks .get (userId );
121+ if (lock == null ) return ;
122+ synchronized (lock ) {
123+ cleanupSingleConnection (userId , connectionId );
124+ Map <String , SseEmitter > userConnections = connections .get (userId );
125+ if (userConnections == null || userConnections .isEmpty ()) {
126+ onlineStatusService .setUserOffline (userId );
127+ }
141128 }
142- SseEmitter emitter = connections .remove (userId );
143- if (emitter != null ) {
144- try {
145- emitter .complete ();
146- } catch (Exception e ) {
147- log .debug ("SSE 연결 정리 중 오류(정상적): {}" , e .getMessage ());
129+ }
130+
131+ private void cleanupConnection (UUID userId , String connectionId , SseEmitter emitter ) {
132+ Object lock = userLocks .get (userId );
133+ if (lock == null ) return ;
134+ synchronized (lock ) {
135+ Map <String , SseEmitter > userConnections = connections .get (userId );
136+ if (userConnections == null ) return ;
137+ if (userConnections .get (connectionId ) != emitter ) return ;
138+
139+ cleanupSingleConnection (userId , connectionId );
140+
141+ if (userConnections .isEmpty ()) {
142+ onlineStatusService .setUserOffline (userId );
143+ log .debug ("마지막 SSE 연결 해제 → 오프라인: userId={}" , userId );
144+ }
145+ }
146+ }
147+
148+ private void cleanupSingleConnection (UUID userId , String connectionId ) {
149+ ConcurrentHashMap <String , ScheduledFuture <?>> userTasks = heartbeatTasks .get (userId );
150+ if (userTasks != null ) {
151+ ScheduledFuture <?> task = userTasks .remove (connectionId );
152+ if (task != null ) task .cancel (true );
153+ }
154+
155+ ConcurrentHashMap <String , SseEmitter > userConnections = connections .get (userId );
156+ if (userConnections != null ) {
157+ SseEmitter emitter = userConnections .remove (connectionId );
158+ if (emitter != null ) {
159+ try {
160+ emitter .complete ();
161+ } catch (Exception e ) {
162+ log .debug ("SSE 연결 정리 중 오류(정상적): {}" , e .getMessage ());
163+ }
148164 }
149165 }
166+
167+ if (userConnections != null && userConnections .isEmpty ()) {
168+ connections .remove (userId );
169+ heartbeatTasks .remove (userId );
170+ userLocks .remove (userId );
171+ log .debug ("SSE 유저 상태 완전 제거 (마지막 연결 정리): userId={}" , userId );
172+ }
173+
174+ log .debug ("SSE 단일 연결 정리: userId={}, connectionId={}" , userId , connectionId );
175+ }
176+
177+ private void cleanupAllConnections (UUID userId ) {
178+ ConcurrentHashMap <String , ScheduledFuture <?>> userTasks = heartbeatTasks .remove (userId );
179+ if (userTasks != null ) {
180+ userTasks .values ().forEach (task -> task .cancel (true ));
181+ }
182+
183+ ConcurrentHashMap <String , SseEmitter > userConnections = connections .remove (userId );
184+ if (userConnections != null ) {
185+ userConnections .values ().forEach (emitter -> {
186+ try {
187+ emitter .complete ();
188+ } catch (Exception e ) {
189+ log .debug ("SSE 전체 연결 정리 중 오류(정상적): {}" , e .getMessage ());
190+ }
191+ });
192+ }
193+
194+ userLocks .remove (userId );
195+ log .debug ("SSE 전체 연결 정리 완료: userId={}" , userId );
150196 }
151197
152198 @ Override
@@ -174,8 +220,8 @@ public void onMessage(org.springframework.data.redis.connection.Message message,
174220 if (lock == null ) return ;
175221
176222 synchronized (lock ) {
177- SseEmitter emitter = connections .get (userId );
178- if (emitter == null ) return ;
223+ ConcurrentHashMap < String , SseEmitter > userConnections = connections .get (userId );
224+ if (userConnections == null || userConnections . isEmpty () ) return ;
179225
180226 Map <String , Object > data = Map .of (
181227 "title" , notificationMessage .title (),
@@ -185,13 +231,24 @@ public void onMessage(org.springframework.data.redis.connection.Message message,
185231 "timestamp" , LocalDateTime .now (),
186232 "relatedId" , notificationMessage .relatedId ()
187233 );
188- try {
189- emitter .send (SseEmitter .event ()
190- .name ("notification" )
191- .data (data ));
192- } catch (Exception sendEx ) {
193- log .warn ("SSE 전송 실패, 연결 정리: userId={}, error={}" , userId , sendEx .getMessage ());
194- cleanupUserState (userId );
234+
235+ List <String > deadConnectionIds = new ArrayList <>();
236+ for (Map .Entry <String , SseEmitter > entry : userConnections .entrySet ()) {
237+ try {
238+ entry .getValue ().send (SseEmitter .event ()
239+ .name ("notification" )
240+ .data (data ));
241+ } catch (Exception sendEx ) {
242+ log .warn ("SSE 전송 실패: userId={}, connectionId={}, error={}" ,
243+ userId , entry .getKey (), sendEx .getMessage ());
244+ deadConnectionIds .add (entry .getKey ());
245+ }
246+ }
247+
248+ for (String deadId : deadConnectionIds ) {
249+ cleanupSingleConnection (userId , deadId );
250+ }
251+ if (userConnections .isEmpty ()) {
195252 onlineStatusService .setUserOffline (userId );
196253 }
197254 }
@@ -202,27 +259,31 @@ public void onMessage(org.springframework.data.redis.connection.Message message,
202259
203260 @ Override
204261 public void cleanupInactiveConnections () {
205- List <UUID > deadUserIds = new ArrayList <>();
206-
207- for (Map .Entry <UUID , SseEmitter > entry : connections .entrySet ()) {
208- UUID userId = entry .getKey ();
262+ for (UUID userId : new ArrayList <>(connections .keySet ())) {
209263 Object lock = userLocks .get (userId );
210264 if (lock == null ) continue ;
211265 synchronized (lock ) {
212- SseEmitter emitter = connections .get (userId );
213- if (emitter == null ) continue ;
214- try {
215- emitter .send (SseEmitter .event ()
216- .name ("ping" )
217- .data (Map .of ("timestamp" , LocalDateTime .now ())));
218- } catch (Exception e ) {
219- deadUserIds .add (userId );
266+ ConcurrentHashMap <String , SseEmitter > userConnections = connections .get (userId );
267+ if (userConnections == null ) continue ;
268+
269+ List <String > deadConnectionIds = new ArrayList <>();
270+ for (Map .Entry <String , SseEmitter > entry : userConnections .entrySet ()) {
271+ try {
272+ entry .getValue ().send (SseEmitter .event ()
273+ .name ("ping" )
274+ .data (Map .of ("timestamp" , LocalDateTime .now ())));
275+ } catch (Exception e ) {
276+ deadConnectionIds .add (entry .getKey ());
277+ }
220278 }
221- }
222- }
223279
224- for (UUID userId : deadUserIds ) {
225- disconnectUser (userId );
280+ for (String deadId : deadConnectionIds ) {
281+ cleanupSingleConnection (userId , deadId );
282+ }
283+ if (userConnections .isEmpty ()) {
284+ onlineStatusService .setUserOffline (userId );
285+ }
286+ }
226287 }
227288 }
228289
@@ -238,42 +299,44 @@ public void shutdown() {
238299 Thread .currentThread ().interrupt ();
239300 }
240301
241- connections .values ().forEach (emitter -> {
242- try {
243- emitter .complete ();
244- } catch (Exception e ) {
245- log .debug ("SSE 연결 정리 중 오류: {}" , e .getMessage ());
246- }
247- });
302+ connections .values ().forEach (userConnections ->
303+ userConnections .values ().forEach (emitter -> {
304+ try {
305+ emitter .complete ();
306+ } catch (Exception e ) {
307+ log .debug ("SSE shutdown 정리 중 오류: {}" , e .getMessage ());
308+ }
309+ })
310+ );
248311 connections .clear ();
249312 heartbeatTasks .clear ();
250313 userLocks .clear ();
251314 }
252315
253- private void scheduleHeartbeat (UUID userId , SseEmitter emitter ) {
254- ScheduledFuture <?> existingTask = heartbeatTasks .get (userId );
255- if (existingTask != null ) {
256- existingTask .cancel (true );
257- }
258-
316+ private void scheduleHeartbeat (UUID userId , String connectionId , SseEmitter emitter ) {
259317 ScheduledFuture <?> task = heartbeatExecutor .scheduleWithFixedDelay (() -> {
260318 Object lock = userLocks .get (userId );
261319 if (lock == null ) return ;
262320 synchronized (lock ) {
263- if (connections .get (userId ) != emitter ) return ;
321+ ConcurrentHashMap <String , SseEmitter > userConnections = connections .get (userId );
322+ if (userConnections == null ) return ;
323+ if (userConnections .get (connectionId ) != emitter ) return ;
264324 try {
265325 emitter .send (SseEmitter .event ()
266326 .name ("heartbeat" )
267327 .data (Map .of ("timestamp" , LocalDateTime .now ())));
268328 onlineStatusService .refreshOnlineTTL (userId );
269329 } catch (Exception e ) {
270- log .debug ("Heartbeat 전송 실패, 연결 정리: userId={}" , userId );
271- cleanupUserState (userId );
272- onlineStatusService .setUserOffline (userId );
330+ log .debug ("Heartbeat 전송 실패, 연결 정리: userId={}, connectionId={}" , userId , connectionId );
331+ cleanupSingleConnection (userId , connectionId );
332+ ConcurrentHashMap <String , SseEmitter > remaining = connections .get (userId );
333+ if (remaining == null || remaining .isEmpty ()) {
334+ onlineStatusService .setUserOffline (userId );
335+ }
273336 }
274337 }
275338 }, 30 , 30 , TimeUnit .SECONDS );
276339
277- heartbeatTasks .put (userId , task );
340+ heartbeatTasks .get (userId ). put ( connectionId , task );
278341 }
279342}
0 commit comments