Skip to content

Commit b9ed825

Browse files
authored
feat: stringify entries lazily, deduplicate chunks on write (#267)
1 parent 9334598 commit b9ed825

File tree

5 files changed

+176
-59
lines changed

5 files changed

+176
-59
lines changed

package-lock.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"@types/fs-extra": "^9.0.13",
4646
"@types/jest": "^27.0.2",
4747
"@types/mock-fs": "^4.13.1",
48-
"@types/node": "^16.11.11",
48+
"@types/node": "^12.20.39",
4949
"@types/proper-lockfile": "^4.1.2",
5050
"@typescript-eslint/eslint-plugin": "^4.33.0",
5151
"@typescript-eslint/parser": "^4.33.0",
@@ -82,7 +82,8 @@
8282
"lint": "npm run lint:ts",
8383
"commit": "git-cz",
8484
"release": "release-script",
85-
"prepare": "husky install"
85+
"prepare": "husky install",
86+
"perf": "ts-node test/perf.ts"
8687
},
8788
"config": {
8889
"commitizen": {

src/lib/db.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -767,10 +767,11 @@ describe("lib/db", () => {
767767

768768
it("does not do anything while the DB is being closed", async () => {
769769
db.set("key3", 3);
770+
await db.compress(); // this writes the DB
770771
db.delete("key2");
771772
db.set("key3", 3.5);
772-
const closePromise = db.close();
773-
await db.compress();
773+
const closePromise = db.close(); // this only appends the extra key3 line
774+
await db.compress(); // this does not compress
774775
await closePromise;
775776

776777
await expect(fs.readFile(testFilenameFull, "utf8")).resolves.toBe(

src/lib/db.ts

Lines changed: 110 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,29 @@ export interface FsWriteOptions {
8787
EOL?: string;
8888
}
8989

90+
enum Operation {
91+
Clear = 0,
92+
Write = 1,
93+
Delete = 2,
94+
}
95+
96+
type LazyEntry<V extends unknown = unknown> = (
97+
| {
98+
op: Operation.Clear;
99+
}
100+
| {
101+
op: Operation.Delete;
102+
key: string;
103+
}
104+
| {
105+
op: Operation.Write;
106+
key: string;
107+
value: V;
108+
}
109+
) & {
110+
serialize(): string;
111+
};
112+
90113
/**
91114
* fsync on a directory ensures there are no rename operations etc. which haven't been persisted to disk.
92115
*/
@@ -443,7 +466,7 @@ export class JsonlDB<V extends unknown = unknown> {
443466
throw new Error("The database is not open!");
444467
}
445468
this._db.clear();
446-
this.write("");
469+
this.write(this.makeLazyClear());
447470
}
448471
public delete(key: string): boolean {
449472
if (!this._isOpen) {
@@ -452,7 +475,7 @@ export class JsonlDB<V extends unknown = unknown> {
452475
const ret = this._db.delete(key);
453476
if (ret) {
454477
// Something was deleted
455-
this.write(this.entryToLine(key));
478+
this.write(this.makeLazyDelete(key));
456479
}
457480
return ret;
458481
}
@@ -461,7 +484,7 @@ export class JsonlDB<V extends unknown = unknown> {
461484
throw new Error("The database is not open!");
462485
}
463486
this._db.set(key, value);
464-
this.write(this.entryToLine(key, value));
487+
this.write(this.makeLazyWrite(key, value));
465488
return this;
466489
}
467490

@@ -488,7 +511,7 @@ export class JsonlDB<V extends unknown = unknown> {
488511

489512
for (const [key, value] of Object.entries(jsonOrFile)) {
490513
this._db.set(key, value);
491-
this.write(this.entryToLine(key, value), true);
514+
this.write(this.makeLazyWrite(key, value), true);
492515
}
493516
}
494517

@@ -502,8 +525,8 @@ export class JsonlDB<V extends unknown = unknown> {
502525
return fs.writeJSON(filename, composeObject([...this._db]), options);
503526
}
504527

505-
private updateStatistics(command: string): void {
506-
if (command === "") {
528+
private updateStatistics(entry: LazyEntry<V>): void {
529+
if (entry.op === Operation.Clear) {
507530
this._uncompressedSize = 0;
508531
} else {
509532
this._uncompressedSize++;
@@ -570,20 +593,20 @@ export class JsonlDB<V extends unknown = unknown> {
570593
* Writes a line into the correct backlog
571594
* @param noAutoCompress Whether auto-compression should be disabled
572595
*/
573-
private write(line: string, noAutoCompress: boolean = false): void {
596+
private write(lazy: LazyEntry<V>, noAutoCompress: boolean = false): void {
574597
/* istanbul ignore else */
575598
if (this._compressBacklog && !this._compressBacklog.destroyed) {
576599
// The compress backlog handling also handles the file statistics
577-
this._compressBacklog.write(line);
600+
this._compressBacklog.write(lazy);
578601
} else if (this._writeBacklog && !this._writeBacklog.destroyed) {
579602
// Update line statistics
580-
this.updateStatistics(line);
603+
this.updateStatistics(lazy);
581604

582605
// Either compress or write to the main file, never both
583606
if (!noAutoCompress && this.needToCompress()) {
584607
this.compress();
585608
} else {
586-
this._writeBacklog.write(line);
609+
this._writeBacklog.write(lazy);
587610
// If this is a throttled stream, uncork it as soon as the write
588611
// buffer is larger than configured
589612
if (
@@ -601,7 +624,7 @@ export class JsonlDB<V extends unknown = unknown> {
601624
}
602625
// If necessary, write to the dump backlog, so the dump doesn't miss any data
603626
if (this._dumpBacklog && !this._dumpBacklog.destroyed) {
604-
this._dumpBacklog.write(line);
627+
this._dumpBacklog.write(lazy);
605628
}
606629
}
607630

@@ -616,6 +639,48 @@ export class JsonlDB<V extends unknown = unknown> {
616639
}
617640
}
618641

642+
private makeLazyClear(): LazyEntry & { op: Operation.Clear } {
643+
return {
644+
op: Operation.Clear,
645+
646+
serialize:
647+
/* istanbul ignore next - this is impossible to test since it requires exact timing */ () =>
648+
"",
649+
};
650+
}
651+
652+
private makeLazyDelete(key: string): LazyEntry & { op: Operation.Delete } {
653+
let serialized: string | undefined;
654+
return {
655+
op: Operation.Delete,
656+
key,
657+
serialize: () => {
658+
if (serialized == undefined) {
659+
serialized = this.entryToLine(key);
660+
}
661+
return serialized;
662+
},
663+
};
664+
}
665+
666+
private makeLazyWrite(
667+
key: string,
668+
value: V,
669+
): LazyEntry<V> & { op: Operation.Write } {
670+
let serialized: string | undefined;
671+
return {
672+
op: Operation.Write,
673+
key,
674+
value,
675+
serialize: () => {
676+
if (serialized == undefined) {
677+
serialized = this.entryToLine(key, value);
678+
}
679+
return serialized;
680+
},
681+
};
682+
}
683+
619684
/**
620685
* Saves a compressed copy of the DB into the given path.
621686
* @param targetFilename Where the compressed copy should be written. Default: `<filename>.dump`
@@ -635,13 +700,14 @@ export class JsonlDB<V extends unknown = unknown> {
635700
for (const [key, value] of entries) {
636701
await fs.appendFile(
637702
this._dumpFd,
703+
// No need to serialize lazily here
638704
this.entryToLine(key, value) + "\n",
639705
);
640706
}
641707
// In case there is any data in the backlog stream, persist that too
642-
let line: string;
643-
while (null !== (line = this._dumpBacklog.read())) {
644-
await fs.appendFile(this._dumpFd, line + "\n");
708+
let lazy: LazyEntry<V>;
709+
while (null !== (lazy = this._dumpBacklog.read())) {
710+
await fs.appendFile(this._dumpFd, lazy.serialize() + "\n");
645711
}
646712
this._dumpBacklog.destroy();
647713
this._dumpBacklog = undefined;
@@ -665,16 +731,35 @@ export class JsonlDB<V extends unknown = unknown> {
665731
// Open the file for appending and reading
666732
this._fd = await fs.open(this.filename, "a+");
667733
this._openPromise?.resolve();
734+
// The chunk map is used to buffer all entries that are currently waiting in line
735+
// so we avoid serializing redundant entries. When the write backlog is throttled,
736+
// the chunk map will only be used for a short time.
737+
const chunk = new Map<string, LazyEntry>();
668738
for await (const action of this
669-
._writeBacklog as AsyncIterable<string>) {
670-
if (action === "") {
671-
// Since we opened the file in append mode, we cannot truncate
672-
// therefore close and open in write mode again
673-
await fs.close(this._fd);
674-
this._fd = await fs.open(this.filename, "w+");
739+
._writeBacklog as AsyncIterable<LazyEntry>) {
740+
if (action.op === Operation.Clear) {
741+
chunk.clear();
742+
chunk.set("", action);
675743
} else {
676-
await fs.appendFile(this._fd, action + "\n");
744+
// Only remember the last entry for each key
745+
chunk.set(action.key, action);
677746
}
747+
748+
// When the backlog has been drained, perform the necessary write actions
749+
if (this._writeBacklog.readableLength === 0) {
750+
for (const entry of chunk.values()) {
751+
if (entry.op === Operation.Clear) {
752+
// Since we opened the file in append mode, we cannot truncate
753+
// therefore close and open in write mode again
754+
await fs.close(this._fd);
755+
this._fd = await fs.open(this.filename, "w+");
756+
} else {
757+
await fs.appendFile(this._fd, entry.serialize() + "\n");
758+
}
759+
}
760+
chunk.clear();
761+
}
762+
678763
// When this is a throttled stream, auto-cork it when it was drained
679764
if (this._writeBacklog.readableLength === 0 && this._isOpen) {
680765
this.autoCork();
@@ -737,10 +822,10 @@ export class JsonlDB<V extends unknown = unknown> {
737822
}
738823

739824
// In case there is any data in the backlog stream, persist that too
740-
let line: string;
741-
while (null !== (line = this._compressBacklog.read())) {
742-
this.updateStatistics(line);
743-
this._writeBacklog!.write(line);
825+
let lazy: LazyEntry<V>;
826+
while (null !== (lazy = this._compressBacklog.read())) {
827+
this.updateStatistics(lazy);
828+
this._writeBacklog!.write(lazy);
744829
}
745830
this._compressBacklog.destroy();
746831
this._compressBacklog = undefined;

test/perf.ts

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,72 @@
1+
import { padStart } from "alcalzone-shared/strings";
2+
import fs from "fs-extra";
13
import { JsonlDB } from "../src";
24

5+
process.on("unhandledRejection", (r) => {
6+
debugger;
7+
});
8+
39
const testDB: JsonlDB<any> = new JsonlDB("test.jsonl", {
410
autoCompress: { onClose: false },
511
throttleFS: {
6-
intervalMs: 10000,
12+
intervalMs: 1000,
713
},
814
});
915

1016
(async () => {
11-
// await testDB.open();
12-
// // add a shitton of values
13-
// console.time("create values");
14-
// const MAX_NODES = 100;
15-
// for (let pass = 1; pass <= 10; pass++) {
16-
// for (let nodeId = 1; nodeId <= MAX_NODES; nodeId++) {
17-
// for (let ccId = 1; ccId <= 100; ccId++) {
18-
// for (let endpoint = 0; endpoint <= 10; endpoint++) {
19-
// for (const property of ["a", "b", "c", "d", "e"]) {
20-
// const key = `${nodeId}-${ccId}-${endpoint}-${property}`;
21-
// if (Math.random() < 0.15) {
22-
// testDB.delete(key);
23-
// } else {
24-
// testDB.set(key, Math.random() * 100);
25-
// }
26-
// }
27-
// }
28-
// }
29-
// }
30-
// }
17+
// add a shitton of values
18+
const NUM_PASSES = 10;
19+
const NUM_KEYS = 1000;
20+
const NUM_CHANGES = 100000;
21+
let total: number = 0;
22+
23+
// console.time("open");
24+
// console.timeEnd("open");
25+
26+
for (let pass = 1; pass <= NUM_PASSES; pass++) {
27+
await fs.remove("test.jsonl");
28+
29+
await testDB.open();
30+
31+
console.log(`start ${pass}`);
32+
33+
const start = Date.now();
34+
for (let i = 0; i < NUM_CHANGES; i++) {
35+
const key = `k${padStart(
36+
Math.round(Math.random() * NUM_KEYS).toString(),
37+
5,
38+
"0",
39+
)}`;
40+
if (Math.random() < 0.15) {
41+
testDB.delete(key);
42+
} else {
43+
testDB.set(key, Math.random() * 100);
44+
}
45+
}
46+
console.log("close");
47+
await testDB.close();
48+
49+
const time = Date.now() - start;
50+
total += time;
51+
52+
console.log(`end ${pass}`);
53+
}
54+
// console.time("close");
3155
// await testDB.close();
32-
// console.timeEnd("create values");
56+
// console.timeEnd("close");
57+
58+
console.log(`${NUM_PASSES}x, ${NUM_KEYS} keys, ${NUM_CHANGES} changes`);
59+
console.log(` ${(total / NUM_PASSES).toFixed(2)} ms / attempt`);
60+
console.log(` ${((NUM_CHANGES / total) * 1000).toFixed(2)} changes/s`);
3361

34-
console.time("open values");
35-
await testDB.open();
36-
console.log(testDB.size);
37-
console.timeEnd("open values");
62+
process.exit(0);
3863

39-
await testDB.close();
64+
// console.time("open values");
65+
// await testDB.open();
66+
// console.log(testDB.size);
67+
// console.timeEnd("open values");
68+
69+
// await testDB.close();
4070

4171
// await fs.remove("test.jsonl");
4272
})().catch(() => {

0 commit comments

Comments (0)