fix(account_set): activate eventually_consistent flag for async balance rollup#697
fix(account_set): activate eventually_consistent flag for async balance rollup#697bodymindarts wants to merge 36 commits intomainfrom
Conversation
📊 Performance ReportCommit: 0d7e524 Cala Performance Benchmark Results (non-representative)Criterion Benchmark Results (single-threaded)
Load Testing Results (parallel-execution)
Note: Performance results may vary based on system resources and database state. Last updated by commit 0d7e524 |
f4b722e to
622a84a
Compare
…ce rollup Fix two bugs in NewAccountBuilder that prevented the eventually_consistent flag from being persisted: - eventually_consistent() method was setting is_account_set instead of eventually_consistent - into_values() hardcoded eventually_consistent to false instead of using the struct field Add incremental recalculate_balances() to AccountSets service that replays member balance_history deltas into account set balance snapshots. The replay uses SQL LAG window function to compute per-entry deltas from leaf member accounts, applies them to the account set's running balance, and inserts identical balance_history rows to what the inline path would have created. Incremental behavior: each recalculate picks up from where it left off (using sum of latest_versions as offset), making it idempotent and efficient for repeated calls. Add NewAccountSet.eventually_consistent field so account sets can opt in to async balance computation. Include integration test verifying: multi-transaction batches, incremental recalculate across batches, snapshot-level parity with inline path (count, versions, running balances, entry_ids), and idempotency. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
622a84a to
adc0788
Compare
Replace ROW_NUMBER-based offset with entry_id watermark in fetch_incremental_member_history. The ROW_NUMBER approach broke when account set membership changed between recalculations because row numbering shifted. Entry IDs (UUID v7, monotonically ordered) provide a stable cursor regardless of membership changes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
PostgreSQL doesn't support MAX() on UUID type natively. Replace with ORDER BY latest_entry_id DESC LIMIT 1 which uses btree ordering and avoids the unsupported aggregate. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace sqlx::query() with sqlx::query!() in load_account_set_balances and fetch_incremental_member_history for compile-time SQL verification. Regenerate .sqlx offline cache. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When adding or removing a member from an eventually-consistent account set, the balance was never updated because find_for_update filters by eventually_consistent=FALSE. This adds update_balance_for_account_in_op which directly locks and updates the EC account's balance via load_all_for_update, bypassing that filter. The entry_id watermark naturally prevents double-counting during subsequent recalculations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove redundant SQL round trip in load_account_set_balances — the entry_id watermark is already available in the BalanceSnapshot JSON loaded by the first query. Extract it via max_by_key on the loaded balances instead of a separate cala_balance_history query. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ances The watermark is just the max entry_id across the loaded balance snapshots — returning it separately couples the repo method to a concern of its single caller. Move derivation to the call site in recalculate_account_set_balances_in_op instead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tness A single global watermark works across all currencies because EntryId uses UUID v7 (monotonically ordered by creation time). Document this invariant on fetch_incremental_member_history and at the derivation site so future maintainers know the coupling. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…g to Uuid Remove unnecessary watermark.map(uuid::Uuid::from) — sqlx accepts Option<EntryId> directly via the `as` cast. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add five unit tests covering: first-run version numbering, incremental delta application on existing balances, multi-delta accumulation, multi-currency independence, and empty-history no-op. Also clarify the version: 0 initializer matches Snapshots::new_snapshot (both start at 0 and increment before persisting). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
UUID v7 entry_id was only monotonic within a single process. In HA deployments with multiple writers, clock drift between pods could cause incremental EC recalculation to skip balance_history rows. Add a BIGSERIAL `seq` column to cala_balance_history that provides globally-ordered sequencing via PostgreSQL sequences. Track the high-water `latest_seq` on cala_current_balances for efficient watermark retrieval during recalculation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…embers Batch method that loads all sets' balances, fetches the union of member histories in one query, and replays deltas in a single pass — dispatching each row to every set that contains that member. This eliminates redundant balance_history scans when recalculating sibling/ancestor sets in a hierarchy. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The single-set `recalculate_balances_in_op` now delegates to `recalculate_balances_batch_in_op(&[id])`, eliminating the duplicate code path. Removed `load_account_set_balances`, `fetch_incremental_member_history`, and `replay_member_deltas` in favour of their batch equivalents. Unit tests ported to exercise `replay_member_deltas_batch` directly, with two new cases: shared-member dispatch and watermark-based row skipping. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…hierarchies Adds recalculate_balances_deep that discovers all descendant account sets via recursive CTE and includes them in a single batch recalculation call. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… test The batch_recalculate_shared_members test had recipient_account as a member of both set_a and set_b. When both sets were added to root, the transitive cascade tried to add recipient_account to root twice, causing MemberAlreadyAdded. Give each child set its own exclusive leaf account instead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add 3 new Criterion benchmarks for eventually consistent account sets: - #9: EC posting only (no inline recalculate) - #10: EC post + recalculate after every tx (worst case) - #11: EC 10 posts + 1 batch recalculate (amortized) Also adds `eventually_consistent` parameter to the `init_accounts_with_account_sets_depth` helper. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Run `make sqlx-prepare` to regenerate the offline query cache for new queries in account_set/repo.rs and balance/repo.rs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The benchmark was running 10 transactions inside the timed loop, making it appear ~10x slower than it actually is. Change to 1 post + 1 batch recalculate per iteration, matching the structure of benchmark #10 for a direct comparison. The batch recalculate still amortizes across accumulated staleness from prior iterations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace `self.accounts.find(target_account_id)` with `self.accounts.find_all_in_op()` in both `add_member_in_op` and `remove_member_in_op`. The pool-based `find` bypassed the caller's transaction, so it couldn't see an account created in the same uncommitted atomic operation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
recalculate_account_set_balances_batch_in_op only rebuilt cala_current_balances, leaving cala_cumulative_effective_balances stale for EC sets. This adds a self-contained effective recalculation path that mirrors the inline architecture: - EffectiveBalanceRepo: 4 new methods (fetch_member_effective_history, delete_at_or_after, load_latest_before, insert_recalc_snapshots) - EffectiveBalances: replay_effective_deltas pure-computation sweep grouped by effective_date, with per-date version resets - balance/mod.rs: delegates to effective module when journal has effective balances enabled Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove the separate 20260401154215_add_balance_history_seq migration and fold its changes (seq column, latest_seq column, index) directly into the initial ledger_setup migration. No backwards compatibility needed. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tion When a back-dated transaction is posted between existing effective dates, the watermark-filtered history fetch missed old member history needed to rebuild deleted effective rows at later dates. Fix by first discovering min_effective_date from new rows via watermark, then re-fetching ALL history from that date onward (with LAG on full partition) so later dates are correctly rebuilt. Also adds integration test for EC account set effective balance recalculation covering idempotency and incremental scenarios. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CI uses standard cargo which only sees the crate-level .sqlx/ directory. Moved the missing query cache entry to cala-ledger/.sqlx/ and removed the duplicate root-level .sqlx/ directory entirely. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sets Add `AccountSets::list_eventually_consistent_ids` (plus an `_in_op` variant) which returns the ids of all account sets whose underlying account is marked `eventually_consistent`. This replaces the need for downstream consumers to reach into the cala schema with raw SQL (e.g. `SELECT id FROM cala_accounts WHERE is_account_set = TRUE AND eventually_consistent = TRUE`) when they want to drive periodic reconciliation via `recalculate_balances_deep`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Previously this API returned an unbounded Vec<AccountSetId>, which could
grow with the number of eventually consistent account sets. Consumers
(e.g. periodic reconciliation jobs that batch-recalculate balances) now
paginate through the list using the macro-generated AccountSetByIdCursor,
consistent with the PaginatedQueryArgs/PaginatedQueryRet shape used by
the other list endpoints on AccountSets.
BREAKING CHANGE: AccountSets::list_eventually_consistent_ids{,_in_op}
now take es_entity::PaginatedQueryArgs<AccountSetByIdCursor> and return
es_entity::PaginatedQueryRet<AccountSetId, AccountSetByIdCursor>.
The previous return type `HashMap<(AccountId, Currency), (BalanceSnapshot, i32)>` required a `#[allow(clippy::type_complexity)]` suppression. Introduce a named `LatestBeforeEntry` struct with explicit `snapshot` and `all_time_version` fields so the type is self-documenting and the suppression can be removed.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Watermark advancement in add_member skips member history
- For EC account sets, add/remove member now preserves the pre-change set watermark and restores
latest_seqafter summary snapshot writes so pending member history cannot be skipped.
- For EC account sets, add/remove member now preserves the pre-change set watermark and restores
Or push these changes by commenting:
@cursor push e5ccc8171a
Preview (e5ccc8171a)
diff --git a/cala-ledger/src/account_set/mod.rs b/cala-ledger/src/account_set/mod.rs
--- a/cala-ledger/src/account_set/mod.rs
+++ b/cala-ledger/src/account_set/mod.rs
@@ -168,6 +168,15 @@
member: impl Into<AccountSetMemberId>,
) -> Result<AccountSet, AccountSetError> {
let member = member.into();
+ let target_account_id = AccountId::from(account_set_id);
+ let target_account = self
+ .accounts
+ .find_all_in_op::<Account>(&mut *op, &[target_account_id])
+ .await?
+ .remove(&target_account_id)
+ .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
+ let is_ec = target_account.values().config.eventually_consistent;
+
let (time, parents, account_set, member_id) = match member {
AccountSetMemberId::Account(id) => {
tracing::Span::current().record("is_account", true);
@@ -207,12 +216,23 @@
}
};
+ let ec_watermark = if is_ec {
+ self.balances
+ .load_balance_watermark_in_op(
+ op,
+ account_set.values().journal_id,
+ target_account_id,
+ )
+ .await?
+ } else {
+ None
+ };
+
let balances = self
.balances
.find_balances_for_update(op, account_set.values().journal_id, member_id)
.await?;
- let target_account_id = AccountId::from(&account_set.id());
let mut entries = Vec::new();
for balance in balances.into_values() {
entries_for_add_balance(&mut entries, target_account_id, balance);
@@ -223,13 +243,6 @@
}
let entries = self.entries.create_all_in_op(op, entries).await?;
- let target_account = self
- .accounts
- .find_all_in_op::<Account>(&mut *op, &[target_account_id])
- .await?
- .remove(&target_account_id)
- .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
- let is_ec = target_account.values().config.eventually_consistent;
let entries_for_ec = if is_ec { Some(entries.clone()) } else { None };
let mappings = std::iter::once((target_account_id, parents)).collect();
@@ -252,6 +265,7 @@
target_account_id,
&ec_entries,
time,
+ ec_watermark,
)
.await?;
}
@@ -281,6 +295,15 @@
member: impl Into<AccountSetMemberId>,
) -> Result<AccountSet, AccountSetError> {
let member = member.into();
+ let target_account_id = AccountId::from(account_set_id);
+ let target_account = self
+ .accounts
+ .find_all_in_op::<Account>(&mut *op, &[target_account_id])
+ .await?
+ .remove(&target_account_id)
+ .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
+ let is_ec = target_account.values().config.eventually_consistent;
+
let (time, parents, account_set, member_id) = match member {
AccountSetMemberId::Account(id) => {
let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
@@ -314,12 +337,23 @@
}
};
+ let ec_watermark = if is_ec {
+ self.balances
+ .load_balance_watermark_in_op(
+ op,
+ account_set.values().journal_id,
+ target_account_id,
+ )
+ .await?
+ } else {
+ None
+ };
+
let balances = self
.balances
.find_balances_for_update(op, account_set.values().journal_id, member_id)
.await?;
- let target_account_id = AccountId::from(&account_set.id());
let mut entries = Vec::new();
for balance in balances.into_values() {
entries_for_remove_balance(&mut entries, target_account_id, balance);
@@ -330,13 +364,6 @@
}
let entries = self.entries.create_all_in_op(op, entries).await?;
- let target_account = self
- .accounts
- .find_all_in_op::<Account>(&mut *op, &[target_account_id])
- .await?
- .remove(&target_account_id)
- .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
- let is_ec = target_account.values().config.eventually_consistent;
let entries_for_ec = if is_ec { Some(entries.clone()) } else { None };
let mappings = std::iter::once((target_account_id, parents)).collect();
@@ -359,6 +386,7 @@
target_account_id,
&ec_entries,
time,
+ ec_watermark,
)
.await?;
}
diff --git a/cala-ledger/src/balance/mod.rs b/cala-ledger/src/balance/mod.rs
--- a/cala-ledger/src/balance/mod.rs
+++ b/cala-ledger/src/balance/mod.rs
@@ -170,6 +170,20 @@
.await
}
+ pub(crate) async fn load_balance_watermark_in_op(
+ &self,
+ op: &mut impl es_entity::AtomicOperation,
+ journal_id: JournalId,
+ account_id: AccountId,
+ ) -> Result<Option<i64>, BalanceError> {
+ Ok(self
+ .repo
+ .load_account_set_balances_batch(op, journal_id, &[account_id])
+ .await?
+ .remove(&account_id)
+ .and_then(|(_, watermark)| watermark))
+ }
+
#[instrument(name = "cala_ledger.balance.update_balance_for_account_in_op", skip(self, op, entries), fields(journal_id = %journal_id, account_id = %account_id))]
pub(crate) async fn update_balance_for_account_in_op(
&self,
@@ -178,6 +192,7 @@
account_id: AccountId,
entries: &[EntryValues],
created_at: DateTime<Utc>,
+ watermark: Option<i64>,
) -> Result<(), BalanceError> {
let current = self
.repo
@@ -195,6 +210,9 @@
self.repo
.insert_new_snapshots(op, journal_id, new_balances)
.await?;
+ self.repo
+ .set_latest_seq_for_account(op, journal_id, account_id, watermark)
+ .await?;
Ok(())
}
diff --git a/cala-ledger/src/balance/repo.rs b/cala-ledger/src/balance/repo.rs
--- a/cala-ledger/src/balance/repo.rs
+++ b/cala-ledger/src/balance/repo.rs
@@ -304,6 +304,31 @@
Ok(())
}
+ pub(crate) async fn set_latest_seq_for_account(
+ &self,
+ op: &mut impl es_entity::AtomicOperation,
+ journal_id: JournalId,
+ account_id: AccountId,
+ latest_seq: Option<i64>,
+ ) -> Result<(), BalanceError> {
+ if let Some(latest_seq) = latest_seq {
+ sqlx::query(
+ r#"
+ UPDATE cala_current_balances
+ SET latest_seq = LEAST(latest_seq, $3::bigint)
+ WHERE journal_id = $1
+ AND account_id = $2
+ "#,
+ )
+ .bind(uuid::Uuid::from(journal_id))
+ .bind(uuid::Uuid::from(account_id))
+ .bind(latest_seq)
+ .execute(op.as_executor())
+ .await?;
+ }
+ Ok(())
+ }
+
#[instrument(
name = "balance.load_all_for_update",
skip(self, op),
diff --git a/cala-ledger/tests/account_set.rs b/cala-ledger/tests/account_set.rs
--- a/cala-ledger/tests/account_set.rs
+++ b/cala-ledger/tests/account_set.rs
@@ -1112,6 +1112,113 @@
}
#[tokio::test]
+async fn eventually_consistent_second_member_add_keeps_existing_member_history(
+) -> anyhow::Result<()> {
+ let btc: Currency = "BTC".parse().unwrap();
+
+ let pool = helpers::init_pool().await?;
+ let cala_config = CalaLedgerConfig::builder()
+ .pool(pool)
+ .exec_migrations(false)
+ .build()?;
+ let cala = CalaLedger::init(cala_config).await?;
+
+ let journal = cala
+ .journals()
+ .create(helpers::test_journal())
+ .await
+ .unwrap();
+
+ let (sender, first_member) = helpers::test_accounts();
+ let (_, second_member) = helpers::test_accounts();
+ let sender_account = cala.accounts().create(sender).await.unwrap();
+ let first_member_account = cala.accounts().create(first_member).await.unwrap();
+ let second_member_account = cala.accounts().create(second_member).await.unwrap();
+
+ let tx_code = Alphanumeric.sample_string(&mut rand::rng(), 32);
+ cala.tx_templates()
+ .create(helpers::currency_conversion_template(&tx_code))
+ .await
+ .unwrap();
+
+ let inline_set = NewAccountSet::builder()
+ .id(AccountSetId::new())
+ .name("Inline Set")
+ .journal_id(journal.id())
+ .build()
+ .unwrap();
+ let inline_set = cala.account_sets().create(inline_set).await.unwrap();
+
+ let ec_set = NewAccountSet::builder()
+ .id(AccountSetId::new())
+ .name("EC Set")
+ .journal_id(journal.id())
+ .eventually_consistent(true)
+ .build()
+ .unwrap();
+ let ec_set = cala.account_sets().create(ec_set).await.unwrap();
+
+ // First member joins with no existing balance.
+ cala.account_sets()
+ .add_member(inline_set.id(), first_member_account.id())
+ .await
+ .unwrap();
+ cala.account_sets()
+ .add_member(ec_set.id(), first_member_account.id())
+ .await
+ .unwrap();
+
+ // Transactions for first member remain unreconciled on the EC set.
+ for _ in 0..2 {
+ let mut params = Params::new();
+ params.insert("journal_id", journal.id().to_string());
+ params.insert("sender", sender_account.id());
+ params.insert("recipient", first_member_account.id());
+ cala.post_transaction(TransactionId::new(), &tx_code, params)
+ .await
+ .unwrap();
+ }
+
+ // Give second member pre-existing balance before joining both sets.
+ for _ in 0..2 {
+ let mut params = Params::new();
+ params.insert("journal_id", journal.id().to_string());
+ params.insert("sender", sender_account.id());
+ params.insert("recipient", second_member_account.id());
+ cala.post_transaction(TransactionId::new(), &tx_code, params)
+ .await
+ .unwrap();
+ }
+
+ cala.account_sets()
+ .add_member(inline_set.id(), second_member_account.id())
+ .await
+ .unwrap();
+ cala.account_sets()
+ .add_member(ec_set.id(), second_member_account.id())
+ .await
+ .unwrap();
+
+ cala.account_sets()
+ .recalculate_balances(ec_set.id())
+ .await
+ .unwrap();
+
+ let inline_bal = cala
+ .balances()
+ .find(journal.id(), inline_set.id(), btc)
+ .await?;
+ let ec_bal = cala.balances().find(journal.id(), ec_set.id(), btc).await?;
+ assert_eq!(
+ inline_bal.settled(),
+ ec_bal.settled(),
+ "EC set should not lose pre-existing member deltas when adding another member",
+ );
+
+ Ok(())
+}
+
+#[tokio::test]
async fn list_eventually_consistent_ids() -> anyhow::Result<()> {
let pool = helpers::init_pool().await?;
let cala_config = CalaLedgerConfig::builder()This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
PostgreSQL `nextval` is assigned at INSERT-statement time, not at COMMIT time, so `cala_balance_history.seq` ordering does not match commit visibility ordering. A recalc could advance `latest_seq` past an uncommitted poster's seq, causing that row to be silently skipped forever once it became visible. Fix: - Per-set transaction lock pair in the 2-arg `pg_advisory_xact_lock` namespace: posters take SHARED on every account they touch (leaves *and* every ancestor, EC and non-EC alike) before any nextval- generating insert; recalcs take EXCLUSIVE on every set being recalculated at the very top of the batch flow. Shared/shared does not contend, so concurrent posters still proceed in parallel; exclusive blocks until all in-flight posters touching the locked set's members have committed. Holding SHARED on every ancestor (not just the EC ones) means a recalc on either an EC or a non-EC set is correctly serialized against concurrent posters on its members. - The `latest_seq` watermark is still maintained as a side-effect of `insert_new_snapshots` (the `ON CONFLICT DO UPDATE` bumps it to `MAX(seq)` of the rows just inserted). With the SHARED lock in place there can be no uncommitted poster row whose seq sits between a recalc's input max and its output max, so the side-effect's "max output seq" value is now safe — every later poster is either blocked at the SHARED acquisition until the recalc commits and then gets a fresh seq strictly greater than every seq the recalc consumed, or already committed before the recalc started reading history (and was therefore folded in). - The poster's lock+load phase is two DB roundtrips instead of three: `find_for_update` now takes both poster locks in a single combined SQL statement — SHARED on every input row and FOR_UPDATE on non-EC rows only (the FOR_UPDATE lock is pointless on EC rows because posters never write `cala_current_balances` rows for EC accounts at all). Then it runs the existing data fetch query unchanged. - New `tests/ec_recalc_race.rs` stresses the interleaving with 8 writer + 4 recalc tasks across 4 iterations and asserts the EC set balance equals the sum of all posts both incrementally and after a final recalc. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Exercises the poster-vs-recalc lock protocol at ancestor depth > 1: a non-EC parent set contains an EC set which contains N leaves. Concurrent posters touch random leaves and concurrent recalcs run on the inner EC set. After the workload, the non-EC parent (built synchronously via the transitive closure in the poster path) and the inner EC set (rebuilt via recalc) must have identical balances equal to the sum of all posts. This covers the case where fetch_mappings_in_op must walk the full transitive ancestor chain so posters take shared advisory locks on every owning set, not just the direct parent. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Folding existing balance into a parent set after the fact is unsafe:
for EC sets, advancing the watermark past the new member's max history
seq can leap past unprocessed history rows of *other* members and
silently drop them from subsequent recalcs. The symmetric remove case
has no safe unfold for EC sets either, since the set's running balance
already reflects the member's past contributions and recalcs only fold
*new* history on top.
Rather than try to make those paths race-free, forbid the operation
universally — for both EC and non-EC parent sets — so the rule is one
sentence and the implementation has no fold path at all.
- `Balances::member_has_balance_history_in_op` takes exclusive locks
on `{parent, member}` (in the EC-set lock namespace, same as recalcs)
and runs an `EXISTS` check against `cala_balance_history`. Holding
the exclusive lock on the member serializes against any in-flight
poster on it, so the result reflects committed state.
- `add_member_in_op` and `remove_member_in_op` resolve the target
upfront, run the no-history check via the new private helper
`assert_member_history_empty_in_op`, and on a non-empty result
short-circuit with `AccountSetError::MemberHasBalanceHistory`. The
whole fold path (`find_balances_for_update`,
`update_balance_for_account_in_op`, `entries_for_add_balance`,
`entries_for_remove_balance`, the `update_balances_in_op` call for
synthetic fold entries, the `is_ec` branch, the `entries: Entries`
field on `AccountSets`) is deleted.
- `Balances::find_balances_for_update`,
`Balances::update_balance_for_account_in_op` and
`BalanceRepo::load_all_for_update` had no other callers and are
removed.
- The pre-existing `eventually_consistent_member_add_with_existing_balance`
test asserted the now-removed fold-on-add behaviour and is deleted;
the `balances` test is rewritten to wire its hierarchy up before any
posts. Two new regression tests assert that `add_member` and
`remove_member` return `MemberHasBalanceHistory` against any target
whose candidate member has any row in `cala_balance_history`.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
d4e4098 to
262a20b
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 262a20b. Configure here.
The recalc API has always been documented as a "building block for
periodic reconciliation jobs that need to batch-recalculate balances
for EC account sets" (see the doc-comment on
`list_eventually_consistent_ids`), but the entry points themselves
silently accepted any account set, including non-EC ones. Calling
`recalculate_balances` on a non-EC set would either be a no-op (best
case) or, if the bulk-insert nextval ordering happened to put the
ancestor's row before the leaf's row in the same batch, leave the
ancestor's `latest_seq < leaf's seq`, causing the next recalc to
re-fold the leaf's contribution and double-count.
Rather than fix the underlying within-batch ordering issue (which
would require a behaviour change to `insert_new_snapshots`), enforce
the documented contract at the API surface:
- New `AccountSetError::CannotRecalculateNonEcSet { account_set_id }`.
- `recalculate_balances_batch_in_op` (the lowest public entry point
that all of `recalculate_balances` / `_batch` / `_deep` funnel
through) now also fetches each set's underlying `Account` and
rejects any input whose `eventually_consistent` flag is `false`.
- `find_all_descendant_set_ids` is renamed to
`find_all_ec_descendant_set_ids` and JOINs `cala_accounts` to
filter the recursive walk to EC descendants only. So calling
`recalculate_balances_deep` on an EC root with a mixed hierarchy
silently skips the non-EC descendants instead of erroring on them
— the deep walk only ever surfaces sets the recalc can actually
process.
Two regression tests:
- `recalculate_balances_errors_on_non_ec_set` asserts that both
`recalculate_balances` and `recalculate_balances_batch` return
the new error against a non-EC set.
- `recalculate_balances_deep_skips_non_ec_descendants` builds an
EC root with one EC and one non-EC child, posts to a leaf in each
subtree, and asserts that a deep recalc folds the EC subtree into
the EC root (transitively, via both children's leaves) while
leaving the non-EC child's balance and version untouched.
Reported by Cursor Bugbot in
#697 (review comment).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
`Balances::member_has_balance_history_in_op` (the history-count check
that `add_member_in_op` / `remove_member_in_op` run before mutating
membership) was taking an *exclusive* advisory lock on both the
parent set account and the member account. The parent EXCL turned
out to be the load-bearing edge in a three-way deadlock cycle
observed in a lana-bank 1000-loan stress workflow:
- Tx_A (a multi-call `add_member` transaction) holds EXCL on one
hot parent set from an earlier call and wants EXCL on another hot
parent set in a later call, where "hot" means "a parent set every
concurrent activation touches" and the application picks them in
application order, not canonical-UUID order.
- Tx_C (a poster running `find_for_update`) holds SHARED on the
second parent set and wants SHARED on the first, because a leaf
it's writing is a member of both.
- Tx_B (another `add_member` transaction waiting on the global
`ADDVISORY_LOCK_ID` CTE serializer) participates as the third
vertex, giving PostgreSQL a deadlock-detector triangle to kill.
The fix is to recognise that the parent lock in this check only
needs to serialize with a concurrent *recalc* on the same parent
(EXCL/SHARED is incompatible), not with concurrent *posters*. So
take SHARED on the parent and keep EXCLUSIVE on the member. The
member EXCL is what actually makes the `EXISTS` stable — an
in-flight poster on the member holds SHARED on it via
`find_for_update` and blocks against our EXCL, so committed state
is visible by the time the existence check runs. SHARED on the
parent is still incompatible with recalc's EXCL on the same parent,
so the add-vs-recalc serialization is preserved, but SHARED/SHARED
between add_member and poster is compatible, which breaks the
cycle.
Implementation:
- The lock prelude and the `EXISTS` check collapse into a single
`BalanceRepo::member_has_balance_history_in_op` method that
issues two SQL statements back-to-back: a canonically-ordered
lock query using `CASE WHEN v.account_id = $member THEN EXCL
ELSE SHARED END`, then the existing `EXISTS` query against
`cala_balance_history`. The Balances wrapper in
`balance/mod.rs` is now a thin delegate.
- The separate public `account_has_any_balance_history_in_op`
repo method is gone — it only existed as the non-locking half
and had no other callers.
Regression test: `tests/ec_recalc_race.rs` gains
`add_member_multi_call_no_deadlock_with_posters`, which reproduces
the exact shape from the report — several tokio tasks each opening
a transaction that does several `add_member_in_op` calls against a
shared pool of EC parent sets in *randomised* (non-canonical) order,
interleaved with poster tasks posting to a single hot leaf that is
a direct member of every parent (so each poster's `find_for_update`
takes SHARED on every parent at once). Verified locally: 10/10
reliably deadlock when the parent lock is restored to EXCLUSIVE,
10/10 reliably pass with the fix.
Also adds `err(level = "warn")` to the `#[instrument]` attributes
on `update_balances_in_op`, `recalculate_account_set_balances_batch_in_op`,
`member_has_balance_history_in_op`, `lock_accounts_exclusive_in_op`,
`add_member_in_op`, `remove_member_in_op`, and the four
`{add,remove}_member_{account,set}_and_return_parents` repo
functions. The original report noted that the deadlock errors were
not surfacing on OTel spans, because `#[instrument]` without
`err(...)` leaves span status unset on error return; adding the
attribute makes this class of problem visible in production traces
without having to read PG logs.
Reported by lana-bank `task/lana-ec-impl-019d4a85` hitting the
1000-loan admin-cli generator against cala `262a20be`.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…order `BalanceRepo::lock_accounts_exclusive_in_op` took a `HashSet<AccountId>`, collected it into a `Vec<AccountId>` in HashSet iteration order (per-process-hashed, nondeterministic), and relied on an SQL-level `ORDER BY account_id` at the tail of the lock query to get canonical acquisition order. That reliance is shaky: PG is free to evaluate the per-row projection — the `pg_advisory_xact_lock` call in the SELECT list — before the sort node, in which case the locks fire in the Vec's arbitrary input order. For single-set recalcs this is fine (one lock, no order matters), but two concurrent `recalculate_balances_batch_in_op` calls on overlapping sets could acquire the shared EXCL locks in opposite orders and deadlock. Fix by sorting at the Rust level after collecting the HashSet, mirroring the BTreeSet discipline the poster path already uses in `Balances::update_balances_in_op`. With a sorted input array, UNNEST emits rows in sorted order, `pg_advisory_xact_lock` fires in sorted order, and the SQL `ORDER BY` becomes redundant — dropped as cleanup, because leaving it behind implies the SQL is what enforces order (it isn't). No behaviour change for any existing non-overlapping caller, and no test failure mode that was previously reachable — closing a latent fragility before a future caller triggers it. Reported in a PR #697 review comment ("Fix lock ordering in lock_accounts_exclusive_in_op (Finding #2)"). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>


Summary
NewAccountBuilderwhereeventually_consistent()set the wrong field andinto_values()hardcoded it tofalserecalculate_balances()toAccountSetsservice that aggregates member account balances into account set balance snapshots for async rollupeventually_consistentfield toNewAccountSetbuilder so account sets can opt in to deferred balance computationrecalculate_balances()materializes correct aggregateTest plan
eventually_consistent_balancesverifies:eventually_consistent: truerecalculate_balances()produces correct aggregated balancebalancestest continues to pass (non-EC account sets unaffected)nix flake checkpasses (fmt, clippy -D warnings, nextest)🤖 Generated with Claude Code
Note
High Risk
High risk because it changes core balance aggregation/concurrency behavior (new advisory locking, watermarking via
seq, and batch recalculation paths) and introduces a schema change affecting balance history/current balance writes.Overview
Enables eventually-consistent account sets whose balances are no longer updated inline during posting; instead, balances are materialized via new
AccountSets::recalculate_balances*APIs (single, batch, and deep/descendant) that rebuild account-set snapshots from membercala_balance_historydeltas.Fixes propagation of the
eventually_consistentflag throughNewAccount/NewAccountSet, adds new validation that membership changes are rejected once a member has any balance history, and adds pagination/query support for listing EC account set ids.Reworks balance persistence to support async rollups: adds
cala_balance_history.seq(with index/sequence) andcala_current_balances.latest_seqwatermarking, updates advisory-lock strategy for posters vs recalculators, and extends effective-balance logic to delete/rebuild cumulative effective snapshots from the earliest affected effective date. Includes extensive new integration tests covering EC behavior, batching/deep recalc, idempotency, and error cases.Reviewed by Cursor Bugbot for commit 0d7e524. Bugbot is set up for automated code reviews on this repo. Configure here.