99import BaseIssueDetector from './BaseIssueDetector' ;
1010
1111interface MissingStreamDetectorParams {
12+ timeoutMs ?: number ; // delay to report the issue no more often then once specified value
1213 steps ?: number ; // number of last stats to check
1314}
1415
@@ -21,7 +22,7 @@ export default class MissingStreamDataDetector extends BaseIssueDetector {
2122
2223 constructor ( params : MissingStreamDetectorParams = { } ) {
2324 super ( ) ;
24- this . #timeoutMs = 5_000 ;
25+ this . #timeoutMs = params . timeoutMs ?? 15_000 ;
2526 this . #steps = params . steps ?? 3 ;
2627 }
2728
@@ -40,19 +41,19 @@ export default class MissingStreamDataDetector extends BaseIssueDetector {
4041 return issues ;
4142 }
4243
43- const lastThreeProcessedStats = allLastProcessedStats . slice ( - this . #steps) ;
44+ const lastNProcessedStats = allLastProcessedStats . slice ( - this . #steps) ;
4445
45- const lastThreeVideoInbound = lastThreeProcessedStats . map ( ( stats ) => stats . video . inbound ) ;
46- const lastThreeAudioInbound = lastThreeProcessedStats . map ( ( stats ) => stats . audio . inbound ) ;
46+ const lastNVideoInbound = lastNProcessedStats . map ( ( stats ) => stats . video . inbound ) ;
47+ const lastNAudioInbound = lastNProcessedStats . map ( ( stats ) => stats . audio . inbound ) ;
4748
4849 issues . push ( ...this . detectMissingData (
49- lastThreeAudioInbound as unknown as CommonParsedInboundStreamStats [ ] [ ] ,
50+ lastNAudioInbound as unknown as CommonParsedInboundStreamStats [ ] [ ] ,
5051 IssueType . Stream ,
5152 IssueReason . MissingAudioStreamData ,
5253 ) ) ;
5354
5455 issues . push ( ...this . detectMissingData (
55- lastThreeVideoInbound ,
56+ lastNVideoInbound ,
5657 IssueType . Stream ,
5758 IssueReason . MissingVideoStreamData ,
5859 ) ) ;
@@ -70,36 +71,29 @@ export default class MissingStreamDataDetector extends BaseIssueDetector {
7071 }
7172
7273 private detectMissingData (
73- lastThreeInboundStats : CommonParsedInboundStreamStats [ ] [ ] ,
74+ lastNInboundStats : CommonParsedInboundStreamStats [ ] [ ] ,
7475 type : IssueType ,
7576 reason : IssueReason ,
7677 ) : IssueDetectorResult {
7778 const issues : IssuePayload [ ] = [ ] ;
7879
79- const firstInboundStats = lastThreeInboundStats [ 0 ] ;
80- const secondInboundStats = lastThreeInboundStats [ 1 ] ;
81- const currentInboundStats = lastThreeInboundStats [ 2 ] ;
82-
83- const firstInboundItemsByTrackId = MissingStreamDataDetector . mapStatsByTrackId ( firstInboundStats ) ;
84- const secondInboundItemsByTrackId = MissingStreamDataDetector . mapStatsByTrackId ( secondInboundStats ) ;
80+ const currentInboundStats = lastNInboundStats . pop ( ) ! ;
81+ const prevInboundItemsByTrackId = MissingStreamDataDetector . mapStatsByTrackId ( lastNInboundStats ) ;
8582
8683 currentInboundStats . forEach ( ( inboundItem ) => {
8784 const trackId = inboundItem . track . trackIdentifier ;
8885
89- const firstInboundItem = firstInboundItemsByTrackId . get ( trackId ) ;
90- const secondInboundItem = secondInboundItemsByTrackId . get ( trackId ) ;
91- if ( ! firstInboundItem || ! secondInboundItem ) {
86+ const prevInboundItems = prevInboundItemsByTrackId . get ( trackId ) ;
87+
88+ if ( ! Array . isArray ( prevInboundItems ) || prevInboundItems . length === 0 ) {
9289 return ;
9390 }
9491
9592 if ( inboundItem . track . detached || inboundItem . track . ended ) {
9693 return ;
9794 }
9895
99- if (
100- firstInboundItem . bytesReceived === secondInboundItem . bytesReceived
101- && secondInboundItem . bytesReceived === inboundItem . bytesReceived
102- ) {
96+ if ( MissingStreamDataDetector . isAllBytesReceivedDidntChange ( inboundItem . bytesReceived , prevInboundItems ) ) {
10397 const hasIssue = this . markIssue ( trackId ) ;
10498
10599 if ( ! hasIssue ) {
@@ -127,9 +121,27 @@ export default class MissingStreamDataDetector extends BaseIssueDetector {
127121 return issues ;
128122 }
129123
130- private static mapStatsByTrackId ( items : CommonParsedInboundStreamStats [ ] ) {
131- return new Map < string , CommonParsedInboundStreamStats > ( items
132- . map ( ( item ) => [ item . track . trackIdentifier , item ] as const ) ) ;
124+ private static mapStatsByTrackId ( items : CommonParsedInboundStreamStats [ ] [ ] ) : Map < string , CommonParsedInboundStreamStats [ ] > {
125+ const statsById = new Map < string , CommonParsedInboundStreamStats [ ] > ( ) ;
126+ items . forEach ( ( inboundItems ) => {
127+ inboundItems . forEach ( ( inbountItem ) => {
128+ const accumulatedItems = statsById . get ( inbountItem . track . trackIdentifier ) || [ ] ;
129+ accumulatedItems . push ( inbountItem ) ;
130+ statsById . set ( inbountItem . track . trackIdentifier , accumulatedItems ) ;
131+ } ) ;
132+ } )
133+
134+ return statsById ;
135+ }
136+
137+ private static isAllBytesReceivedDidntChange ( bytesReceived : number , inboundItems : CommonParsedInboundStreamStats [ ] ) : boolean {
138+ for ( const inboundItem of inboundItems ) {
139+ if ( inboundItem . bytesReceived !== bytesReceived ) {
140+ return false ;
141+ }
142+ }
143+
144+ return true ;
133145 }
134146
135147 private markIssue ( trackId : string ) : boolean {
0 commit comments