diff --git a/pkg/sip/media.go b/pkg/sip/media.go index 5ccb04d0..80a6c45a 100644 --- a/pkg/sip/media.go +++ b/pkg/sip/media.go @@ -346,6 +346,7 @@ func newRTPStreamStats(h rtp.Handler, stats *rtpCountingStats) *rtpStreamStats { } type rtpStreamStats struct { + packets atomic.Uint64 // per rtpStreamStats, where rtpStreamStats.stats.packets may not be exclusive to this stream lastSeq atomic.Uint64 lastPacket atomic.Int64 h rtp.Handler @@ -363,7 +364,8 @@ func (h *rtpStreamStats) Close() { } func (h *rtpStreamStats) HandleRTP(hdr *prtp.Header, payload []byte) error { - count := h.stats.packets.Add(1) + count := h.packets.Add(1) + h.stats.packets.Add(1) h.stats.bytes.Add(uint64(len(payload))) now := time.Now().UnixMilli() diff --git a/pkg/sip/silence_filler.go b/pkg/sip/silence_filler.go index 7998042b..35ed5984 100644 --- a/pkg/sip/silence_filler.go +++ b/pkg/sip/silence_filler.go @@ -16,6 +16,7 @@ package sip import ( "fmt" + "sync/atomic" "time" msdk "github.com/livekit/media-sdk" @@ -31,11 +32,11 @@ type silenceFiller struct { pcmSink msdk.PCM16Writer samplesPerFrame int log logger.Logger - lastTS uint32 - lastSeq uint16 - initialized bool - gapCount uint64 - gapSizeSum uint64 + lastTS atomic.Uint64 + lastSeq atomic.Uint64 + packets atomic.Uint64 + gapCount atomic.Uint64 + gapSizeSum atomic.Uint64 lastPrintTime time.Time } @@ -59,6 +60,12 @@ func newSilenceFiller(encodedSink rtp.Handler, pcmSink msdk.PCM16Writer, clockRa pcmSink: pcmSink, samplesPerFrame: clockRate / rtp.DefFramesPerSec, log: log, + packets: atomic.Uint64{}, + lastSeq: atomic.Uint64{}, + lastTS: atomic.Uint64{}, + gapCount: atomic.Uint64{}, + gapSizeSum: atomic.Uint64{}, + lastPrintTime: time.Time{}, } for _, option := range options { option(h) @@ -71,34 +78,36 @@ func (h *silenceFiller) String() string { } func (h *silenceFiller) isSilenceSuppression(header *rtp.Header) (bool, int) { - if !h.initialized { - h.initialized = true - h.lastSeq = header.SequenceNumber - h.lastTS = header.Timestamp + packets := h.packets.Add(1) + lastSeq := uint16(h.lastSeq.Swap(uint64(header.SequenceNumber))) + lastTS := uint32(h.lastTS.Swap(uint64(header.Timestamp))) + if packets == 1 { return false, 0 } currentTS := header.Timestamp currentSeq := header.SequenceNumber - expectedSeq := h.lastSeq + 1 - expectedTS := h.lastTS + uint32(h.samplesPerFrame) + expectedSeq := lastSeq + 1 + expectedTS := lastTS + uint32(h.samplesPerFrame) seqDiff := currentSeq - expectedSeq tsDiff := currentTS - expectedTS - h.lastTS = currentTS - h.lastSeq = currentSeq - // A key characteristic of DTX or silence supression is no sequence gaps, but >1 frame TS gaps + if seqDiff != 0 { + // Also filters out out-of-order packets + return false, 0 + } - if seqDiff != 0 || tsDiff == 0 { - // This also filters out out-of-order packets, due to seqDiff != 0 + missedFrames := int(tsDiff) / int(h.samplesPerFrame) + if missedFrames == 0 { + // Needs to be after / int(h.samplesPerFrame), + // since some RTP resets may result in a 0 < TS diff < h.samplesPerFrame return false, 0 } // Silence supression happened - sequential packets (no loss), but with a gap in timestamp - missedFrames := int(tsDiff) / int(h.samplesPerFrame) return true, missedFrames } @@ -115,18 +124,19 @@ func (h *silenceFiller) fillWithSilence(framesToFill int) error { func (h *silenceFiller) HandleRTP(header *rtp.Header, payload []byte) error { isSilenceSupression, missingFrameCount := h.isSilenceSuppression(header) - if isSilenceSupression { - h.gapCount++ - h.gapSizeSum += uint64(missingFrameCount) + if isSilenceSupression && missingFrameCount <= h.maxGapSize*100 { + // <= h.maxGapSize * 100 done to filter out what is most likely an RTP reset + count := h.gapCount.Add(1) + sum := h.gapSizeSum.Add(uint64(missingFrameCount)) if time.Since(h.lastPrintTime) > 15*time.Second { h.lastPrintTime = time.Now() h.log.Infow("timestamp gap", "rtpSeq", header.SequenceNumber, "rtpTimestamp", header.Timestamp, "rtpMarker", header.Marker, - "gapCount", h.gapCount, + "gapCount", count, "gapSize", missingFrameCount, - "gapSizeSum", h.gapSizeSum, // Used to get averages and figure out outliers between prints + "gapSizeSum", sum, // Used to get averages and figure out outliers between prints ) }