Skip to content

Commit 2a560cc

Browse files
committed
fix
1 parent afaaadb commit 2a560cc

File tree

9 files changed

+142
-28
lines changed

9 files changed

+142
-28
lines changed

testUtil/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ export function dummySession() {
184184
onSessionGracePeriodElapsed: () => {
185185
/* noop */
186186
},
187+
onMessageSendFailure: () => {
188+
/* noop */
189+
},
187190
},
188191
testingSessionOptions,
189192
currentProtocolVersion,

transport/client.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,18 @@ export abstract class ClientTransport<
107107
onSessionGracePeriodElapsed: () => {
108108
this.onSessionGracePeriodElapsed(session);
109109
},
110+
onMessageSendFailure: (msg, reason) => {
111+
this.log?.error(`failed to send message: ${reason}`, {
112+
...session.loggingMetadata,
113+
transportMessage: msg,
114+
});
115+
116+
this.protocolError({
117+
type: ProtocolError.MessageSendFailure,
118+
message: reason,
119+
});
120+
this.deleteSession(session, { unhealthy: true });
121+
},
110122
},
111123
this.options,
112124
currentProtocolVersion,
@@ -186,6 +198,18 @@ export abstract class ClientTransport<
186198
onSessionGracePeriodElapsed: () => {
187199
this.onSessionGracePeriodElapsed(handshakingSession);
188200
},
201+
onMessageSendFailure: (msg, reason) => {
202+
this.log?.error(`failed to send message: ${reason}`, {
203+
...handshakingSession.loggingMetadata,
204+
transportMessage: msg,
205+
});
206+
207+
this.protocolError({
208+
type: ProtocolError.MessageSendFailure,
209+
message: reason,
210+
});
211+
this.deleteSession(handshakingSession, { unhealthy: true });
212+
},
189213
},
190214
);
191215

@@ -395,6 +419,18 @@ export abstract class ClientTransport<
395419
onSessionGracePeriodElapsed: () => {
396420
this.onSessionGracePeriodElapsed(backingOffSession);
397421
},
422+
onMessageSendFailure: (msg, reason) => {
423+
this.log?.error(`failed to send message: ${reason}`, {
424+
...backingOffSession.loggingMetadata,
425+
transportMessage: msg,
426+
});
427+
428+
this.protocolError({
429+
type: ProtocolError.MessageSendFailure,
430+
message: reason,
431+
});
432+
this.deleteSession(backingOffSession, { unhealthy: true });
433+
},
398434
},
399435
);
400436

@@ -470,6 +506,18 @@ export abstract class ClientTransport<
470506
onSessionGracePeriodElapsed: () => {
471507
this.onSessionGracePeriodElapsed(connectingSession);
472508
},
509+
onMessageSendFailure: (msg, reason) => {
510+
this.log?.error(`failed to send message: ${reason}`, {
511+
...connectingSession.loggingMetadata,
512+
transportMessage: msg,
513+
});
514+
515+
this.protocolError({
516+
type: ProtocolError.MessageSendFailure,
517+
message: reason,
518+
});
519+
this.deleteSession(connectingSession, { unhealthy: true });
520+
},
473521
},
474522
);
475523

transport/server.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,18 @@ export abstract class ServerTransport<
408408
onSessionGracePeriodElapsed: () => {
409409
this.onSessionGracePeriodElapsed(noConnectionSession);
410410
},
411+
onMessageSendFailure: (msg, reason) => {
412+
this.log?.error(`failed to send message: ${reason}`, {
413+
...noConnectionSession.loggingMetadata,
414+
transportMessage: msg,
415+
});
416+
417+
this.protocolError({
418+
type: ProtocolError.MessageSendFailure,
419+
message: reason,
420+
});
421+
this.deleteSession(noConnectionSession, { unhealthy: true });
422+
},
411423
},
412424
);
413425

transport/sessionStateMachine/SessionConnected.ts

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,18 @@ import {
99
} from '../message';
1010
import {
1111
IdentifiedSession,
12+
IdentifiedSessionListeners,
1213
IdentifiedSessionProps,
1314
SessionState,
1415
} from './common';
1516
import { Connection } from '../connection';
1617
import { SpanStatusCode } from '@opentelemetry/api';
1718
import { SendBufferResult, SendResult } from '../results';
1819

19-
export interface SessionConnectedListeners {
20+
export interface SessionConnectedListeners extends IdentifiedSessionListeners {
2021
onConnectionErrored: (err: unknown) => void;
2122
onConnectionClosed: () => void;
2223
onMessage: (msg: OpaqueTransportMessage) => void;
23-
onMessageSendFailure: (
24-
msg: PartialTransportMessage & { seq: number },
25-
reason: string,
26-
) => void;
2724
onInvalidMessage: (reason: string) => void;
2825
}
2926

@@ -74,13 +71,6 @@ export class SessionConnected<
7471
send(msg: PartialTransportMessage): SendResult {
7572
const encodeResult = this.encodeMsg(msg);
7673
if (!encodeResult.ok) {
77-
// safety: onMessageSendFailure tears down the session via protocol error,
78-
// which emits sessionStatus 'closing' and cleans up all procedure listeners.
79-
this.listeners.onMessageSendFailure(
80-
{ ...msg, seq: this.seq },
81-
encodeResult.reason,
82-
);
83-
8474
return encodeResult;
8575
}
8676

transport/sessionStateMachine/common.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,13 @@ export type InheritedProperties = Pick<
187187

188188
export type SessionId = string;
189189

190+
export interface IdentifiedSessionListeners {
191+
onMessageSendFailure: (
192+
msg: PartialTransportMessage & { seq: number },
193+
reason: string,
194+
) => void;
195+
}
196+
190197
// all sessions where we know the other side's client id
191198
export interface IdentifiedSessionProps extends CommonSessionProps {
192199
id: SessionId;
@@ -197,13 +204,15 @@ export interface IdentifiedSessionProps extends CommonSessionProps {
197204
sendBuffer: Array<EncodedTransportMessage>;
198205
telemetry: TelemetryInfo;
199206
protocolVersion: ProtocolVersion;
207+
listeners: IdentifiedSessionListeners;
200208
}
201209

202210
export abstract class IdentifiedSession extends CommonSession {
203211
readonly id: SessionId;
204212
readonly telemetry: TelemetryInfo;
205213
readonly to: TransportClientId;
206214
readonly protocolVersion: ProtocolVersion;
215+
listeners: IdentifiedSessionListeners;
207216

208217
/**
209218
* Index of the message we will send next (excluding handshake)
@@ -232,6 +241,7 @@ export abstract class IdentifiedSession extends CommonSession {
232241
log,
233242
protocolVersion,
234243
seqSent: messagesSent,
244+
listeners,
235245
} = props;
236246
super(props);
237247
this.id = id;
@@ -243,6 +253,7 @@ export abstract class IdentifiedSession extends CommonSession {
243253
this.log = log;
244254
this.protocolVersion = protocolVersion;
245255
this.seqSent = messagesSent;
256+
this.listeners = listeners;
246257
}
247258

248259
get loggingMetadata(): MessageMetadata {
@@ -275,6 +286,13 @@ export abstract class IdentifiedSession extends CommonSession {
275286

276287
const encoded = this.codec.toBuffer(msg);
277288
if (!encoded.ok) {
289+
// safety: onMessageSendFailure tears down the session via protocol error,
290+
// which emits sessionStatus 'closing' and cleans up all procedure listeners.
291+
this.listeners.onMessageSendFailure(
292+
{ ...partialMsg, seq: this.seq },
293+
encoded.reason,
294+
);
295+
278296
return encoded;
279297
}
280298

@@ -320,7 +338,8 @@ export abstract class IdentifiedSession extends CommonSession {
320338
}
321339
}
322340

323-
export interface IdentifiedSessionWithGracePeriodListeners {
341+
export interface IdentifiedSessionWithGracePeriodListeners
342+
extends IdentifiedSessionListeners {
324343
onSessionGracePeriodElapsed: () => void;
325344
}
326345

transport/sessionStateMachine/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
export { SessionState } from './common';
22
export { type SessionWaitingForHandshake } from './SessionWaitingForHandshake';
33
export { type SessionConnecting } from './SessionConnecting';
4-
export { type SessionNoConnection } from './SessionNoConnection';
4+
export {
5+
type SessionNoConnection,
6+
type SessionNoConnectionListeners,
7+
} from './SessionNoConnection';
58
export { type SessionHandshaking } from './SessionHandshaking';
69
export { type SessionConnected } from './SessionConnected';
710
export {

transport/sessionStateMachine/stateMachine.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,15 @@ function getPendingMockConnection(): PendingMockConnectionHandle {
9999
function createSessionNoConnectionListeners(): SessionNoConnectionListeners {
100100
return {
101101
onSessionGracePeriodElapsed: vi.fn(),
102+
onMessageSendFailure: vi.fn(),
102103
};
103104
}
104105

105106
function createSessionBackingOffListeners(): SessionBackingOffListeners {
106107
return {
107108
onBackoffFinished: vi.fn(),
108109
onSessionGracePeriodElapsed: vi.fn(),
110+
onMessageSendFailure: vi.fn(),
109111
};
110112
}
111113

@@ -115,6 +117,7 @@ function createSessionConnectingListeners(): SessionConnectingListeners {
115117
onConnectionFailed: vi.fn(),
116118
onConnectionTimeout: vi.fn(),
117119
onSessionGracePeriodElapsed: vi.fn(),
120+
onMessageSendFailure: vi.fn(),
118121
};
119122
}
120123

@@ -126,6 +129,7 @@ function createSessionHandshakingListeners(): SessionHandshakingListeners {
126129
onConnectionErrored: vi.fn(),
127130
onHandshakeTimeout: vi.fn(),
128131
onSessionGracePeriodElapsed: vi.fn(),
132+
onMessageSendFailure: vi.fn(),
129133
};
130134
}
131135

transport/sessionStateMachine/transitions.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import { CodecMessageAdapter } from '../../codec';
4444

4545
function inheritSharedSession(
4646
session: IdentifiedSession,
47-
): IdentifiedSessionProps {
47+
): Omit<IdentifiedSessionProps, 'listeners'> {
4848
return {
4949
id: session.id,
5050
from: session.from,
@@ -255,7 +255,7 @@ export const SessionStateGraph = {
255255
): SessionConnected<ConnType> => {
256256
const conn = pendingSession.conn;
257257
const { from, options } = pendingSession;
258-
const carriedState: IdentifiedSessionProps = oldSession
258+
const carriedState: Omit<IdentifiedSessionProps, 'listeners'> = oldSession
259259
? // old session exists, inherit state
260260
inheritSharedSession(oldSession)
261261
: // old session does not exist, create new state
@@ -279,7 +279,7 @@ export const SessionStateGraph = {
279279
log: pendingSession.log,
280280
protocolVersion,
281281
codec: new CodecMessageAdapter(options.codec),
282-
} satisfies IdentifiedSessionProps);
282+
} satisfies Omit<IdentifiedSessionProps, 'listeners'>);
283283

284284
pendingSession._handleStateExit();
285285
oldSession?._handleStateExit();

transport/transport.ts

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,13 @@ import {
1010
LoggingLevel,
1111
createLogProxy,
1212
} from '../logging/log';
13-
import { EventDispatcher, EventHandler, EventMap, EventTypes } from './events';
13+
import {
14+
EventDispatcher,
15+
EventHandler,
16+
EventMap,
17+
EventTypes,
18+
ProtocolError,
19+
} from './events';
1420
import {
1521
ProvidedTransportOptions,
1622
TransportOptions,
@@ -21,6 +27,7 @@ import {
2127
SessionConnecting,
2228
SessionHandshaking,
2329
SessionNoConnection,
30+
SessionNoConnectionListeners,
2431
SessionState,
2532
} from './sessionStateMachine';
2633
import { Connection } from './connection';
@@ -277,6 +284,18 @@ export abstract class Transport<ConnType extends Connection> {
277284
onSessionGracePeriodElapsed: () => {
278285
this.onSessionGracePeriodElapsed(noConnectionSession);
279286
},
287+
onMessageSendFailure: (msg, reason) => {
288+
this.log?.error(`failed to send message: ${reason}`, {
289+
...noConnectionSession.loggingMetadata,
290+
transportMessage: msg,
291+
});
292+
293+
this.protocolError({
294+
type: ProtocolError.MessageSendFailure,
295+
message: reason,
296+
});
297+
this.deleteSession(noConnectionSession, { unhealthy: true });
298+
},
280299
});
281300

282301
this.updateSession(noConnectionSession);
@@ -289,20 +308,36 @@ export abstract class Transport<ConnType extends Connection> {
289308
): SessionNoConnection {
290309
// transition to no connection
291310
let noConnectionSession: SessionNoConnection;
311+
const listeners: SessionNoConnectionListeners = {
312+
onSessionGracePeriodElapsed: () => {
313+
this.onSessionGracePeriodElapsed(noConnectionSession);
314+
},
315+
onMessageSendFailure: (msg, reason) => {
316+
this.log?.error(`failed to send message: ${reason}`, {
317+
...noConnectionSession.loggingMetadata,
318+
transportMessage: msg,
319+
});
320+
321+
this.protocolError({
322+
type: ProtocolError.MessageSendFailure,
323+
message: reason,
324+
});
325+
this.deleteSession(noConnectionSession, { unhealthy: true });
326+
},
327+
};
328+
292329
if (session.state === SessionState.Handshaking) {
293330
noConnectionSession =
294-
SessionStateGraph.transition.HandshakingToNoConnection(session, {
295-
onSessionGracePeriodElapsed: () => {
296-
this.onSessionGracePeriodElapsed(noConnectionSession);
297-
},
298-
});
331+
SessionStateGraph.transition.HandshakingToNoConnection(
332+
session,
333+
listeners,
334+
);
299335
} else {
300336
noConnectionSession =
301-
SessionStateGraph.transition.ConnectedToNoConnection(session, {
302-
onSessionGracePeriodElapsed: () => {
303-
this.onSessionGracePeriodElapsed(noConnectionSession);
304-
},
305-
});
337+
SessionStateGraph.transition.ConnectedToNoConnection(
338+
session,
339+
listeners,
340+
);
306341
}
307342

308343
this.updateSession(noConnectionSession);

0 commit comments

Comments
 (0)