Skip to content

Commit ad4e94e

Browse files
committed
Serialize mixer output frame ordering via AsyncStream.
Replace unstructured Task {} in MediaMixerOutput and StreamOutput conformances with AsyncStream channels that preserve FIFO ordering. The previous pattern created a new Task for each audio/video callback, which provides no ordering guarantee when entering an actor's serial executor. This caused adjacent frames to arrive out of order, resulting in RTMPTimestamp.invalidSequence errors (silent frame drops) and AVAssetWriter failures in StreamRecorder.
1 parent fe04d3e commit ad4e94e

3 files changed

Lines changed: 89 additions & 12 deletions

File tree

HaishinKit/Sources/Stream/StreamRecorder.swift

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ public actor StreamRecorder {
126126
private var audioPresentationTime: CMTime = .zero
127127
private var videoPresentationTime: CMTime = .zero
128128
private var dimensions: CMVideoDimensions = .init(width: 0, height: 0)
129+
nonisolated(unsafe) private var inputContinuation: AsyncStream<CMSampleBuffer>.Continuation?
130+
private var inputConsumerTask: Task<Void, Never>?
129131

130132
/// Creates a new recorder.
131133
public init() {
@@ -191,6 +193,7 @@ public actor StreamRecorder {
191193
videoPresentationTime = .zero
192194
audioPresentationTime = .zero
193195
self.settings = settings
196+
startInputConsumer()
194197

195198
isRecording = true
196199
}
@@ -215,6 +218,7 @@ public actor StreamRecorder {
215218
throw Error.invalidState
216219
}
217220
defer {
221+
stopInputConsumer()
218222
isRecording = false
219223
continuation = nil
220224
self.writer = nil
@@ -254,6 +258,23 @@ public actor StreamRecorder {
254258
return url.pathExtension.isEmpty ? url.appendingPathComponent(UUID().uuidString).appendingPathExtension(Self.defaultPathExtension) : url
255259
}
256260

261+
private func startInputConsumer() {
262+
let (stream, continuation) = AsyncStream.makeStream(of: CMSampleBuffer.self)
263+
inputContinuation = continuation
264+
inputConsumerTask = Task {
265+
for await sampleBuffer in stream {
266+
append(sampleBuffer)
267+
}
268+
}
269+
}
270+
271+
private func stopInputConsumer() {
272+
inputContinuation?.finish()
273+
inputContinuation = nil
274+
inputConsumerTask?.cancel()
275+
inputConsumerTask = nil
276+
}
277+
257278
private func append(_ sampleBuffer: CMSampleBuffer) {
258279
guard isRecording else {
259280
return
@@ -352,31 +373,27 @@ public actor StreamRecorder {
352373
extension StreamRecorder: StreamOutput {
353374
// MARK: HKStreamOutput
354375
nonisolated public func stream(_ stream: some StreamConvertible, didOutput video: CMSampleBuffer) {
355-
Task { await append(video) }
376+
inputContinuation?.yield(video)
356377
}
357378

358379
nonisolated public func stream(_ stream: some StreamConvertible, didOutput audio: AVAudioBuffer, when: AVAudioTime) {
359380
guard let sampleBuffer = (audio as? AVAudioPCMBuffer)?.makeSampleBuffer(when) else {
360381
return
361382
}
362-
Task { await append(sampleBuffer) }
383+
inputContinuation?.yield(sampleBuffer)
363384
}
364385
}
365386

366387
extension StreamRecorder: MediaMixerOutput {
367388
// MARK: MediaMixerOutput
368389
nonisolated public func mixer(_ mixer: MediaMixer, didOutput sampleBuffer: CMSampleBuffer) {
369-
Task {
370-
await append(sampleBuffer)
371-
}
390+
inputContinuation?.yield(sampleBuffer)
372391
}
373392

374393
nonisolated public func mixer(_ mixer: MediaMixer, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime) {
375394
guard let sampleBuffer = buffer.makeSampleBuffer(when) else {
376395
return
377396
}
378-
Task {
379-
await append(sampleBuffer)
380-
}
397+
inputContinuation?.yield(sampleBuffer)
381398
}
382399
}

RTMPHaishinKit/Sources/RTMP/RTMPStream.swift

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ public actor RTMPStream {
224224
private var expectedResponse: Code?
225225
package var bitRateStrategy: (any StreamBitRateStrategy)?
226226
private var statusContinuation: AsyncStream<RTMPStatus>.Continuation?
227+
nonisolated(unsafe) private var mixerAudioContinuation: AsyncStream<(AVAudioPCMBuffer, AVAudioTime)>.Continuation?
228+
nonisolated(unsafe) private var mixerVideoContinuation: AsyncStream<CMSampleBuffer>.Continuation?
227229
private(set) var id: UInt32 = RTMPStream.defaultID
228230
package lazy var incoming = IncomingStream(self)
229231
package lazy var outgoing = OutgoingStream()
@@ -283,6 +285,8 @@ public actor RTMPStream {
283285
}
284286

285287
deinit {
288+
mixerAudioContinuation?.finish()
289+
mixerVideoContinuation?.finish()
286290
outputs.removeAll()
287291
}
288292

@@ -388,6 +392,7 @@ public actor RTMPStream {
388392
readyState = .publishing
389393
try? send("@setDataFrame", arguments: "onMetaData", metadata)
390394
outgoing.startRunning()
395+
startMixerInputConsumers()
391396
Task {
392397
for await audio in outgoing.audioOutputStream {
393398
append(audio.0, when: audio.1)
@@ -415,6 +420,7 @@ public actor RTMPStream {
415420
guard readyState == .playing || readyState == .publishing else {
416421
throw Error.invalidState
417422
}
423+
stopMixerInputConsumers()
418424
outgoing.stopRunning()
419425
return try await withCheckedThrowingContinuation { continutation in
420426
self.continuation = continutation
@@ -683,6 +689,30 @@ public actor RTMPStream {
683689
}
684690
}
685691

692+
private func startMixerInputConsumers() {
693+
let (audioStream, audioContinuation) = AsyncStream.makeStream(of: (AVAudioPCMBuffer, AVAudioTime).self)
694+
let (videoStream, videoContinuation) = AsyncStream.makeStream(of: CMSampleBuffer.self)
695+
mixerAudioContinuation = audioContinuation
696+
mixerVideoContinuation = videoContinuation
697+
Task {
698+
for await (buffer, when) in audioStream {
699+
append(buffer, when: when)
700+
}
701+
}
702+
Task {
703+
for await sampleBuffer in videoStream {
704+
append(sampleBuffer)
705+
}
706+
}
707+
}
708+
709+
private func stopMixerInputConsumers() {
710+
mixerAudioContinuation?.finish()
711+
mixerAudioContinuation = nil
712+
mixerVideoContinuation?.finish()
713+
mixerVideoContinuation = nil
714+
}
715+
686716
/// Creates flv metadata for a stream.
687717
private func makeMetadata() -> AMFArray {
688718
// https://github.com/shogo4405/HaishinKit.swift/issues/1410
@@ -806,10 +836,10 @@ extension RTMPStream: MediaMixerOutput {
806836
}
807837

808838
nonisolated public func mixer(_ mixer: MediaMixer, didOutput sampleBuffer: CMSampleBuffer) {
809-
Task { await append(sampleBuffer) }
839+
mixerVideoContinuation?.yield(sampleBuffer)
810840
}
811841

812842
nonisolated public func mixer(_ mixer: MediaMixer, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime) {
813-
Task { await append(buffer, when: when) }
843+
mixerAudioContinuation?.yield((buffer, when))
814844
}
815845
}

SRTHaishinKit/Sources/SRT/SRTStream.swift

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public actor SRTStream {
2424
package lazy var incoming = IncomingStream(self)
2525
package lazy var outgoing = OutgoingStream()
2626
private weak var connection: SRTConnection?
27+
nonisolated(unsafe) private var mixerAudioContinuation: AsyncStream<(AVAudioPCMBuffer, AVAudioTime)>.Continuation?
28+
nonisolated(unsafe) private var mixerVideoContinuation: AsyncStream<CMSampleBuffer>.Continuation?
2729

2830
/// The error domain codes.
2931
public enum Error: Swift.Error {
@@ -38,6 +40,8 @@ public actor SRTStream {
3840
}
3941

4042
deinit {
43+
mixerAudioContinuation?.finish()
44+
mixerVideoContinuation?.finish()
4145
outputs.removeAll()
4246
}
4347

@@ -58,6 +62,7 @@ public actor SRTStream {
5862
return
5963
}
6064
readyState = .publishing
65+
startMixerInputConsumers()
6166
outgoing.startRunning()
6267
if outgoing.videoInputFormat != nil {
6368
writer.expectedMedias.insert(.video)
@@ -121,6 +126,7 @@ public actor SRTStream {
121126
guard readyState != .idle else {
122127
return
123128
}
129+
stopMixerInputConsumers()
124130
writer.clear()
125131
reader.clear()
126132
outgoing.stopRunning()
@@ -139,6 +145,30 @@ public actor SRTStream {
139145
func doInput(_ data: Data) {
140146
_ = reader.read(data)
141147
}
148+
149+
private func startMixerInputConsumers() {
150+
let (audioStream, audioContinuation) = AsyncStream.makeStream(of: (AVAudioPCMBuffer, AVAudioTime).self)
151+
let (videoStream, videoContinuation) = AsyncStream.makeStream(of: CMSampleBuffer.self)
152+
mixerAudioContinuation = audioContinuation
153+
mixerVideoContinuation = videoContinuation
154+
Task {
155+
for await (buffer, when) in audioStream {
156+
append(buffer, when: when)
157+
}
158+
}
159+
Task {
160+
for await sampleBuffer in videoStream {
161+
append(sampleBuffer)
162+
}
163+
}
164+
}
165+
166+
private func stopMixerInputConsumers() {
167+
mixerAudioContinuation?.finish()
168+
mixerAudioContinuation = nil
169+
mixerVideoContinuation?.finish()
170+
mixerVideoContinuation = nil
171+
}
142172
}
143173

144174
extension SRTStream: _Stream {
@@ -203,10 +233,10 @@ extension SRTStream: MediaMixerOutput {
203233
}
204234

205235
nonisolated public func mixer(_ mixer: MediaMixer, didOutput sampleBuffer: CMSampleBuffer) {
206-
Task { await append(sampleBuffer) }
236+
mixerVideoContinuation?.yield(sampleBuffer)
207237
}
208238

209239
nonisolated public func mixer(_ mixer: MediaMixer, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime) {
210-
Task { await append(buffer, when: when) }
240+
mixerAudioContinuation?.yield((buffer, when))
211241
}
212242
}

0 commit comments

Comments
 (0)