Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions internal/recorder/format_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"fmt"
"io"
"sync"
"time"

rtspformat "github.com/bluenviron/gortsplib/v4/pkg/format"
Expand Down Expand Up @@ -60,9 +61,21 @@ type formatMPEGTS struct {
mw *mpegts.Writer
hasVideo bool
currentSegment *formatMPEGTSSegment
mu sync.Mutex // protects all fields below
}

func (f *formatMPEGTS) initialize() bool {
// Check if this is an MPEG-TS passthrough stream
for _, media := range f.ri.stream.Desc.Medias {
for _, forma := range media.Formats {
if _, ok := forma.(*rtspformat.MPEGTS); ok {
f.ri.stream.AddReader(f.ri, media, forma, f.writePassthrough)
return true
}
}
}

// Normal MPEG-TS muxing for non-passthrough streams
var tracks []*mpegts.Track
var setuppedFormats []rtspformat.Format
setuppedFormatsMap := make(map[rtspformat.Format]struct{})
Expand Down Expand Up @@ -480,11 +493,83 @@ func (f *formatMPEGTS) initialize() bool {
}

func (f *formatMPEGTS) close() {
f.mu.Lock()
defer f.mu.Unlock()

// Close the current segment if it exists
if f.currentSegment != nil {
f.currentSegment.close() //nolint:errcheck
f.currentSegment = nil
}

// Flush any remaining data in the buffer
if f.bw != nil {
_ = f.bw.Flush()
}
}

func (f *formatMPEGTS) writePassthrough(u unit.Unit) error {
tunit := u.(*unit.Generic)
if len(tunit.RTPPackets) == 0 {
return nil
}

f.mu.Lock()
defer f.mu.Unlock()

// Initialize writers if needed
if f.dw == nil {
f.dw = &dynamicWriter{}
}
if f.bw == nil {
f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize)
}

for _, pkt := range tunit.RTPPackets {
if pkt.Payload == nil {
continue
}

// Create new segment if needed
if f.currentSegment == nil {
f.currentSegment = &formatMPEGTSSegment{
f: f,
startDTS: timestampToDuration(int64(pkt.Timestamp), 90000),
startNTP: tunit.NTP,
}
f.currentSegment.initialize()
}

// Write to current segment
_, err := f.currentSegment.Write(pkt.Payload)
if err != nil {
f.ri.Log(logger.Warn, "error writing to segment: %v", err)
continue
}

// Update segment timing
f.currentSegment.lastDTS = timestampToDuration(int64(pkt.Timestamp), 90000)

// Check if we need to rotate the segment
if (f.currentSegment.lastDTS - f.currentSegment.startDTS) >= f.ri.segmentDuration {
// Close the current segment
if err := f.currentSegment.close(); err != nil {
f.ri.Log(logger.Error, "error closing segment: %v", err)
}
f.currentSegment = nil

// Flush the buffer writer
if f.bw != nil {
if err := f.bw.Flush(); err != nil {
f.ri.Log(logger.Error, "error flushing buffer: %v", err)
}
}
}
}

return nil
}

func (f *formatMPEGTS) write(
dts time.Duration,
ntp time.Time,
Expand Down
161 changes: 161 additions & 0 deletions internal/recorder/recorder_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package recorder

import (
"bytes"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/pion/rtp"

"github.com/bluenviron/gortsplib/v4/pkg/description"
rtspformat "github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/codecs/h265"
Expand Down Expand Up @@ -543,6 +546,164 @@ func TestRecorderSkipTracksFull(t *testing.T) {
}
}

func TestRecorderMPEGTSPassthrough(t *testing.T) {
// Create a test MPEG-TS packet
testData := []byte{
0x47, 0x40, 0x00, 0x10, 0x00, // TS header with PID 0x1000 (video)
// Add some dummy payload - this is a PAT (Program Association Table)
0x00, 0x00, 0xB0, 0x0D, 0x00, 0x00, 0xC1, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}

t.Run("basic", func(t *testing.T) {
// Create a temporary directory for the test
dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err)
defer os.RemoveAll(dir)

// Create the output directory
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")

// Create a stream with MPEG-TS format
mpegtsFormat := &rtspformat.MPEGTS{}

desc := &description.Session{Medias: []*description.Media{
{
Type: description.MediaTypeVideo,
Formats: []rtspformat.Format{mpegtsFormat},
},
}}

strm := &stream.Stream{
WriteQueueSize: 512,
UDPMaxPayloadSize: 1472,
Desc: desc,
GenerateRTPPackets: false, // Disable RTP packet generation
Parent: test.NilLogger,
}
err = strm.Initialize()
require.NoError(t, err)

// Create a recorder with MPEG-TS format
r := &Recorder{
PathFormat: recordPath,
Format: conf.RecordFormatMPEGTS,
PartDuration: 1 * time.Second,
SegmentDuration: 2 * time.Second,
PathName: "mypath",
Stream: strm,
Parent: test.NilLogger,
}

// Create a buffered channel to receive data from the stream

// Buffer channel to prevent blocking on data send
dataChan := make(chan []byte, 100)

// Add reader before initializing to avoid race with recorder's own reader
rtpReader := func(u unit.Unit) error {
rtpPackets := u.GetRTPPackets()

for _, pkt := range rtpPackets {
if pkt == nil {
continue
}

// Send the payload to the channel
if len(pkt.Payload) > 0 {
select {
case dataChan <- pkt.Payload:
default:
// Drop if channel is full
}
}
}

return nil
}

strm.AddReader(r, desc.Medias[0], mpegtsFormat, rtpReader)

// Initialize the recorder
r.Initialize()

// Verify the recorder instance was created
require.NotNil(t, r.currentInstance, "Recorder instance is nil")

// Start the reader after the recorder is fully initialized
strm.StartReader(r)
strm.WaitRunningReader()

// Ensure cleanup happens in the correct order
t.Cleanup(func() {
time.Sleep(100 * time.Millisecond) // Allow time for writes to complete
r.Close()
strm.Close()
})

// Write multiple RTP packets to ensure we have enough data
for i := 0; i < 10; i++ {
// Create a new RTP packet for each iteration with updated timestamp and sequence number
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 33, // Standard MPEG-TS payload type (RFC 2250)
SequenceNumber: 100 + uint16(i),
Timestamp: 123456 + uint32(i*90000), // 1 second apart
SSRC: 0x9D8F,
},
Payload: testData,
}

// Use the MPEG-TS format directly
internalFormat := mpegtsFormat

// Write the RTP packet to the stream
strm.WriteRTPPacket(desc.Medias[0], internalFormat, pkt, time.Now(), 0)
}

// Check if the recorder instance was properly initialized
if r.currentInstance == nil {
t.Fatal("Recorder currentInstance is nil")
}

// Give some time for all writes to complete
time.Sleep(200 * time.Millisecond)

// Verify the recording was created
entries, err := os.ReadDir(filepath.Join(dir, "mypath"))
require.NoError(t, err)

// Find the .ts file
var tsFile string
for _, entry := range entries {
if filepath.Ext(entry.Name()) == ".ts" {
tsFile = filepath.Join(dir, "mypath", entry.Name())
break
}
}

if tsFile == "" {
t.Fatalf("No .ts file found in %s. Directory contents: %v", filepath.Join(dir, "mypath"), entries)
}

// Verify the file contains our test data
data, err := os.ReadFile(tsFile)
require.NoError(t, err)
require.Greater(t, len(data), 0, "Recorded file is empty")
require.True(t, bytes.Contains(data, testData), "Test data not found in output")

// Give some time for all writes to complete before test ends
time.Sleep(200 * time.Millisecond)
})
}

func TestRecorderFMP4SegmentSwitch(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{
{
Expand Down