@@ -44,11 +44,13 @@ import (
4444 "go.opentelemetry.io/collector/internal/testutil"
4545 "go.opentelemetry.io/collector/pdata/plog"
4646 "go.opentelemetry.io/collector/pdata/pmetric"
47+ "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
4748 "go.opentelemetry.io/collector/pdata/pprofile"
4849 "go.opentelemetry.io/collector/pdata/ptrace"
4950 "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
5051 "go.opentelemetry.io/collector/pdata/testdata"
5152 "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metadata"
53+ "go.opentelemetry.io/collector/receiver/receiverhelper"
5254 "go.opentelemetry.io/collector/receiver/receivertest"
5355)
5456
@@ -118,8 +120,8 @@ func TestJsonHttp(t *testing.T) {
118120 name : "Retryable GRPCError" ,
119121 encoding : "" ,
120122 contentType : "application/json" ,
121- err : status .New (codes .Unavailable , "" ).Err (),
122- expectedStatus : & spb.Status {Code : int32 (codes .Unavailable ), Message : "" },
123+ err : status .New (codes .Unavailable , "Service Unavailable " ).Err (),
124+ expectedStatus : & spb.Status {Code : int32 (codes .Unavailable ), Message : "Service Unavailable " },
123125 expectedStatusCode : http .StatusServiceUnavailable ,
124126 },
125127 }
@@ -145,7 +147,8 @@ func TestJsonHttp(t *testing.T) {
145147 errStatus := & spb.Status {}
146148 require .NoError (t , json .Unmarshal (respBytes , errStatus ))
147149 if s , ok := status .FromError (tt .err ); ok {
148- assert .True (t , proto .Equal (errStatus , s .Proto ()))
150+ assert .Equal (t , s .Proto ().Code , errStatus .Code )
151+ assert .Equal (t , s .Proto ().Message , errStatus .Message )
149152 } else {
150153 fmt .Println (errStatus )
151154 assert .True (t , proto .Equal (errStatus , tt .expectedStatus ))
@@ -365,15 +368,15 @@ func TestProtoHttp(t *testing.T) {
365368 {
366369 name : "Permanent GRPCError" ,
367370 encoding : "" ,
368- err : status .New (codes .InvalidArgument , "" ).Err (),
369- expectedStatus : & spb.Status {Code : int32 (codes .InvalidArgument ), Message : "" },
371+ err : status .New (codes .InvalidArgument , "Bad Request " ).Err (),
372+ expectedStatus : & spb.Status {Code : int32 (codes .InvalidArgument ), Message : "Bad Request " },
370373 expectedStatusCode : http .StatusBadRequest ,
371374 },
372375 {
373376 name : "Retryable GRPCError" ,
374377 encoding : "" ,
375- err : status .New (codes .Unavailable , "" ).Err (),
376- expectedStatus : & spb.Status {Code : int32 (codes .Unavailable ), Message : "" },
378+ err : status .New (codes .Unavailable , "Service Unavailable " ).Err (),
379+ expectedStatus : & spb.Status {Code : int32 (codes .Unavailable ), Message : "Service Unavailable " },
377380 expectedStatusCode : http .StatusServiceUnavailable ,
378381 },
379382 }
@@ -553,6 +556,45 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) {
553556 require .Error (t , r .Start (context .Background (), componenttest .NewNopHost ()))
554557}
555558
559+ // TestOTLPReceiverGRPCMetricsIngestTest checks that the metrics receiver
560+ // is returning the proper response (return and metrics) when the next consumer
561+ // in the pipeline reports error.
562+ func TestOTLPReceiverGRPCMetricsIngestTest (t * testing.T ) {
563+ // Get a new available port
564+ addr := testutil .GetAvailableLocalAddress (t )
565+
566+ // Create a sink
567+ sink := & errOrSinkConsumer {MetricsSink : new (consumertest.MetricsSink )}
568+
569+ // Create a telemetry instance
570+ tt := componenttest .NewTelemetry ()
571+ t .Cleanup (func () { require .NoError (t , tt .Shutdown (context .Background ())) })
572+ // Create telemetry settings
573+ settings := tt .NewTelemetrySettings ()
574+
575+ recv := newGRPCReceiver (t , settings , addr , sink )
576+ require .NotNil (t , recv )
577+ require .NoError (t , recv .Start (context .Background (), componenttest .NewNopHost ()))
578+ t .Cleanup (func () { require .NoError (t , recv .Shutdown (context .Background ())) })
579+
580+ cc , err := grpc .NewClient (addr , grpc .WithTransportCredentials (insecure .NewCredentials ()))
581+ require .NoError (t , err )
582+ defer func () {
583+ assert .NoError (t , cc .Close ())
584+ }()
585+ // Set up the error case
586+ sink .SetConsumeError (errors .New ("consumer error" ))
587+
588+ md := testdata .GenerateMetrics (1 )
589+ _ , err = pmetricotlp .NewGRPCClient (cc ).Export (context .Background (), pmetricotlp .NewExportRequestFromMetrics (md ))
590+ errStatus , ok := status .FromError (err )
591+ require .True (t , ok )
592+ assert .Equal (t , codes .Unavailable , errStatus .Code ())
593+
594+ // Assert receiver metrics including receiver_requests
595+ assertReceiverMetrics (t , tt , otlpReceiverID , "grpc" , 0 , 2 )
596+ }
597+
556598// TestOTLPReceiverGRPCTracesIngestTest checks that the gRPC trace receiver
557599// is returning the proper response (return and metrics) when the next consumer
558600// in the pipeline reports error. The test changes the responses returned by the
@@ -1262,8 +1304,41 @@ func (esc *errOrSinkConsumer) checkData(t *testing.T, data any, dataLen int) {
12621304 }
12631305}
12641306
1265- func assertReceiverTraces (t * testing.T , tt * componenttest.Telemetry , id component.ID , transport string , accepted , refused int64 ) {
1266- got , err := tt .GetMetric ("otelcol_receiver_accepted_spans" )
1307+ func assertReceiverTraces (t * testing.T , tt * componenttest.Telemetry , id component.ID , transport string , accepted , rejected int64 ) {
1308+ var refused , failed int64
1309+ var outcome string
1310+ gateEnabled := receiverhelper .NewReceiverMetricsGate .IsEnabled ()
1311+ // The errors in the OTLP tests are not downstream, so they should be "failed" when the gate is enabled.
1312+ if gateEnabled {
1313+ failed = rejected
1314+ outcome = "failure"
1315+ } else {
1316+ // When the gate is disabled, all errors are "refused".
1317+ refused = rejected
1318+ }
1319+
1320+ got , err := tt .GetMetric ("otelcol_receiver_failed_spans" )
1321+ require .NoError (t , err )
1322+ metricdatatest .AssertEqual (t ,
1323+ metricdata.Metrics {
1324+ Name : "otelcol_receiver_failed_spans" ,
1325+ Description : "The number of spans that failed to be processed by the receiver due to internal errors. [alpha]" ,
1326+ Unit : "{spans}" ,
1327+ Data : metricdata.Sum [int64 ]{
1328+ Temporality : metricdata .CumulativeTemporality ,
1329+ IsMonotonic : true ,
1330+ DataPoints : []metricdata.DataPoint [int64 ]{
1331+ {
1332+ Attributes : attribute .NewSet (
1333+ attribute .String ("receiver" , id .String ()),
1334+ attribute .String ("transport" , transport )),
1335+ Value : failed ,
1336+ },
1337+ },
1338+ },
1339+ }, got , metricdatatest .IgnoreTimestamp (), metricdatatest .IgnoreExemplars ())
1340+
1341+ got , err = tt .GetMetric ("otelcol_receiver_accepted_spans" )
12671342 require .NoError (t , err )
12681343 metricdatatest .AssertEqual (t ,
12691344 metricdata.Metrics {
@@ -1304,4 +1379,165 @@ func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id componen
13041379 },
13051380 },
13061381 }, got , metricdatatest .IgnoreTimestamp (), metricdatatest .IgnoreExemplars ())
1382+
1383+ // Assert receiver_requests metric
1384+ if gateEnabled {
1385+ got , err := tt .GetMetric ("otelcol_receiver_requests" )
1386+ require .NoError (t , err )
1387+
1388+ // Calculate expected requests based on accepted and refused counts
1389+ var expectedRequests []metricdata.DataPoint [int64 ]
1390+ if accepted > 0 {
1391+ expectedRequests = append (expectedRequests , metricdata.DataPoint [int64 ]{
1392+ Attributes : attribute .NewSet (
1393+ attribute .String ("receiver" , id .String ()),
1394+ attribute .String ("transport" , transport ),
1395+ attribute .String ("outcome" , "success" )),
1396+ Value : accepted ,
1397+ })
1398+ }
1399+ if rejected > 0 {
1400+ expectedRequests = append (expectedRequests , metricdata.DataPoint [int64 ]{
1401+ Attributes : attribute .NewSet (
1402+ attribute .String ("receiver" , id .String ()),
1403+ attribute .String ("transport" , transport ),
1404+ attribute .String ("outcome" , outcome )),
1405+ Value : rejected ,
1406+ })
1407+ }
1408+
1409+ metricdatatest .AssertEqual (t ,
1410+ metricdata.Metrics {
1411+ Name : "otelcol_receiver_requests" ,
1412+ Description : "The number of requests performed." ,
1413+ Unit : "{requests}" ,
1414+ Data : metricdata.Sum [int64 ]{
1415+ Temporality : metricdata .CumulativeTemporality ,
1416+ IsMonotonic : true ,
1417+ DataPoints : expectedRequests ,
1418+ },
1419+ }, got , metricdatatest .IgnoreTimestamp (), metricdatatest .IgnoreExemplars ())
1420+ } else {
1421+ _ , err := tt .GetMetric ("otelcol_receiver_requests" )
1422+ require .Error (t , err )
1423+ }
1424+ }
1425+
1426+ func assertReceiverMetrics (t * testing.T , tt * componenttest.Telemetry , id component.ID , transport string , accepted , rejected int64 ) {
1427+ var refused , failed int64
1428+ var outcome string
1429+ gateEnabled := receiverhelper .NewReceiverMetricsGate .IsEnabled ()
1430+ // The error used in the metrics test is not downstream.
1431+ if gateEnabled {
1432+ failed = rejected
1433+ outcome = "failure"
1434+ } else {
1435+ // When the gate is disabled, all errors are "refused".
1436+ refused = rejected
1437+ }
1438+
1439+ got , err := tt .GetMetric ("otelcol_receiver_failed_metric_points" )
1440+ require .NoError (t , err )
1441+ metricdatatest .AssertEqual (t ,
1442+ metricdata.Metrics {
1443+ Name : "otelcol_receiver_failed_metric_points" ,
1444+ Description : "The number of metric points that failed to be processed by the receiver due to internal errors. [alpha]" ,
1445+ Unit : "{datapoints}" ,
1446+ Data : metricdata.Sum [int64 ]{
1447+ Temporality : metricdata .CumulativeTemporality ,
1448+ IsMonotonic : true ,
1449+ DataPoints : []metricdata.DataPoint [int64 ]{
1450+ {
1451+ Attributes : attribute .NewSet (
1452+ attribute .String ("receiver" , id .String ()),
1453+ attribute .String ("transport" , transport )),
1454+ Value : failed ,
1455+ },
1456+ },
1457+ },
1458+ }, got , metricdatatest .IgnoreTimestamp (), metricdatatest .IgnoreExemplars ())
1459+
1460+ got , err = tt .GetMetric ("otelcol_receiver_accepted_metric_points" )
1461+ require .NoError (t , err )
1462+ metricdatatest .AssertEqual (t ,
1463+ metricdata.Metrics {
1464+ Name : "otelcol_receiver_accepted_metric_points" ,
1465+ Description : "Number of metric points successfully pushed into the pipeline. [alpha]" ,
1466+ Unit : "{datapoints}" ,
1467+ Data : metricdata.Sum [int64 ]{
1468+ Temporality : metricdata .CumulativeTemporality ,
1469+ IsMonotonic : true ,
1470+ DataPoints : []metricdata.DataPoint [int64 ]{
1471+ {
1472+ Attributes : attribute .NewSet (
1473+ attribute .String ("receiver" , id .String ()),
1474+ attribute .String ("transport" , transport )),
1475+ Value : accepted ,
1476+ },
1477+ },
1478+ },
1479+ }, got , metricdatatest .IgnoreTimestamp (), metricdatatest .IgnoreExemplars ())
1480+
1481+ got , err = tt .GetMetric ("otelcol_receiver_refused_metric_points" )
1482+ require .NoError (t , err )
1483+ metricdatatest .AssertEqual (t ,
1484+ metricdata.Metrics {
1485+ Name : "otelcol_receiver_refused_metric_points" ,
1486+ Description : "Number of metric points that could not be pushed into the pipeline. [alpha]" ,
1487+ Unit : "{datapoints}" ,
1488+ Data : metricdata.Sum [int64 ]{
1489+ Temporality : metricdata .CumulativeTemporality ,
1490+ IsMonotonic : true ,
1491+ DataPoints : []metricdata.DataPoint [int64 ]{
1492+ {
1493+ Attributes : attribute .NewSet (
1494+ attribute .String ("receiver" , id .String ()),
1495+ attribute .String ("transport" , transport )),
1496+ Value : refused ,
1497+ },
1498+ },
1499+ },
1500+ }, got , metricdatatest .IgnoreTimestamp (), metricdatatest .IgnoreExemplars ())
1501+
1502+ // Assert receiver_requests metric
1503+ if gateEnabled {
1504+ got , err := tt .GetMetric ("otelcol_receiver_requests" )
1505+ require .NoError (t , err )
1506+
1507+ // Calculate expected requests based on accepted and refused counts
1508+ var expectedRequests []metricdata.DataPoint [int64 ]
1509+ if accepted > 0 {
1510+ expectedRequests = append (expectedRequests , metricdata.DataPoint [int64 ]{
1511+ Attributes : attribute .NewSet (
1512+ attribute .String ("receiver" , id .String ()),
1513+ attribute .String ("transport" , transport ),
1514+ attribute .String ("outcome" , "success" )),
1515+ Value : accepted ,
1516+ })
1517+ }
1518+ if rejected > 0 {
1519+ expectedRequests = append (expectedRequests , metricdata.DataPoint [int64 ]{
1520+ Attributes : attribute .NewSet (
1521+ attribute .String ("receiver" , id .String ()),
1522+ attribute .String ("transport" , transport ),
1523+ attribute .String ("outcome" , outcome )),
1524+ Value : 1 , // One request failed
1525+ })
1526+ }
1527+
1528+ metricdatatest .AssertEqual (t ,
1529+ metricdata.Metrics {
1530+ Name : "otelcol_receiver_requests" ,
1531+ Description : "The number of requests performed." ,
1532+ Unit : "{requests}" ,
1533+ Data : metricdata.Sum [int64 ]{
1534+ Temporality : metricdata .CumulativeTemporality ,
1535+ IsMonotonic : true ,
1536+ DataPoints : expectedRequests ,
1537+ },
1538+ }, got , metricdatatest .IgnoreTimestamp (), metricdatatest .IgnoreExemplars ())
1539+ } else {
1540+ _ , err := tt .GetMetric ("otelcol_receiver_requests" )
1541+ require .Error (t , err )
1542+ }
13071543}
0 commit comments