3737import java .util .concurrent .CompletableFuture ;
3838import java .util .concurrent .CountDownLatch ;
3939import java .util .concurrent .TimeUnit ;
40+ import java .util .concurrent .atomic .AtomicBoolean ;
4041import java .util .concurrent .atomic .AtomicReference ;
4142
4243import org .apache .hc .client5 .http .websocket .api .WebSocket ;
5152import org .eclipse .jetty .servlet .ServletHolder ;
5253import org .eclipse .jetty .websocket .api .Session ;
5354import org .eclipse .jetty .websocket .api .WebSocketAdapter ;
55+ import org .eclipse .jetty .websocket .api .extensions .ExtensionConfig ;
5456import org .eclipse .jetty .websocket .servlet .WebSocketServlet ;
5557import org .eclipse .jetty .websocket .servlet .WebSocketServletFactory ;
5658import org .junit .jupiter .api .AfterEach ;
@@ -62,6 +64,7 @@ final class WebSocketClientTest {
6264
6365 private Server server ;
6466 private int port ;
67+ private final AtomicBoolean pmceNegotiated = new AtomicBoolean (false );
6568
6669 @ BeforeEach
6770 void startServer () throws Exception {
@@ -73,6 +76,7 @@ void startServer() throws Exception {
7376 final ServletContextHandler ctx = new ServletContextHandler ();
7477 ctx .setContextPath ("/" );
7578 ctx .addServlet (new ServletHolder (new EchoServlet ()), "/echo" );
79+ ctx .addServlet (new ServletHolder (new PmceServlet (pmceNegotiated )), "/pmce" );
7680 ctx .addServlet (new ServletHolder (new InterleaveServlet ()), "/interleave" );
7781 ctx .addServlet (new ServletHolder (new AbruptServlet ()), "/abrupt" );
7882 ctx .addServlet (new ServletHolder (new TooBigServlet ()), "/too-big" );
@@ -187,6 +191,73 @@ private String buildPayload() {
187191 }
188192 }
189193
194+ @ Test
195+ void echo_compressed_pmce () throws Exception {
196+ final URI uri = uri (port , "/pmce" );
197+
198+ final WebSocketClientConfig cfg = WebSocketClientConfig .custom ()
199+ .enablePerMessageDeflate (true )
200+ .offerServerNoContextTakeover (true )
201+ .offerClientNoContextTakeover (true )
202+ .offerClientMaxWindowBits (15 )
203+ .build ();
204+
205+ try (final CloseableWebSocketClient client = WebSocketClients .createDefault ()) {
206+ client .start ();
207+
208+ final CountDownLatch done = new CountDownLatch (1 );
209+ final AtomicReference <Throwable > errorRef = new AtomicReference <>();
210+ final StringBuilder echoed = new StringBuilder ();
211+ final AtomicReference <WebSocket > wsRef = new AtomicReference <>();
212+
213+ final WebSocketListener listener = new WebSocketListener () {
214+
215+ @ Override
216+ public void onOpen (final WebSocket ws ) {
217+ wsRef .set (ws );
218+ final String payload = "pmce test " + Instant .now ();
219+ ws .sendText (payload , true );
220+ }
221+
222+ @ Override
223+ public void onText (final CharBuffer text , final boolean last ) {
224+ echoed .append (text );
225+ if (last ) {
226+ final WebSocket ws = wsRef .get ();
227+ if (ws != null ) {
228+ ws .close (1000 , "done" );
229+ }
230+ }
231+ }
232+
233+ @ Override
234+ public void onClose (final int code , final String reason ) {
235+ try {
236+ assertEquals (1000 , code );
237+ assertTrue (pmceNegotiated .get (), "PMCE not negotiated on server" );
238+ assertTrue (echoed .length () > 0 , "No text echoed back" );
239+ } finally {
240+ done .countDown ();
241+ }
242+ }
243+
244+ @ Override
245+ public void onError (final Throwable ex ) {
246+ errorRef .set (ex );
247+ done .countDown ();
248+ }
249+ };
250+
251+ client .connect (uri , listener , cfg , null );
252+ assertTrue (done .await (10 , TimeUnit .SECONDS ), "WebSocket did not close in time" );
253+
254+ final Throwable error = errorRef .get ();
255+ if (error != null ) {
256+ Assertions .fail ("WebSocket error: " + error .getMessage (), error );
257+ }
258+ }
259+ }
260+
190261 @ Test
191262 void ping_interleaved_fragmentation () throws Exception {
192263 final CountDownLatch gotText = new CountDownLatch (1 );
@@ -199,12 +270,9 @@ void ping_interleaved_fragmentation() throws Exception {
199270
200271 final URI u = uri (port , "/interleave" );
201272 client .connect (u , new WebSocketListener () {
202- private WebSocket ws ;
203-
204273 @ Override
205274 public void onOpen (final WebSocket ws ) {
206275 ws .ping (null );
207- this .ws = ws ;
208276 final String prefix = "hello from hc5 WS @ " + Instant .now () + " — " ;
209277 final StringBuilder sb = new StringBuilder ();
210278 for (int i = 0 ; i < 256 ; i ++) {
@@ -339,6 +407,55 @@ public void onWebSocketText(final String msg) {
339407 }
340408 }
341409
410+ public static final class PmceServlet extends WebSocketServlet {
411+ private final AtomicBoolean negotiated ;
412+
413+ public PmceServlet (final AtomicBoolean negotiated ) {
414+ this .negotiated = negotiated ;
415+ }
416+
417+ @ Override
418+ public void configure (final WebSocketServletFactory factory ) {
419+ factory .getPolicy ().setIdleTimeout (30000 );
420+ factory .setCreator ((req , resp ) -> new PmceSocket (negotiated ));
421+ }
422+ }
423+
424+ public static final class PmceSocket extends WebSocketAdapter {
425+ private final AtomicBoolean negotiated ;
426+
427+ public PmceSocket (final AtomicBoolean negotiated ) {
428+ this .negotiated = negotiated ;
429+ }
430+
431+ @ Override
432+ public void onWebSocketConnect (final Session sess ) {
433+ super .onWebSocketConnect (sess );
434+ if (sess != null ) {
435+ final java .util .List <ExtensionConfig > exts = sess .getUpgradeRequest ().getExtensions ();
436+ boolean hasPmce = false ;
437+ if (exts != null ) {
438+ for (final ExtensionConfig ext : exts ) {
439+ if ("permessage-deflate" .equalsIgnoreCase (ext .getName ())) {
440+ hasPmce = true ;
441+ break ;
442+ }
443+ }
444+ }
445+ negotiated .set (hasPmce );
446+ }
447+ }
448+
449+ @ Override
450+ public void onWebSocketText (final String msg ) {
451+ final Session s = getSession ();
452+ if (s != null && s .isOpen ()) {
453+ s .getRemote ().sendString (msg , null );
454+ s .close (1000 , "done" );
455+ }
456+ }
457+ }
458+
342459 public static final class InterleaveServlet extends WebSocketServlet {
343460 @ Override
344461 public void configure (final WebSocketServletFactory factory ) {
0 commit comments