Skip to content

Commit 91b2d9a

Browse files
committed
fix: ReadBuffer skips malformed lines instead of stalling on them
Previously, readMessage() returned null after consuming an unparseable line even when more complete lines existed in the buffer. This caused valid messages following a bad line in the same chunk to be silently dropped until the next chunk arrived (or forever, in tests). Fix the loop so null means only "no complete line available", not "encountered a parse failure". Blank/whitespace-only lines are now silently skipped via isBlank() rather than forwarded to the deserializer and logged as errors. Refactor the method into three focused helpers: - readMessage() — outer loop over lines - readLine() — consume the next newline-delimited line from the buffer - tryRecover() — attempt deserialization from the first '{' onward
1 parent d4bd6b9 commit 91b2d9a

3 files changed

Lines changed: 60 additions & 32 deletions

File tree

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ReadBuffer.kt

Lines changed: 43 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,45 +20,56 @@ public class ReadBuffer {
2020
buffer.write(chunk)
2121
}
2222

23+
@Suppress("TooGenericExceptionCaught")
2324
public fun readMessage(): JSONRPCMessage? {
24-
if (buffer.exhausted()) return null
25-
var lfIndex = buffer.indexOf('\n'.code.toByte())
26-
val line = when (lfIndex) {
27-
-1L -> return null
25+
var message: JSONRPCMessage? = null
26+
while (message == null) {
27+
val line = readNextLine() ?: return null
28+
if (line.isBlank()) continue
2829

29-
0L -> {
30-
buffer.skip(1)
31-
return null
32-
}
33-
34-
else -> {
35-
var skipBytes = 1
36-
if (buffer[lfIndex - 1] == '\r'.code.toByte()) {
37-
lfIndex -= 1
38-
skipBytes += 1
39-
}
40-
val string = buffer.readString(lfIndex)
41-
buffer.skip(skipBytes.toLong())
42-
string
30+
try {
31+
message = deserializeMessage(line)
32+
} catch (e: Exception) {
33+
logger.error(e) { "Failed to deserialize message from line: $line\nAttempting to recover..." }
34+
message = tryRecover(line)
4335
}
4436
}
45-
try {
46-
return deserializeMessage(line)
47-
} catch (e: Exception) {
48-
logger.error(e) { "Failed to deserialize message from line: $line\nAttempting to recover..." }
49-
// if there is a non-JSON object prefix, try to parse from the first '{' onward.
50-
val braceIndex = line.indexOf('{')
51-
if (braceIndex != -1) {
52-
val trimmed = line.substring(braceIndex)
53-
try {
54-
return deserializeMessage(trimmed)
55-
} catch (ignored: Exception) {
56-
logger.error(ignored) { "Deserialization failed for line: $line\nSkipping..." }
57-
}
37+
return message
38+
}
39+
40+
private fun readNextLine(): String? {
41+
val lfIndex = if (buffer.exhausted()) -1L else buffer.indexOf('\n'.code.toByte())
42+
if (lfIndex == -1L) return null
43+
44+
return if (lfIndex == 0L) {
45+
buffer.skip(1)
46+
""
47+
} else {
48+
var skipBytes = 1
49+
var messageLength = lfIndex
50+
if (buffer[lfIndex - 1] == '\r'.code.toByte()) {
51+
messageLength -= 1
52+
skipBytes += 1
5853
}
54+
val string = buffer.readString(messageLength)
55+
buffer.skip(skipBytes.toLong())
56+
string
5957
}
58+
}
6059

61-
return null
60+
@Suppress("TooGenericExceptionCaught")
61+
private fun tryRecover(line: String): JSONRPCMessage? {
62+
// if there is a non-JSON object prefix, try to parse from the first '{' onward.
63+
val braceIndex = line.indexOf('{')
64+
if (braceIndex == -1) return null
65+
66+
val trimmed = line.substring(braceIndex)
67+
return try {
68+
deserializeMessage(trimmed)
69+
} catch (ignored: Exception) {
70+
logger.error(ignored) { "Deserialization failed for line: $line\nSkipping..." }
71+
null
72+
}
6273
}
6374

6475
public fun clear() {

kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ReadBufferTest.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ class ReadBufferTest {
3131
val messageBytes = json.encodeToString(testMessage).encodeToByteArray()
3232
readBuffer.append(messageBytes)
3333
assertNull(readBuffer.readMessage())
34+
readBuffer.append("\r".encodeToByteArray())
35+
assertNull(readBuffer.readMessage())
3436

3537
// Append a newline and verify message is now available
3638
readBuffer.append("\n".encodeToByteArray())
@@ -45,6 +47,20 @@ class ReadBufferTest {
4547
assertNull(readBuffer.readMessage())
4648
}
4749

50+
@Test
51+
fun `skip blank line`() {
52+
val readBuffer = ReadBuffer()
53+
readBuffer.append(" \n".toByteArray())
54+
assertNull(readBuffer.readMessage())
55+
}
56+
57+
@Test
58+
fun `skip invalid json line`() {
59+
val readBuffer = ReadBuffer()
60+
readBuffer.append(" {ah=oh\n".toByteArray())
61+
assertNull(readBuffer.readMessage())
62+
}
63+
4864
@Test
4965
fun `should be reusable after clearing`() {
5066
val readBuffer = ReadBuffer()

kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ class StdioServerTransportTest {
260260

261261
val validMessage = PingRequest().toJSON()
262262
inputWriter.write("not-valid-json\n".toByteArray())
263+
inputWriter.write(" \t \r \n".toByteArray()) // blank line
263264
inputWriter.write(serializeMessage(validMessage).toByteArray())
264265
inputWriter.flush()
265266

0 commit comments

Comments
 (0)