Skip to content

Commit 066dbee

Browse files
authored
feat: add fk constraints to schema (#768)
- feat: load foreign key constraints in schema on startup - feat: add `load_schema` general setting: on/off/auto
1 parent 2a02d04 commit 066dbee

File tree

11 files changed

+328
-19
lines changed

11 files changed

+328
-19
lines changed

integration/go/go_pgx/sharded_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,11 @@ func TestShardedTwoPc(t *testing.T) {
152152
assert.NoError(t, err)
153153
}
154154

155-
// +3 is for schema sync
155+
// +4 is for schema sync
156156
assertShowField(t, "SHOW STATS", "total_xact_2pc_count", 200, "pgdog_2pc", "pgdog_sharded", 0, "primary")
157157
assertShowField(t, "SHOW STATS", "total_xact_2pc_count", 200, "pgdog_2pc", "pgdog_sharded", 1, "primary")
158-
assertShowField(t, "SHOW STATS", "total_xact_count", 401+3, "pgdog_2pc", "pgdog_sharded", 0, "primary") // PREPARE, COMMIT for each transaction + TRUNCATE
159-
assertShowField(t, "SHOW STATS", "total_xact_count", 401+3, "pgdog_2pc", "pgdog_sharded", 1, "primary")
158+
assertShowField(t, "SHOW STATS", "total_xact_count", 401+4, "pgdog_2pc", "pgdog_sharded", 0, "primary") // PREPARE, COMMIT for each transaction + TRUNCATE
159+
assertShowField(t, "SHOW STATS", "total_xact_count", 401+4, "pgdog_2pc", "pgdog_sharded", 1, "primary")
160160

161161
for i := range 200 {
162162
rows, err := conn.Query(

pgdog-config/src/general.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::path::PathBuf;
55
use std::time::Duration;
66

77
use crate::pooling::ConnectionRecovery;
8-
use crate::{CopyFormat, QueryParserEngine, QueryParserLevel, SystemCatalogsBehavior};
8+
use crate::{CopyFormat, LoadSchema, QueryParserEngine, QueryParserLevel, SystemCatalogsBehavior};
99

1010
use super::auth::{AuthType, PassthoughAuth};
1111
use super::database::{LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy};
@@ -209,6 +209,9 @@ pub struct General {
209209
/// Trigger a schema reload on DDL like CREATE TABLE.
210210
#[serde(default = "General::reload_schema_on_ddl")]
211211
pub reload_schema_on_ddl: bool,
212+
/// Load database schema.
213+
#[serde(default = "General::load_schema")]
214+
pub load_schema: LoadSchema,
212215
}
213216

214217
impl Default for General {
@@ -282,6 +285,7 @@ impl Default for General {
282285
omnisharded_sticky: bool::default(),
283286
resharding_copy_format: CopyFormat::default(),
284287
reload_schema_on_ddl: Self::reload_schema_on_ddl(),
288+
load_schema: Self::load_schema(),
285289
}
286290
}
287291
}
@@ -566,6 +570,10 @@ impl General {
566570
)
567571
}
568572

573+
fn load_schema() -> LoadSchema {
574+
Self::env_enum_or_default("PGDOG_LOAD_SCHEMA")
575+
}
576+
569577
pub fn mirror_queue() -> usize {
570578
Self::env_or_default("PGDOG_MIRROR_QUEUE", 128)
571579
}

pgdog-config/src/sharding.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,27 @@ impl Display for CopyFormat {
370370
}
371371
}
372372

373+
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
374+
#[serde(rename_all = "snake_case", deny_unknown_fields)]
375+
pub enum LoadSchema {
376+
On,
377+
Off,
378+
#[default]
379+
Auto,
380+
}
381+
382+
impl FromStr for LoadSchema {
383+
type Err = ();
384+
fn from_str(s: &str) -> Result<Self, Self::Err> {
385+
Ok(match s.to_lowercase().as_str() {
386+
"on" => Self::On,
387+
"auto" => Self::Auto,
388+
"off" => Self::Off,
389+
_ => return Err(()),
390+
})
391+
}
392+
}
393+
373394
#[cfg(test)]
374395
mod test {
375396
use super::*;

pgdog-stats/src/schema.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,40 @@ use std::{collections::HashMap, hash::Hash, ops::Deref, sync::Arc};
55
/// Schema name -> Table name -> Relation
66
pub type Relations = HashMap<String, HashMap<String, Relation>>;
77

8+
/// The action to take when a referenced row is deleted or updated.
9+
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash)]
10+
pub enum ForeignKeyAction {
11+
#[default]
12+
NoAction,
13+
Restrict,
14+
Cascade,
15+
SetNull,
16+
SetDefault,
17+
}
18+
19+
impl ForeignKeyAction {
20+
/// Parse from PostgreSQL's information_schema representation.
21+
pub fn from_pg_string(s: &str) -> Self {
22+
match s {
23+
"CASCADE" => Self::Cascade,
24+
"SET NULL" => Self::SetNull,
25+
"SET DEFAULT" => Self::SetDefault,
26+
"RESTRICT" => Self::Restrict,
27+
"NO ACTION" | _ => Self::NoAction,
28+
}
29+
}
30+
}
31+
32+
/// A foreign key reference to another table's column.
33+
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Hash)]
34+
pub struct ForeignKey {
35+
pub schema: String,
36+
pub table: String,
37+
pub column: String,
38+
pub on_delete: ForeignKeyAction,
39+
pub on_update: ForeignKeyAction,
40+
}
41+
842
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Hash)]
943
pub struct Column {
1044
pub table_catalog: String,
@@ -16,6 +50,7 @@ pub struct Column {
1650
pub data_type: String,
1751
pub ordinal_position: i32,
1852
pub is_primary_key: bool,
53+
pub foreign_keys: Vec<ForeignKey>,
1954
}
2055

2156
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]

pgdog/src/backend/pool/cluster.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
//! A collection of replicas and a primary.
22
33
use parking_lot::Mutex;
4-
use pgdog_config::{PreparedStatements, QueryParserEngine, QueryParserLevel, Rewrite, RewriteMode};
4+
use pgdog_config::{
5+
LoadSchema, PreparedStatements, QueryParserEngine, QueryParserLevel, Rewrite, RewriteMode,
6+
};
57
use std::{
68
sync::{
79
atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -75,6 +77,7 @@ pub struct Cluster {
7577
client_connection_recovery: ConnectionRecovery,
7678
query_parser_engine: QueryParserEngine,
7779
reload_schema_on_ddl: bool,
80+
load_schema: LoadSchema,
7881
}
7982

8083
/// Sharding configuration from the cluster.
@@ -149,6 +152,7 @@ pub struct ClusterConfig<'a> {
149152
pub client_connection_recovery: ConnectionRecovery,
150153
pub lsn_check_interval: Duration,
151154
pub reload_schema_on_ddl: bool,
155+
pub load_schema: LoadSchema,
152156
}
153157

154158
impl<'a> ClusterConfig<'a> {
@@ -198,6 +202,7 @@ impl<'a> ClusterConfig<'a> {
198202
client_connection_recovery: general.client_connection_recovery,
199203
lsn_check_interval: Duration::from_millis(general.lsn_check_interval),
200204
reload_schema_on_ddl: general.reload_schema_on_ddl,
205+
load_schema: general.load_schema,
201206
}
202207
}
203208
}
@@ -233,6 +238,7 @@ impl Cluster {
233238
lsn_check_interval,
234239
query_parser_engine,
235240
reload_schema_on_ddl,
241+
load_schema,
236242
} = config;
237243

238244
let identifier = Arc::new(DatabaseUser {
@@ -280,6 +286,7 @@ impl Cluster {
280286
client_connection_recovery,
281287
query_parser_engine,
282288
reload_schema_on_ddl,
289+
load_schema,
283290
}
284291
}
285292

@@ -486,10 +493,11 @@ impl Cluster {
486493
}
487494

488495
fn load_schema(&self) -> bool {
489-
self.shards.len() > 1
490-
&& self.sharded_schemas.is_empty()
491-
&& !self.sharded_tables.tables().is_empty()
492-
|| self.multi_tenant().is_some()
496+
match self.load_schema {
497+
LoadSchema::On => true,
498+
LoadSchema::Off => false,
499+
LoadSchema::Auto => self.shards.len() > 1 || self.multi_tenant().is_some(),
500+
}
493501
}
494502

495503
/// Get currently loaded schema from shard 0.
@@ -549,12 +557,11 @@ impl Cluster {
549557

550558
info!("loaded schema from {}/{} shards", done + 1, shards);
551559

560+
schema_changed_hook(&shard.schema(), &identifier, &shard);
561+
552562
// Loaded schema on all shards.
553563
if done >= shards - 1 {
554564
readiness.schemas_ready.notify_waiters();
555-
// We assume the schema is the same on all shards.
556-
// TODO: check that this is the case and raise a stink if its not.
557-
schema_changed_hook(&shard.schema(), &identifier);
558565
}
559566
});
560567
}
@@ -745,7 +752,8 @@ mod test {
745752
let config = ConfigAndUsers::default();
746753
let cluster = Cluster::new_test(&config);
747754

748-
assert!(!cluster.load_schema());
755+
// In Auto mode with multiple shards, load_schema returns true
756+
assert!(cluster.load_schema());
749757
}
750758

751759
#[test]
@@ -755,7 +763,9 @@ mod test {
755763
cluster.sharded_schemas = ShardedSchemas::default();
756764
cluster.sharded_tables = ShardedTables::default();
757765

758-
assert!(!cluster.load_schema());
766+
// In Auto mode with multiple shards, load_schema returns true
767+
// (sharded_schemas and sharded_tables no longer affect this decision)
768+
assert!(cluster.load_schema());
759769
}
760770

761771
#[test]
@@ -839,9 +849,9 @@ mod test {
839849
#[tokio::test]
840850
async fn test_wait_schema_loaded_returns_immediately_when_not_needed() {
841851
let config = ConfigAndUsers::default();
842-
let cluster = Cluster::new_test(&config);
852+
let cluster = Cluster::new_test_single_shard(&config);
843853

844-
// load_schema() returns false because sharded_schemas is not empty
854+
// load_schema() returns false for single shard without multi_tenant
845855
assert!(!cluster.load_schema());
846856

847857
// Should return immediately without waiting

pgdog/src/backend/pool/ee/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
use crate::backend::{databases::User, Schema};
1+
use crate::backend::{databases::User, Schema, Shard};
22

3-
pub(crate) fn schema_changed_hook(_schema: &Schema, _user: &User) {}
3+
pub(crate) fn schema_changed_hook(_schema: &Schema, _user: &User, _shard: &Shard) {}

0 commit comments

Comments
 (0)