Skip to content

Commit a597083

Browse files
authored
Default to omni (#725)
### Omnisharded tables We now treat all tables as omnisharded _unless_ they are configured in `pgdog.toml`. This makes configuration a lot easier. To make this work, we load the Postgres schema from system catalogs on boot and inspect it for tables that match the config for each query. This is quick, since the tables (and their columns) are organized in a HashMap. ### Schema inference We now load the schema from Postgres on boot and inspect it for sharding keys. For this to work, Postgres needs to be available when PgDog is started. #### Migrations When adding or changing sharded tables, make sure to run `RELOAD` (or `RECONNECT`) to reload the schema; otherwise, a `SELECT` against the new table will default to omni, not sharded, for queries that don't provide the sharding key. ### Unique ID generation Added `pgdog.unique_id()` to Postgres itself (via the `setup` CLI command), so it's now possible to use it as a column default, e.g.: ```sql CREATE TABLE omni ( id BIGINT NOT NULL DEFAULT pgdog.unique_id(), value TEXT ); ``` How this works: 1. Configure shards in `pgdog.toml` 2. Run `pgdog setup` to create the function and its dependencies. This will create a schema called `pgdog` on all shards, and the PL/pgSQL functions will be added to it. **Warning** Don't use the Postgres unique ID function to generate sharding keys! The generated IDs won't map to the shard they were generated on. Make sure to call the function explicitly in the query so PgDog executes it instead.
1 parent 5fccd65 commit a597083

File tree

30 files changed

+1040
-205
lines changed

30 files changed

+1040
-205
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration/go/go_pgx/pg_tests_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func executeTimeoutTest(t *testing.T) {
222222
c := make(chan int, 1)
223223

224224
go func() {
225-
err := pgSleepOneSecond(conn, ctx)
225+
err := pgSleepTwoSecond(conn, ctx)
226226
assert.NotNil(t, err)
227227

228228
defer conn.Close(context.Background())
@@ -240,8 +240,8 @@ func executeTimeoutTest(t *testing.T) {
240240
}
241241

242242
// Sleep for 1 second.
243-
func pgSleepOneSecond(conn *pgx.Conn, ctx context.Context) (err error) {
244-
_, err = conn.Exec(ctx, "SELECT pg_sleep(1)")
243+
func pgSleepTwoSecond(conn *pgx.Conn, ctx context.Context) (err error) {
244+
_, err = conn.Exec(ctx, "SELECT pg_sleep(2)")
245245
return err
246246
}
247247

integration/go/go_pgx/sharded_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,11 @@ func TestShardedTwoPc(t *testing.T) {
151151
assert.NoError(t, err)
152152
}
153153

154+
// +3 is for schema sync
154155
assertShowField(t, "SHOW STATS", "total_xact_2pc_count", 200, "pgdog_2pc", "pgdog_sharded", 0, "primary")
155156
assertShowField(t, "SHOW STATS", "total_xact_2pc_count", 200, "pgdog_2pc", "pgdog_sharded", 1, "primary")
156-
assertShowField(t, "SHOW STATS", "total_xact_count", 401, "pgdog_2pc", "pgdog_sharded", 0, "primary") // PREPARE, COMMIT for each transaction + TRUNCATE
157-
assertShowField(t, "SHOW STATS", "total_xact_count", 401, "pgdog_2pc", "pgdog_sharded", 1, "primary")
157+
assertShowField(t, "SHOW STATS", "total_xact_count", 401+3, "pgdog_2pc", "pgdog_sharded", 0, "primary") // PREPARE, COMMIT for each transaction + TRUNCATE
158+
assertShowField(t, "SHOW STATS", "total_xact_count", 401+3, "pgdog_2pc", "pgdog_sharded", 1, "primary")
158159

159160
for i := range 200 {
160161
rows, err := conn.Query(

integration/rust/tests/integration/avg.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use rust::setup::connections_sqlx;
1+
use rust::setup::{admin_sqlx, connections_sqlx};
22
use sqlx::{Connection, Executor, PgConnection, Row};
33

44
#[tokio::test]
@@ -17,12 +17,15 @@ async fn avg_merges_with_helper_count() -> Result<(), Box<dyn std::error::Error>
1717

1818
for shard in [0, 1] {
1919
let comment = format!(
20-
"/* pgdog_shard: {} */ CREATE TABLE avg_reduce_test(price DOUBLE PRECISION)",
20+
"/* pgdog_shard: {} */ CREATE TABLE avg_reduce_test(price DOUBLE PRECISION, customer_id BIGINT)",
2121
shard
2222
);
2323
sharded.execute(comment.as_str()).await?;
2424
}
2525

26+
// Make sure sharded table is loaded in schema.
27+
admin_sqlx().await.execute("RELOAD").await?;
28+
2629
// Insert data on each shard so the query spans multiple shards.
2730
sharded
2831
.execute("/* pgdog_shard: 0 */ INSERT INTO avg_reduce_test(price) VALUES (10.0), (14.0)")
@@ -73,12 +76,14 @@ async fn avg_without_helper_should_still_merge() -> Result<(), Box<dyn std::erro
7376

7477
for shard in [0, 1] {
7578
let comment = format!(
76-
"/* pgdog_shard: {} */ CREATE TABLE avg_rewrite_expectation(price DOUBLE PRECISION)",
79+
"/* pgdog_shard: {} */ CREATE TABLE avg_rewrite_expectation(price DOUBLE PRECISION, customer_id BIGINT)",
7780
shard
7881
);
7982
sharded.execute(comment.as_str()).await?;
8083
}
8184

85+
admin_sqlx().await.execute("RELOAD").await?;
86+
8287
sharded
8388
.execute(
8489
"/* pgdog_shard: 0 */ INSERT INTO avg_rewrite_expectation(price) VALUES (10.0), (14.0)",
@@ -145,12 +150,14 @@ async fn avg_multiple_columns_should_merge() -> Result<(), Box<dyn std::error::E
145150

146151
for shard in [0, 1] {
147152
let comment = format!(
148-
"/* pgdog_shard: {} */ CREATE TABLE avg_multi_column(price DOUBLE PRECISION, discount DOUBLE PRECISION)",
153+
"/* pgdog_shard: {} */ CREATE TABLE avg_multi_column(price DOUBLE PRECISION, discount DOUBLE PRECISION, customer_id BIGINT)",
149154
shard
150155
);
151156
sharded.execute(comment.as_str()).await?;
152157
}
153158

159+
admin_sqlx().await.execute("RELOAD").await?;
160+
154161
sharded
155162
.execute(
156163
"/* pgdog_shard: 0 */ INSERT INTO avg_multi_column(price, discount) VALUES (10.0, 1.0), (14.0, 3.0)",
@@ -231,12 +238,14 @@ async fn avg_prepared_statement_should_merge() -> Result<(), Box<dyn std::error:
231238

232239
for shard in [0, 1] {
233240
let comment = format!(
234-
"/* pgdog_shard: {} */ CREATE TABLE avg_prepared_params(price DOUBLE PRECISION)",
241+
"/* pgdog_shard: {} */ CREATE TABLE avg_prepared_params(price DOUBLE PRECISION, customer_id BIGINT)",
235242
shard
236243
);
237244
sharded.execute(comment.as_str()).await?;
238245
}
239246

247+
admin_sqlx().await.execute("RELOAD").await?;
248+
240249
sharded
241250
.execute(
242251
"/* pgdog_shard: 0 */ INSERT INTO avg_prepared_params(price) VALUES (10.0), (14.0)",

integration/rust/tests/integration/explain.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use rust::setup::connections_tokio;
1+
use rust::setup::{admin_sqlx, connections_tokio};
2+
use sqlx::Executor;
23
use tokio_postgres::SimpleQueryMessage;
34

45
#[tokio::test]
@@ -16,12 +17,14 @@ async fn explain_routing_annotations_surface() -> Result<(), Box<dyn std::error:
1617

1718
for shard in [0, 1] {
1819
let create = format!(
19-
"/* pgdog_shard: {} */ CREATE TABLE explain_avg_test(price DOUBLE PRECISION)",
20+
"/* pgdog_shard: {} */ CREATE TABLE explain_avg_test(price DOUBLE PRECISION, customer_id BIGINT)",
2021
shard
2122
);
2223
sharded.simple_query(create.as_str()).await?;
2324
}
2425

26+
admin_sqlx().await.execute("RELOAD").await?;
27+
2528
sharded
2629
.simple_query(
2730
"/* pgdog_shard: 0 */ INSERT INTO explain_avg_test(price) VALUES (10.0), (14.0)",

integration/rust/tests/integration/per_stmt_routing.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use rust::setup::connections_sqlx;
1+
use rust::setup::{admin_sqlx, connections_sqlx};
22
use sqlx::{Acquire, Executor, Row};
33

44
#[tokio::test]
@@ -12,6 +12,8 @@ async fn per_stmt_routing() -> Result<(), Box<dyn std::error::Error>> {
1212
)
1313
.await?;
1414

15+
admin_sqlx().await.execute("RELOAD").await?;
16+
1517
sharded.execute("TRUNCATE TABLE per_stmt_routing").await?;
1618

1719
for i in 0..50 {

integration/rust/tests/integration/shard_consistency.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use rust::setup::connections_sqlx;
1+
use rust::setup::{admin_sqlx, connections_sqlx};
22
use sqlx::{Executor, Row};
33

44
#[tokio::test]
@@ -15,14 +15,16 @@ async fn shard_consistency_validator() -> Result<(), Box<dyn std::error::Error>>
1515
// Create different table schemas on each shard to trigger validator errors
1616
// Shard 0: table with 2 columns (id, name)
1717
sharded
18-
.execute("/* pgdog_shard: 0 */ CREATE TABLE shard_test (id BIGINT PRIMARY KEY, name VARCHAR(100))")
18+
.execute("/* pgdog_shard: 0 */ CREATE TABLE shard_test (id BIGINT PRIMARY KEY, name VARCHAR(100), customer_id BIGINT)")
1919
.await?;
2020

2121
// Shard 1: table with 3 columns (id, name, extra) - different column count
2222
sharded
23-
.execute("/* pgdog_shard: 1 */ CREATE TABLE shard_test (id BIGINT PRIMARY KEY, name VARCHAR(100), extra TEXT)")
23+
.execute("/* pgdog_shard: 1 */ CREATE TABLE shard_test (id BIGINT PRIMARY KEY, name VARCHAR(100), extra TEXT, customer_id BIGINT)")
2424
.await?;
2525

26+
admin_sqlx().await.execute("RELOAD").await?;
27+
2628
// Insert some test data on each shard
2729
sharded
2830
.execute("/* pgdog_shard: 0 */ INSERT INTO shard_test (id, name) VALUES (1, 'shard0_row1'), (2, 'shard0_row2')")
@@ -76,14 +78,16 @@ async fn shard_consistency_validator_column_names() -> Result<(), Box<dyn std::e
7678
// Create tables with same column count but different column names
7779
// Shard 0: columns named (id, name)
7880
sharded
79-
.execute("/* pgdog_shard: 0 */ CREATE TABLE shard_name_test (id BIGINT PRIMARY KEY, name VARCHAR(100))")
81+
.execute("/* pgdog_shard: 0 */ CREATE TABLE shard_name_test (id BIGINT PRIMARY KEY, name VARCHAR(100), customer_id BIGINT)")
8082
.await?;
8183

8284
// Shard 1: columns named (id, username) - different column name
8385
sharded
84-
.execute("/* pgdog_shard: 1 */ CREATE TABLE shard_name_test (id BIGINT PRIMARY KEY, username VARCHAR(100))")
86+
.execute("/* pgdog_shard: 1 */ CREATE TABLE shard_name_test (id BIGINT PRIMARY KEY, username VARCHAR(100), customer_id BIGINT)")
8587
.await?;
8688

89+
admin_sqlx().await.execute("RELOAD").await?;
90+
8791
// Insert test data
8892
sharded
8993
.execute("/* pgdog_shard: 0 */ INSERT INTO shard_name_test (id, name) VALUES (1, 'test1')")
@@ -138,9 +142,11 @@ async fn shard_consistency_validator_success() -> Result<(), Box<dyn std::error:
138142

139143
// Create identical table schemas on both shards
140144
sharded
141-
.execute("CREATE TABLE shard_consistent_test (id BIGINT PRIMARY KEY, name VARCHAR(100))")
145+
.execute("CREATE TABLE shard_consistent_test (id BIGINT PRIMARY KEY, name VARCHAR(100), customer_id BIGINT)")
142146
.await?;
143147

148+
admin_sqlx().await.execute("RELOAD").await?;
149+
144150
// Insert test data
145151
sharded
146152
.execute("/* pgdog_shard: 0 */ INSERT INTO shard_consistent_test (id, name) VALUES (1, 'shard0_data'), (2, 'shard0_more')")
@@ -190,9 +196,11 @@ async fn shard_consistency_data_row_validator_prepared_statement()
190196
// Create tables with same schema but we'll query them differently to trigger DataRow validation
191197
// Both tables have same structure so RowDescription will match initially
192198
sharded
193-
.execute("CREATE TABLE shard_datarow_test (id BIGINT PRIMARY KEY, name VARCHAR(100), extra TEXT DEFAULT 'default')")
199+
.execute("CREATE TABLE shard_datarow_test (id BIGINT PRIMARY KEY, name VARCHAR(100), extra TEXT DEFAULT 'default', customer_id BIGINT)")
194200
.await?;
195201

202+
admin_sqlx().await.execute("RELOAD").await?;
203+
196204
// Insert test data
197205
sharded
198206
.execute("/* pgdog_shard: 0 */ INSERT INTO shard_datarow_test (id, name) VALUES (1, 'test1'), (2, 'test2')")
@@ -208,13 +216,15 @@ async fn shard_consistency_data_row_validator_prepared_statement()
208216

209217
// Actually, let's create a simpler test - use views with different column counts
210218
sharded
211-
.execute("/* pgdog_shard: 0 */ CREATE VIEW datarow_view AS SELECT id, name FROM shard_datarow_test")
219+
.execute("/* pgdog_shard: 0 */ CREATE VIEW datarow_view AS SELECT id, name, customer_id FROM shard_datarow_test")
212220
.await?;
213221

214222
sharded
215-
.execute("/* pgdog_shard: 1 */ CREATE VIEW datarow_view AS SELECT id, name, extra FROM shard_datarow_test")
223+
.execute("/* pgdog_shard: 1 */ CREATE VIEW datarow_view AS SELECT id, name, extra, customer_id FROM shard_datarow_test")
216224
.await?;
217225

226+
admin_sqlx().await.execute("RELOAD").await?;
227+
218228
// Now prepare a statement against the views
219229
let result = sharded
220230
.prepare("SELECT * FROM datarow_view WHERE id > $1 ORDER BY id")

integration/rust/tests/integration/stddev.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::BTreeSet;
22

33
use ordered_float::OrderedFloat;
4-
use rust::setup::{connections_sqlx, connections_tokio};
4+
use rust::setup::{admin_sqlx, connections_sqlx, connections_tokio};
55
use sqlx::{Connection, Executor, PgConnection, Row, postgres::PgPool};
66

77
const SHARD_URLS: [&str; 2] = [
@@ -55,7 +55,7 @@ async fn stddev_pop_merges_with_helpers() -> Result<(), Box<dyn std::error::Erro
5555
reset_table(
5656
&sharded,
5757
"stddev_pop_reduce_test",
58-
"(price DOUBLE PRECISION)",
58+
"(price DOUBLE PRECISION, customer_id BIGINT)",
5959
)
6060
.await?;
6161

@@ -106,7 +106,7 @@ async fn stddev_samp_aliases_should_merge() -> Result<(), Box<dyn std::error::Er
106106
reset_table(
107107
&sharded,
108108
"stddev_sample_reduce_test",
109-
"(price DOUBLE PRECISION)",
109+
"(price DOUBLE PRECISION, customer_id BIGINT)",
110110
)
111111
.await?;
112112

@@ -157,7 +157,12 @@ async fn variance_variants_should_merge() -> Result<(), Box<dyn std::error::Erro
157157
let conns = connections_sqlx().await;
158158
let sharded = conns.get(1).cloned().unwrap();
159159

160-
reset_table(&sharded, "variance_reduce_test", "(price DOUBLE PRECISION)").await?;
160+
reset_table(
161+
&sharded,
162+
"variance_reduce_test",
163+
"(price DOUBLE PRECISION, customer_id BIGINT)",
164+
)
165+
.await?;
161166

162167
seed_stat_data(
163168
&sharded,
@@ -222,7 +227,7 @@ async fn stddev_multiple_columns_should_merge() -> Result<(), Box<dyn std::error
222227
reset_table(
223228
&sharded,
224229
"stddev_multi_column",
225-
"(price DOUBLE PRECISION, discount DOUBLE PRECISION)",
230+
"(price DOUBLE PRECISION, discount DOUBLE PRECISION, customer_id BIGINT)",
226231
)
227232
.await?;
228233

@@ -290,7 +295,7 @@ async fn stddev_prepared_statement_should_merge() -> Result<(), Box<dyn std::err
290295
reset_table(
291296
&sharded,
292297
"stddev_prepared_params",
293-
"(price DOUBLE PRECISION)",
298+
"(price DOUBLE PRECISION, customer_id BIGINT)",
294299
)
295300
.await?;
296301

@@ -358,7 +363,7 @@ async fn stddev_distinct_should_error_until_supported() -> Result<(), Box<dyn st
358363
reset_table(
359364
&sharded,
360365
"stddev_distinct_error",
361-
"(price DOUBLE PRECISION)",
366+
"(price DOUBLE PRECISION, customer_id BIGINT)",
362367
)
363368
.await?;
364369

@@ -396,7 +401,12 @@ async fn stddev_distinct_future_expectation() -> Result<(), Box<dyn std::error::
396401
let conns = connections_sqlx().await;
397402
let sharded = conns.get(1).cloned().unwrap();
398403

399-
reset_table(&sharded, "stddev_distinct_test", "(price DOUBLE PRECISION)").await?;
404+
reset_table(
405+
&sharded,
406+
"stddev_distinct_test",
407+
"(price DOUBLE PRECISION, customer_id BIGINT)",
408+
)
409+
.await?;
400410

401411
seed_stat_data(
402412
&sharded,
@@ -456,6 +466,8 @@ async fn reset_table(pool: &PgPool, table: &str, schema: &str) -> Result<(), sql
456466
pool.execute(create_stmt.as_str()).await?;
457467
}
458468

469+
admin_sqlx().await.execute("RELOAD").await?;
470+
459471
Ok(())
460472
}
461473

0 commit comments

Comments
 (0)