Skip to content

Commit 072d5ec

Browse files
committed
Use EventSource exclusively for SSE handling
1 parent e98cd17 commit 072d5ec

File tree

1 file changed

+29
-20
lines changed

1 file changed

+29
-20
lines changed

Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -83,37 +83,28 @@ extension URLSession {
8383
request.addValue("application/json", forHTTPHeaderField: "Content-Type")
8484
}
8585

86-
let (bytes, response) = try await self.bytes(for: request)
86+
let (data, response) = try await self.data(for: request)
8787

8888
guard let httpResponse = response as? HTTPURLResponse else {
8989
throw URLSessionError.invalidResponse
9090
}
9191

9292
guard (200 ..< 300).contains(httpResponse.statusCode) else {
93-
var errorData = Data()
94-
for try await byte in bytes {
95-
errorData.append(byte)
96-
}
97-
98-
if let errorString = String(data: errorData, encoding: .utf8) {
93+
if let errorString = String(data: data, encoding: .utf8) {
9994
throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: errorString)
10095
}
10196
throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: "Invalid response")
10297
}
10398

104-
var buffer = Data()
105-
106-
for try await byte in bytes {
107-
buffer.append(byte)
99+
var buffer = data
108100

109-
while let newlineIndex = buffer.firstIndex(of: UInt8(ascii: "\n")) {
110-
let chunk = buffer[..<newlineIndex]
111-
buffer = buffer[buffer.index(after: newlineIndex)...]
101+
while let newlineIndex = buffer.firstIndex(of: UInt8(ascii: "\n")) {
102+
let chunk = buffer[..<newlineIndex]
103+
buffer = buffer[buffer.index(after: newlineIndex)...]
112104

113-
if !chunk.isEmpty {
114-
let decoded = try decoder.decode(T.self, from: chunk)
115-
continuation.yield(decoded)
116-
}
105+
if !chunk.isEmpty {
106+
let decoded = try decoder.decode(T.self, from: chunk)
107+
continuation.yield(decoded)
117108
}
118109
}
119110

@@ -151,10 +142,28 @@ extension URLSession {
151142
request.addValue("application/json", forHTTPHeaderField: "Content-Type")
152143
}
153144

154-
let (bytes, _) = try await self.bytes(for: request)
145+
let (asyncBytes, response) = try await self.data(for: request)
146+
147+
guard let httpResponse = response as? HTTPURLResponse else {
148+
throw URLSessionError.invalidResponse
149+
}
150+
151+
guard (200 ..< 300).contains(httpResponse.statusCode) else {
152+
if let errorString = String(data: asyncBytes, encoding: .utf8) {
153+
throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: errorString)
154+
}
155+
throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: "Invalid response")
156+
}
157+
155158
let decoder = JSONDecoder()
159+
let parser = EventSource.Parser()
160+
161+
for byte in asyncBytes {
162+
await parser.consume(byte)
163+
}
164+
await parser.finish()
156165

157-
for try await event in bytes.events {
166+
while let event = await parser.getNextEvent() {
158167
guard let data = event.data.data(using: .utf8) else { continue }
159168

160169
if let decoded = try? decoder.decode(T.self, from: data) {

0 commit comments

Comments
 (0)