Skip to content

Commit 8635150

Browse files
author
shartung
committed
Align tryWrite follow-up with existing example-based repo structure
1 parent b9d52dc commit 8635150

7 files changed

Lines changed: 109 additions & 205 deletions

File tree

.github/workflows/cpp.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ jobs:
4848
run: |
4949
cd uWebSockets
5050
iwr https://deno.land/x/install/install.ps1 -useb | iex
51-
Start-Process -NoNewWindow .\SmokeTest.exe
51+
Start-Process -NoNewWindow .\Crc32
5252
sleep 1
5353
deno run --allow-net tests\smoke.mjs
54-
Stop-Process -Name SmokeTest
54+
Stop-Process -Name Crc32
5555
5656
build_linux:
5757

build.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ int main(int argc, char **argv) {
99
char *CXX = strncpy(calloc(1024, 1), or_else(getenv("CXX"), "g++"), 1024);
1010
char *EXEC_SUFFIX = strncpy(calloc(1024, 1), maybe(getenv("EXEC_SUFFIX")), 1024);
1111

12-
char *EXAMPLE_FILES[] = {"SecureGzipFileServer", "Precompress", "EchoBody", "HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "SmokeTest",
13-
"ServerName", "EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"};
12+
char *EXAMPLE_FILES[] = {"SecureGzipFileServer", "Precompress", "EchoBody", "HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName",
13+
"EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"};
1414

1515
strcat(CXXFLAGS, " -march=native -O3 -Wpedantic -Wall -Wextra -Wsign-conversion -Wconversion -std=c++2b -Isrc -IuSockets/src");
1616
strcat(LDFLAGS, " uSockets/*.o");

examples/ChunkedResponse.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#include "App.h"
2+
3+
/* This example demonstrates a large chunked response streamed with tryWrite. */
4+
5+
#include <cstdint>
6+
#include <memory>
7+
#include <string>
8+
9+
namespace {
10+
11+
const std::string payload(16 * 1024 * 1024, 'x');
12+
13+
struct ResponseState {
14+
uintmax_t baseOffset = 0;
15+
bool aborted = false;
16+
};
17+
18+
template <bool SSL>
19+
bool tryWriteLoop(uWS::HttpResponse<SSL> *res, ResponseState *state) {
20+
if (state->aborted) {
21+
return true;
22+
}
23+
24+
uintmax_t sent = res->getWriteOffset() - state->baseOffset;
25+
std::string_view remaining = payload;
26+
remaining.remove_prefix((size_t) sent);
27+
if (res->tryWrite(remaining)) {
28+
res->end();
29+
return true;
30+
}
31+
32+
return false;
33+
}
34+
35+
}
36+
37+
int main() {
38+
39+
uWS::SSLApp({
40+
.key_file_name = "misc/key.pem",
41+
.cert_file_name = "misc/cert.pem",
42+
.passphrase = "1234"
43+
}).get("/*", [](auto *res, auto */*req*/) {
44+
auto state = std::make_shared<ResponseState>();
45+
state->baseOffset = res->getWriteOffset();
46+
47+
res->writeHeader("Content-Type", "application/octet-stream");
48+
res->onAborted([state]() {
49+
state->aborted = true;
50+
});
51+
52+
if (!tryWriteLoop(res, state.get())) {
53+
res->onWritable([res, state](uintmax_t) {
54+
return tryWriteLoop(res, state.get());
55+
});
56+
}
57+
}).listen(3000, [](auto *listen_socket) {
58+
if (listen_socket) {
59+
std::cout << "Listening on port " << 3000 << std::endl;
60+
}
61+
}).run();
62+
63+
std::cout << "Failed to listen on port 3000" << std::endl;
64+
}

examples/Crc32.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) {
3131
}
3232

3333
int main() {
34+
3435
uWS::SSLApp({
3536
.key_file_name = "misc/key.pem",
3637
.cert_file_name = "misc/cert.pem",

examples/SmokeTest.cpp

Lines changed: 36 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -2,157 +2,55 @@
22

33
/* This is not an example; it is a smoke test used in CI testing */
44

5-
#include <cstddef>
6-
#include <cstdint>
7-
#include <memory>
8-
#include <sstream>
9-
#include <string>
10-
11-
namespace {
12-
13-
const std::string writeChunk(1024, 'a');
14-
const std::string tryWritePayload(16 * 1024 * 1024, 'x');
15-
const std::string tryWriteEndPayload(32 * 1024 * 1024, 'y');
16-
17-
struct WriteState {
18-
int remaining = 128;
19-
bool aborted = false;
5+
struct Stream {
6+
int offset;
7+
bool aborted;
208
};
219

22-
struct TryWriteState {
23-
const std::string *payload = nullptr;
24-
uintmax_t baseOffset = 0;
25-
bool aborted = false;
26-
};
27-
28-
uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) {
29-
30-
for (size_t i = 0; i < n; i++) {
31-
unsigned char ch = static_cast<unsigned char>(s[i]);
32-
for (size_t j = 0; j < 8; j++) {
33-
uint32_t b = (ch ^ crc) & 1;
34-
crc >>= 1;
35-
if (b) crc = crc ^ 0xEDB88320;
36-
ch >>= 1;
37-
}
38-
}
39-
40-
return crc;
10+
std::string constantChunk;
11+
12+
void streamData(auto *res, auto stream, int chunk) {
13+
14+
if (stream->aborted) {
15+
return;
16+
}
17+
18+
if (chunk < 1600) {
19+
res->cork([res, stream, chunk]() {
20+
auto ok = res->write(constantChunk);
21+
if (ok) {
22+
streamData(res, stream, chunk + 1);
23+
return;
24+
}
25+
26+
uWS::Loop::get()->defer([res, stream, chunk]() {
27+
streamData(res, stream, chunk + 1);
28+
});
29+
});
30+
} else {
31+
res->cork([res]() {
32+
res->end();
33+
});
34+
}
4135
}
4236

43-
template <bool SSL>
44-
bool writeLoop(uWS::HttpResponse<SSL> *res, const std::shared_ptr<WriteState> &state) {
45-
while (!state->aborted && state->remaining) {
46-
state->remaining--;
47-
if (!res->write(writeChunk)) {
48-
return false;
49-
}
50-
}
51-
52-
if (!state->aborted) {
53-
res->end();
54-
}
55-
56-
return true;
57-
}
58-
59-
template <bool SSL>
60-
bool tryWriteLoop(uWS::HttpResponse<SSL> *res, const std::shared_ptr<TryWriteState> &state) {
61-
if (state->aborted) {
62-
return true;
63-
}
37+
int main() {
6438

65-
uintmax_t sent = res->getWriteOffset() - state->baseOffset;
66-
std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent);
67-
if (res->tryWrite(remaining)) {
68-
res->end();
69-
return true;
39+
for (int i = 0; i < 65536; i++) {
40+
constantChunk.append("a", 1);
7041
}
7142

72-
return false;
73-
}
74-
75-
}
76-
77-
int main() {
7843
uWS::SSLApp({
7944
.key_file_name = "misc/key.pem",
8045
.cert_file_name = "misc/cert.pem",
8146
.passphrase = "1234"
82-
}).get("/write", [](auto *res, auto */*req*/) {
83-
auto state = std::make_shared<WriteState>();
84-
85-
res->onAborted([state]() {
86-
state->aborted = true;
87-
});
88-
89-
if (!writeLoop(res, state)) {
90-
res->onWritable([res, state](uintmax_t) {
91-
return writeLoop(res, state);
92-
});
93-
}
94-
}).get("/trywrite", [](auto *res, auto */*req*/) {
95-
auto state = std::make_shared<TryWriteState>();
96-
state->payload = &tryWritePayload;
97-
state->baseOffset = res->getWriteOffset();
98-
99-
res->onAborted([state]() {
100-
state->aborted = true;
101-
});
102-
103-
if (!tryWriteLoop(res, state)) {
104-
res->onWritable([res, state](uintmax_t) {
105-
return tryWriteLoop(res, state);
106-
});
107-
}
108-
}).get("/trywrite-end", [](auto *res, auto */*req*/) {
109-
auto state = std::make_shared<TryWriteState>();
110-
state->payload = &tryWriteEndPayload;
111-
state->baseOffset = res->getWriteOffset();
47+
}).get("/*", [](auto *res, auto */*req*/) {
11248

113-
res->onAborted([state]() {
114-
state->aborted = true;
115-
});
116-
117-
if (!res->tryWrite(*state->payload)) {
118-
res->onWritable([res, state](uintmax_t offset) {
119-
if (state->aborted) {
120-
return true;
121-
}
122-
123-
uintmax_t sent = offset - state->baseOffset;
124-
std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent);
125-
res->end(remaining);
126-
return true;
127-
});
128-
} else {
129-
res->end();
130-
}
131-
}).post("/*", [](auto *res, auto *req) {
132-
133-
auto isAborted = std::make_shared<bool>(false);
134-
uint32_t crc = 0xFFFFFFFF;
135-
136-
/* Display the headers */
137-
std::cout << " --- " << req->getUrl() << " --- " << std::endl;
138-
for (auto [key, value] : *req) {
139-
std::cout << key << ": " << value << std::endl;
140-
}
141-
142-
res->onData([res, isAborted, crc](std::string_view chunk, bool isFin) mutable {
143-
if (chunk.length()) {
144-
crc = crc32(chunk.data(), chunk.length(), crc);
145-
}
146-
147-
if (isFin && !*isAborted) {
148-
std::stringstream s;
149-
s << std::hex << (~crc) << std::endl;
150-
res->end(s.str());
151-
}
152-
});
49+
auto stream = std::make_shared<Stream>(0, false);
50+
streamData(res, stream, 0);
15351

154-
res->onAborted([isAborted]() {
155-
*isAborted = true;
52+
res->onAborted([stream]() {
53+
stream->aborted = true;
15654
});
15755
}).listen(3000, [](auto *listen_socket) {
15856
if (listen_socket) {

tests/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ performance:
1919
./HttpRouter
2020

2121
smoke:
22-
../SmokeTest &
22+
../Crc32 &
2323
sleep 1
2424
~/.deno/bin/deno run --allow-net smoke.mjs
2525
node smoke.mjs
26-
pkill SmokeTest
26+
pkill Crc32
2727

2828
compliance:
2929
../EchoBody &

tests/smoke.mjs

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* This smoke test runs against the dedicated SmokeTest program */
1+
/* This smoke test runs against the Crc32 example program for now, but this example will be extended for more tests */
22

33
var crc32 = (function () {
44
var table = new Uint32Array(256);
@@ -66,61 +66,6 @@ async function fixedCrc32Test(array) {
6666
}
6767
}
6868

69-
async function readBodySlowly(response) {
70-
const reader = response.body.getReader();
71-
const chunks = [];
72-
let total = 0;
73-
74-
while (true) {
75-
const {done, value} = await reader.read();
76-
if (done) {
77-
break;
78-
}
79-
80-
chunks.push(value);
81-
total += value.length;
82-
await new Promise((resolve) => setTimeout(resolve, 5));
83-
}
84-
85-
const body = new Uint8Array(total);
86-
let offset = 0;
87-
for (const chunk of chunks) {
88-
body.set(chunk, offset);
89-
offset += chunk.length;
90-
}
91-
return body;
92-
}
93-
94-
function expectFilled(body, size, value, label) {
95-
if (body.length !== size) {
96-
throw new Error(label + " failed: expected body size " + size + ", got " + body.length);
97-
}
98-
99-
for (let i = 0; i < body.length; i++) {
100-
if (body[i] !== value) {
101-
throw new Error(label + " failed: unexpected byte at offset " + i);
102-
}
103-
}
104-
}
105-
106-
async function streamingWriteTest() {
107-
console.log("Making streaming write request");
108-
const res = await fetch("http://localhost:3000/write");
109-
expectFilled(await readBodySlowly(res), 128 * 1024, "a".charCodeAt(0), "write");
110-
}
111-
112-
async function streamingTryWriteTest() {
113-
console.log("Making tryWrite request");
114-
const res = await fetch("http://localhost:3000/trywrite");
115-
expectFilled(await readBodySlowly(res), 16 * 1024 * 1024, "x".charCodeAt(0), "tryWrite");
116-
}
117-
118-
async function streamingTryWriteEndTest() {
119-
console.log("Making tryWrite-end request");
120-
const res = await fetch("http://localhost:3000/trywrite-end");
121-
expectFilled(await readBodySlowly(res), 32 * 1024 * 1024, "y".charCodeAt(0), "tryWrite-end");
122-
}
123-
12469
/* Maximum chunk size is less than 256mb */
12570
const sizes = [0, 0, 32, 32, 128, 256, 1024, 65536, 1024 * 1024, 1024 * 1024 * 128, 0, 0, 32, 32];
12671
for (let i = 0; i < sizes.length; i++) {
@@ -138,8 +83,4 @@ for (let i = 0; i < sizes.length; i++) {
13883
await chunkedCrc32Test(array);
13984
}
14085

141-
await streamingWriteTest();
142-
await streamingTryWriteTest();
143-
await streamingTryWriteEndTest();
144-
145-
console.log("Done!");
86+
console.log("Done!");

0 commit comments

Comments
 (0)