Skip to content

Commit ee04e4a

Browse files
authored
[CXF-9189]Exceptions Not Logged When Using SseEventSink in JAX-RS SSE… (#2919)
* [CXF-9189]Exceptions Not Logged When Using SseEventSink in JAX-RS SSE Endpoint * [CXF-9189]use in memory log appender for the testcase
1 parent 3849477 commit ee04e4a

File tree

5 files changed

+147
-1
lines changed

5 files changed

+147
-1
lines changed

rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.CancellationException;
3030
import java.util.concurrent.CompletionException;
3131
import java.util.concurrent.CompletionStage;
32+
import java.util.logging.Level;
3233
import java.util.logging.Logger;
3334

3435
import jakarta.ws.rs.WebApplicationException;
@@ -38,6 +39,7 @@
3839
import jakarta.ws.rs.core.MediaType;
3940
import jakarta.ws.rs.core.MultivaluedMap;
4041
import jakarta.ws.rs.core.Response;
42+
import jakarta.ws.rs.sse.SseEventSink;
4143
import org.apache.cxf.common.classloader.ClassLoaderUtils;
4244
import org.apache.cxf.common.classloader.ClassLoaderUtils.ClassLoaderHolder;
4345
import org.apache.cxf.common.i18n.BundleUtils;
@@ -144,6 +146,21 @@ private Object handleAsyncFault(Exchange exchange, AsyncResponseImpl ar, Throwab
144146
try {
145147
return handleFault(new Fault(t), exchange.getInMessage(), null, null);
146148
} catch (Fault ex) {
149+
//If we got here, the fault was effectively "unmapped" (no Response could be created)
150+
// and we'd otherwise lose the usual fault logging (common with SSE sink / async paths).
151+
Throwable cause = ex.getCause() == null ? ex : ex.getCause();
152+
LOG.log(Level.SEVERE, "Unhandled exception from JAX-RS invocation (async/SSE path)", cause);
153+
154+
// Best-effort: if this request is SSE, close the sink so the container can complete cleanly.
155+
try {
156+
SseEventSink sink = (SseEventSink)exchange.getInMessage().get(SseEventSink.class);
157+
if (sink != null && !sink.isClosed()) {
158+
sink.close();
159+
}
160+
} catch (Exception ignore) {
161+
// ignore secondary failures; primary goal is to log original cause
162+
}
163+
147164
ar.setUnmappedThrowable(ex.getCause() == null ? ex : ex.getCause());
148165
if (isSuspended(exchange)) {
149166
ar.reset();

systests/rs-sse/rs-sse-base/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,9 @@
7575
<artifactId>awaitility</artifactId>
7676
<scope>compile</scope>
7777
</dependency>
78+
<dependency>
79+
<groupId>ch.qos.logback</groupId>
80+
<artifactId>logback-classic</artifactId>
81+
</dependency>
7882
</dependencies>
7983
</project>

systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.function.Consumer;
3434

35-
35+
import ch.qos.logback.classic.Level;
36+
import ch.qos.logback.classic.Logger;
37+
import ch.qos.logback.classic.spi.ILoggingEvent;
38+
import ch.qos.logback.classic.spi.IThrowableProxy;
39+
import ch.qos.logback.core.read.ListAppender;
3640
import jakarta.ws.rs.client.Entity;
3741
import jakarta.ws.rs.client.WebTarget;
3842
import jakarta.ws.rs.core.HttpHeaders;
@@ -43,9 +47,11 @@
4347
import jakarta.ws.rs.sse.InboundSseEvent;
4448
import jakarta.ws.rs.sse.SseEventSource;
4549
import jakarta.ws.rs.sse.SseEventSource.Builder;
50+
import org.slf4j.LoggerFactory;
4651
import tools.jackson.core.JacksonException;
4752
import tools.jackson.jakarta.rs.json.JacksonJsonProvider;
4853

54+
4955
import org.junit.Before;
5056
import org.junit.Test;
5157

@@ -60,6 +66,10 @@
6066
import static org.junit.Assert.fail;
6167

6268

69+
70+
71+
72+
6373
public abstract class AbstractSseTest extends AbstractSseBaseTest {
6474
@Before
6575
public void setUp() {
@@ -70,6 +80,9 @@ public void setUp() {
7080

7181
}
7282

83+
84+
85+
7386
@Test
7487
public void testBooksStreamIsReturnedFromLastEventId() throws InterruptedException {
7588
final WebTarget target = createWebTarget("/rest/api/bookstore/sse/" + UUID.randomUUID())
@@ -408,7 +421,98 @@ public void testBooksSseContainerResponseAddedHeaders() throws InterruptedExcept
408421
assertThat(response.getHeaderString("X-My-ProtocolHeader"), equalTo("protocol-headers"));
409422
}
410423
}
424+
425+
426+
@Test
427+
public void testSseEndpointExceptionIsLoggedToConsole() throws Exception {
428+
final Logger logger = (Logger) LoggerFactory.getLogger("org.apache.cxf.jaxrs.JAXRSInvoker");
429+
430+
final Level oldLevel = logger.getLevel();
431+
final ListAppender<ILoggingEvent> appender = new ListAppender<>();
432+
appender.start();
433+
434+
try {
435+
logger.setLevel(Level.ERROR);
436+
logger.addAppender(appender);
437+
438+
try (Response r = createWebTarget("/rest/api/bookstore/sse/fail/request")
439+
.request(MediaType.SERVER_SENT_EVENTS)
440+
.get()) {
441+
// Force the client to actually start consuming
442+
r.readEntity(String.class);
443+
} catch (Exception ex) {
444+
// expected
445+
}
446+
447+
// Wait until we have at least one ERROR from JAXRSInvoker
448+
awaitEvents(2000, appender.list, 1);
449+
450+
assertTrue("Expected SSE log event, got:\n" + dump(appender),
451+
hasUnhandledExceptionEvent(appender));
452+
453+
assertTrue("Expected SSE marker in throwable, got:\n" + dump(appender),
454+
hasMarkerInUnhandledExceptionEvent(appender, "CXF-9189-MARKER"));
455+
} finally {
456+
logger.detachAppender(appender);
457+
logger.setLevel(oldLevel);
458+
appender.stop();
459+
}
460+
}
411461

462+
private static boolean hasUnhandledExceptionEvent(ListAppender<ILoggingEvent> appender) {
463+
final String msgNeedle = "Unhandled exception from JAX-RS invocation (async/SSE path)";
464+
for (ILoggingEvent e : appender.list) {
465+
String msg = e.getFormattedMessage();
466+
if (msg != null && msg.contains(msgNeedle)) {
467+
return true;
468+
}
469+
}
470+
return false;
471+
}
472+
473+
private static boolean hasMarkerInUnhandledExceptionEvent(ListAppender<ILoggingEvent> appender, String marker) {
474+
final String msgNeedle = "Unhandled exception from JAX-RS invocation (async/SSE path)";
475+
for (ILoggingEvent e : appender.list) {
476+
String msg = e.getFormattedMessage();
477+
if (msg == null || !msg.contains(msgNeedle)) {
478+
continue;
479+
}
480+
// marker can be in message OR in throwable chain
481+
if (msg.contains(marker) || throwableChainContains(e.getThrowableProxy(), marker)) {
482+
return true;
483+
}
484+
}
485+
return false;
486+
}
487+
488+
private static boolean throwableChainContains(IThrowableProxy tp, String needle) {
489+
for (IThrowableProxy cur = tp; cur != null; cur = cur.getCause()) {
490+
String m = cur.getMessage();
491+
if (m != null && m.contains(needle)) {
492+
return true;
493+
}
494+
}
495+
return false;
496+
}
497+
498+
private static String dump(ListAppender<ILoggingEvent> appender) {
499+
StringBuilder sb = new StringBuilder();
500+
for (ILoggingEvent e : appender.list) {
501+
sb.append('[').append(e.getLevel()).append("] ")
502+
.append(e.getLoggerName()).append(" - ")
503+
.append(e.getFormattedMessage());
504+
if (e.getThrowableProxy() != null) {
505+
sb.append(" (thrown: ")
506+
.append(e.getThrowableProxy().getClassName())
507+
.append(": ")
508+
.append(e.getThrowableProxy().getMessage())
509+
.append(')');
510+
}
511+
sb.append('\n');
512+
}
513+
return sb.toString();
514+
}
515+
412516
/**
413517
* Jetty / Undertow do not propagate errors from the runnable passed to
414518
* AsyncContext::start() up to the AsyncEventListener::onError(). Tomcat however
@@ -426,4 +530,6 @@ private static Consumer<InboundSseEvent> collect(final Collection<Book> books) {
426530
private static Consumer<InboundSseEvent> collectRaw(final Collection<String> titles) {
427531
return event -> titles.add(event.readData(String.class, MediaType.TEXT_PLAIN_TYPE));
428532
}
533+
534+
429535
}

systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,15 @@ public int filteredStats() {
258258
return BookStoreResponseFilter.getInvocations();
259259
}
260260

261+
@GET
262+
@Path("/sse/fail/request")
263+
@Produces(MediaType.SERVER_SENT_EVENTS)
264+
public void failOnRequestThread(@Context SseEventSink sink) {
265+
throw new RuntimeException("CXF-9189-MARKER: exception from SSE resource method should be logged");
266+
}
267+
268+
269+
261270
@PUT
262271
@Path("/filtered/stats")
263272
public void clearStats() {

systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,16 @@ public void run() {
238238
public int filteredStats() {
239239
return BookStoreResponseFilter.getInvocations();
240240
}
241+
242+
@GET
243+
@Path("/sse/fail/request")
244+
@Produces(MediaType.SERVER_SENT_EVENTS)
245+
public void failOnRequestThread(@Context SseEventSink sink) {
246+
throw new RuntimeException("CXF-9189-MARKER: exception from SSE resource method should be logged");
247+
}
248+
249+
250+
241251

242252
@PUT
243253
@Path("/filtered/stats")

0 commit comments

Comments
 (0)