Skip to content

Commit bc3efcd

Browse files
committed
Add full WebSocket implementation with RFC 6455/7692 framing, close handshake, UTF‑8 validation,
and RFC 8441 Extended CONNECT for HTTP/2, plus tests and examples.
1 parent 86b7b5e commit bc3efcd

File tree

71 files changed

+4970
-211
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+4970
-211
lines changed

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/websocket/H2WebSocketEchoIT.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,4 +159,64 @@ public void onError(final Throwable ex) {
159159
client.awaitShutdown(TimeValue.ofSeconds(2));
160160
}
161161
}
162+
163+
@Test
164+
void echoesOverHttp2ExtendedConnectWithPmce() throws Exception {
165+
final URI uri = URI.create("ws://localhost:" + server.getLocalPort() + "/echo");
166+
final CountDownLatch done = new CountDownLatch(1);
167+
final AtomicReference<String> echo = new AtomicReference<>();
168+
169+
final WebSocketClientConfig cfg = WebSocketClientConfig.custom()
170+
.enableHttp2(true)
171+
.enablePerMessageDeflate(true)
172+
.offerClientMaxWindowBits(15)
173+
.setCloseWaitTimeout(Timeout.ofSeconds(2))
174+
.build();
175+
176+
try (final CloseableWebSocketClient client = WebSocketClientBuilder.create()
177+
.defaultConfig(cfg)
178+
.build()) {
179+
180+
client.start();
181+
client.connect(uri, new WebSocketListener() {
182+
private WebSocket ws;
183+
184+
@Override
185+
public void onOpen(final WebSocket ws) {
186+
this.ws = ws;
187+
ws.sendText("hello-h2-pmce hello-h2-pmce hello-h2-pmce", true);
188+
}
189+
190+
@Override
191+
public void onText(final CharBuffer text, final boolean last) {
192+
echo.set(text.toString());
193+
done.countDown();
194+
ws.close(1000, "done");
195+
}
196+
197+
@Override
198+
public void onClose(final int code, final String reason) {
199+
}
200+
201+
@Override
202+
public void onError(final Throwable ex) {
203+
if (!(ex instanceof ConnectionClosedException)) {
204+
ex.printStackTrace(System.err);
205+
}
206+
done.countDown();
207+
}
208+
}, cfg).exceptionally(ex -> {
209+
if (!(ex instanceof ConnectionClosedException)) {
210+
ex.printStackTrace(System.err);
211+
}
212+
done.countDown();
213+
return null;
214+
});
215+
216+
assertTrue(done.await(10, TimeUnit.SECONDS), "timed out waiting for echo");
217+
assertEquals("hello-h2-pmce hello-h2-pmce hello-h2-pmce", echo.get());
218+
client.initiateShutdown();
219+
client.awaitShutdown(TimeValue.ofSeconds(2));
220+
}
221+
}
162222
}

httpclient5-websocket/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,6 @@
4949
<groupId>org.apache.httpcomponents.client5</groupId>
5050
<artifactId>httpclient5-cache</artifactId>
5151
</dependency>
52-
<dependency>
53-
<groupId>org.apache.httpcomponents.core5</groupId>
54-
<artifactId>httpcore5-websocket</artifactId>
55-
</dependency>
5652
<dependency>
5753
<groupId>org.apache.httpcomponents.core5</groupId>
5854
<artifactId>httpcore5-h2</artifactId>

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/api/WebSocketClientConfig.java

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@
3838
*
3939
* <p>Instances are normally created via the associated builder. The
4040
* configuration controls timeouts, maximum frame and message sizes,
41-
* fragmentation behaviour, buffer pooling and optional automatic
42-
* responses to PING frames.</p>
41+
* fragmentation behaviour and optional automatic responses to PING frames.</p>
4342
*
4443
* <p>Unless explicitly overridden, reasonable defaults are selected for
4544
* desktop and server environments. For mobile or memory-constrained
@@ -64,8 +63,7 @@ public final class WebSocketClientConfig {
6463
private final int outgoingChunkSize;
6564
private final int maxFramesPerTick;
6665

67-
// Buffers / pool
68-
private final int ioPoolCapacity;
66+
// Buffers
6967
private final boolean directBuffers;
7068

7169
// Behavior
@@ -88,7 +86,6 @@ private WebSocketClientConfig(
8886
final int maxFrameSize,
8987
final int outgoingChunkSize,
9088
final int maxFramesPerTick,
91-
final int ioPoolCapacity,
9289
final boolean directBuffers,
9390
final boolean autoPong,
9491
final Timeout closeWaitTimeout,
@@ -108,7 +105,6 @@ private WebSocketClientConfig(
108105
this.maxFrameSize = maxFrameSize;
109106
this.outgoingChunkSize = outgoingChunkSize;
110107
this.maxFramesPerTick = maxFramesPerTick;
111-
this.ioPoolCapacity = ioPoolCapacity;
112108
this.directBuffers = directBuffers;
113109
this.autoPong = autoPong;
114110
this.closeWaitTimeout = Args.notNull(closeWaitTimeout, "closeWaitTimeout");
@@ -235,17 +231,7 @@ public int getMaxFramesPerTick() {
235231
}
236232

237233
/**
238-
* Capacity of the internal buffer pool used by WebSocket I/O.
239-
*
240-
* @return pool capacity (must be &gt; 0)
241-
* @since 5.7
242-
*/
243-
public int getIoPoolCapacity() {
244-
return ioPoolCapacity;
245-
}
246-
247-
/**
248-
* Whether direct byte buffers are preferred for the internal buffer pool.
234+
* Whether direct byte buffers are preferred for internal I/O.
249235
*
250236
* @return {@code true} for direct buffers, {@code false} for heap buffers
251237
* @since 5.7
@@ -337,7 +323,6 @@ public static final class Builder {
337323
private int outgoingChunkSize = 8 * 1024;
338324
private int maxFramesPerTick = 1024;
339325

340-
private int ioPoolCapacity = 64;
341326
private boolean directBuffers = true;
342327

343328
private boolean autoPong = true;
@@ -474,19 +459,7 @@ public Builder setMaxFramesPerTick(final int v) {
474459
}
475460

476461
/**
477-
* Sets the capacity of the internal buffer pool.
478-
*
479-
* @param v pool capacity (must be &gt; 0)
480-
* @return this builder
481-
* @since 5.7
482-
*/
483-
public Builder setIoPoolCapacity(final int v) {
484-
this.ioPoolCapacity = v;
485-
return this;
486-
}
487-
488-
/**
489-
* Enables or disables the use of direct buffers for the internal pool.
462+
* Enables or disables the use of direct buffers for internal I/O.
490463
*
491464
* @param v {@code true} for direct buffers, {@code false} for heap buffers
492465
* @return this builder
@@ -583,9 +556,6 @@ public WebSocketClientConfig build() {
583556
if (maxFramesPerTick <= 0) {
584557
throw new IllegalArgumentException("maxFramesPerTick > 0");
585558
}
586-
if (ioPoolCapacity <= 0) {
587-
throw new IllegalArgumentException("ioPoolCapacity > 0");
588-
}
589559
if (maxMessageSize <= 0) {
590560
throw new IllegalArgumentException("maxMessageSize > 0");
591561
}
@@ -597,7 +567,7 @@ public WebSocketClientConfig build() {
597567
perMessageDeflateEnabled, offerServerNoContextTakeover, offerClientNoContextTakeover,
598568
offerClientMaxWindowBits, offerServerMaxWindowBits,
599569
maxFrameSize, outgoingChunkSize, maxFramesPerTick,
600-
ioPoolCapacity, directBuffers,
570+
directBuffers,
601571
autoPong, closeWaitTimeout, maxMessageSize,
602572
maxOutboundControlQueue,
603573
http2Enabled

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/client/WebSocketClientBuilder.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public final class WebSocketClientBuilder {
108108
// Optional listeners for reactor metrics and worker selection.
109109
private IOReactorMetricsListener reactorMetricsListener;
110110
private IOWorkerSelector workerSelector;
111+
private int maxPendingCommandsPerConnection;
111112

112113
private WebSocketClientConfig defaultConfig = WebSocketClientConfig.custom().build();
113114

@@ -259,6 +260,18 @@ public WebSocketClientBuilder setPoolConcurrencyPolicy(final PoolConcurrencyPoli
259260
return this;
260261
}
261262

263+
/**
264+
* Sets the maximum number of pending commands per connection.
265+
*
266+
* @param maxPendingCommandsPerConnection maximum pending commands; values &lt; 0
267+
* cause the default of {@code 0} to be used.
268+
* @return this builder.
269+
*/
270+
public WebSocketClientBuilder setMaxPendingCommandsPerConnection(final int maxPendingCommandsPerConnection) {
271+
this.maxPendingCommandsPerConnection = maxPendingCommandsPerConnection;
272+
return this;
273+
}
274+
262275
/**
263276
* Sets the TLS strategy used to establish HTTPS or WSS connections.
264277
*
@@ -438,7 +451,7 @@ public CloseableWebSocketClient build() {
438451
handshakeTimeout,
439452
metricsListener,
440453
selector,
441-
0
454+
Math.max(maxPendingCommandsPerConnection, 0)
442455
);
443456

444457
final H2MultiplexingRequester h2Requester = H2MultiplexingRequesterBootstrap.bootstrap()

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketInbound.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ void onDisconnected(final IOSession ioSession) {
8787
}
8888
}
8989
if (s.readBuf != null) {
90-
s.bufferPool.release(s.readBuf);
9190
s.readBuf = null;
9291
}
9392
out.drainAndRelease();
@@ -101,7 +100,9 @@ void onInputReady(final IOSession ioSession, final ByteBuffer src) {
101100
}
102101

103102
if (s.readBuf == null) {
104-
s.readBuf = s.bufferPool.acquire();
103+
s.readBuf = s.cfg.isDirectBuffers()
104+
? ByteBuffer.allocateDirect(s.readBuf.capacity())
105+
: ByteBuffer.allocate(s.readBuf.capacity());
105106
if (s.readBuf == null) {
106107
return;
107108
}
@@ -115,7 +116,9 @@ void onInputReady(final IOSession ioSession, final ByteBuffer src) {
115116
do {
116117
ByteBuffer rb = s.readBuf;
117118
if (rb == null) {
118-
rb = s.bufferPool.acquire();
119+
rb = s.cfg.isDirectBuffers()
120+
? ByteBuffer.allocateDirect(s.readBuf.capacity())
121+
: ByteBuffer.allocate(s.readBuf.capacity());
119122
if (rb == null) {
120123
return;
121124
}

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketOutbound.java

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,7 @@ void onOutputReady(final IOSession ioSession) {
134134
}
135135

136136
private void release(final OutFrame frame) {
137-
if (frame.pooled) {
138-
s.bufferPool.release(frame.buf);
139-
}
137+
// No-op: buffers are not pooled.
140138
}
141139

142140
boolean enqueueCtrl(final OutFrame frame) {
@@ -201,15 +199,10 @@ OutFrame pooledFrame(final int opcode, final ByteBuffer payload, final boolean f
201199

202200
final int totalSize = headerEstimate + len;
203201

204-
final ByteBuffer buf;
205-
final boolean pooled;
206-
if (totalSize <= s.bufferPool.getBufferSize()) {
207-
buf = s.bufferPool.acquire();
208-
pooled = true;
209-
} else {
210-
buf = ByteBuffer.allocate(totalSize);
211-
pooled = false;
212-
}
202+
final ByteBuffer buf = s.cfg.isDirectBuffers()
203+
? ByteBuffer.allocateDirect(totalSize)
204+
: ByteBuffer.allocate(totalSize);
205+
final boolean pooled = false;
213206

214207
buf.clear();
215208
// opcode (int), payload (ByteBuffer), fin (boolean), mask (boolean), out (ByteBuffer)
@@ -235,15 +228,10 @@ OutFrame pooledFrameWithRsv(final int opcode, final ByteBuffer payload, final bo
235228

236229
final int totalSize = headerEstimate + len;
237230

238-
final ByteBuffer buf;
239-
final boolean pooled;
240-
if (totalSize <= s.bufferPool.getBufferSize()) {
241-
buf = s.bufferPool.acquire();
242-
pooled = true;
243-
} else {
244-
buf = ByteBuffer.allocate(totalSize);
245-
pooled = false;
246-
}
231+
final ByteBuffer buf = s.cfg.isDirectBuffers()
232+
? ByteBuffer.allocateDirect(totalSize)
233+
: ByteBuffer.allocate(totalSize);
234+
final boolean pooled = false;
247235

248236
buf.clear();
249237
s.writer.frameIntoWithRSV(opcode, ro, fin, true, rsvBits, buf);
@@ -268,15 +256,10 @@ OutFrame pooledCloseEcho(final ByteBuffer payload) {
268256

269257
final int totalSize = headerEstimate + len;
270258

271-
final ByteBuffer buf;
272-
final boolean pooled;
273-
if (totalSize <= s.bufferPool.getBufferSize()) {
274-
buf = s.bufferPool.acquire();
275-
pooled = true;
276-
} else {
277-
buf = ByteBuffer.allocate(totalSize);
278-
pooled = false;
279-
}
259+
final ByteBuffer buf = s.cfg.isDirectBuffers()
260+
? ByteBuffer.allocateDirect(totalSize)
261+
: ByteBuffer.allocate(totalSize);
262+
final boolean pooled = false;
280263

281264
buf.clear();
282265
s.writer.frameInto(FrameOpcode.CLOSE, ro, true, true, buf);

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketSessionState.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.hc.client5.http.websocket.api.WebSocketListener;
3737
import org.apache.hc.core5.websocket.extension.ExtensionChain;
3838
import org.apache.hc.core5.websocket.frame.WebSocketFrameWriter;
39-
import org.apache.hc.core5.websocket.util.ByteBufferPool;
4039
import org.apache.hc.core5.annotation.Internal;
4140
import org.apache.hc.core5.reactor.ProtocolIOSession;
4241

@@ -57,7 +56,6 @@ final class WebSocketSessionState {
5756
final int rsvMask;
5857

5958
// Buffers & codec
60-
final ByteBufferPool bufferPool;
6159
final WebSocketFrameWriter writer = new WebSocketFrameWriter();
6260
final WebSocketFrameDecoder decoder;
6361

@@ -112,10 +110,6 @@ final class WebSocketSessionState {
112110
}
113111

114112
final int poolBufSize = Math.max(8192, this.outChunk);
115-
final int poolCapacity = Math.max(16, cfg.getIoPoolCapacity());
116-
this.bufferPool = new ByteBufferPool(poolBufSize, poolCapacity, cfg.isDirectBuffers());
117-
118-
// Borrow one read buffer upfront
119-
this.readBuf = bufferPool.acquire();
113+
this.readBuf = cfg.isDirectBuffers() ? ByteBuffer.allocateDirect(poolBufSize) : ByteBuffer.allocate(poolBufSize);
120114
}
121115
}

0 commit comments

Comments
 (0)