Skip to content

fix(account_set): activate eventually_consistent flag for async balance rollup#697

Open
bodymindarts wants to merge 36 commits intomainfrom
task/cala-ec-phase1-019d4863
Open

fix(account_set): activate eventually_consistent flag for async balance rollup#697
bodymindarts wants to merge 36 commits intomainfrom
task/cala-ec-phase1-019d4863

Conversation

@bodymindarts
Copy link
Copy Markdown
Member

@bodymindarts bodymindarts commented Apr 1, 2026

Summary

  • Fix two bugs in NewAccountBuilder where eventually_consistent() set the wrong field and into_values() hardcoded it to false
  • Add recalculate_balances() to AccountSets service that aggregates member account balances into account set balance snapshots for async rollup
  • Add eventually_consistent field to NewAccountSet builder so account sets can opt in to deferred balance computation
  • Include integration test proving end-to-end flow: EC account set skips inline balance update, recalculate_balances() materializes correct aggregate

Test plan

  • Integration test eventually_consistent_balances verifies:
    • Account set created with eventually_consistent: true
    • Transaction posts without updating account set balance inline
    • recalculate_balances() produces correct aggregated balance
  • Existing balances test continues to pass (non-EC account sets unaffected)
  • nix flake check passes (fmt, clippy -D warnings, nextest)

🤖 Generated with Claude Code


Note

High Risk
High risk because it changes core balance aggregation/concurrency behavior (new advisory locking, watermarking via seq, and batch recalculation paths) and introduces a schema change affecting balance history/current balance writes.

Overview
Enables eventually-consistent account sets whose balances are no longer updated inline during posting; instead, balances are materialized via new AccountSets::recalculate_balances* APIs (single, batch, and deep/descendant) that rebuild account-set snapshots from member cala_balance_history deltas.

Fixes propagation of the eventually_consistent flag through NewAccount/NewAccountSet, adds new validation that membership changes are rejected once a member has any balance history, and adds pagination/query support for listing EC account set ids.

Reworks balance persistence to support async rollups: adds cala_balance_history.seq (with index/sequence) and cala_current_balances.latest_seq watermarking, updates advisory-lock strategy for posters vs recalculators, and extends effective-balance logic to delete/rebuild cumulative effective snapshots from the earliest affected effective date. Includes extensive new integration tests covering EC behavior, batching/deep recalc, idempotency, and error cases.

Reviewed by Cursor Bugbot for commit 0d7e524. Bugbot is set up for automated code reviews on this repo. Configure here.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 1, 2026

📊 Performance Report

Commit: 0d7e524
Updated: 2026-04-09 14:43:33 UTC

Cala Performance Benchmark Results (non-representative)

Criterion Benchmark Results (single-threaded)

Benchmark Time per Run Throughput % vs Baseline
post_simple_transaction 7.423ms 134 tx/s 0 (baseline)
post_and_recalculate_ec_account_set 18.901ms 52 tx/s -154.0%
post_and_batch_recalculate_ec_account_set 13.156ms 76 tx/s -77.0%
post_multi_layer_transaction 7.674ms 130 tx/s -3.0%
post_simple_transaction_with_effective_balances 9.713ms 102 tx/s -30.0%
post_simple_transaction_with_skipped_velocity 6.670ms 149 tx/s +10.0%
post_simple_transaction_with_velocity 8.424ms 118 tx/s -13.0%
post_simple_transaction_with_hit_velocity 4.200ms 238 tx/s +43.0%
post_simple_transaction_with_one_account_set 6.809ms 146 tx/s +8.0%
post_simple_transaction_with_five_account_sets 8.046ms 124 tx/s -8.0%
post_simple_transaction_with_ec_account_set 6.565ms 152 tx/s +11.0%

Load Testing Results (parallel-execution)

Scenario tx/s
1 parallel 93.38
2 parallel 127.08
5 parallel 163.21
10 parallel 164.76
20 parallel 160.73
2 contention 125.60
5 contention 142.75
2 acct_sets 115.29
5 acct_sets 125.80

Note: Performance results may vary based on system resources and database state.

Last updated by commit 0d7e524

@bodymindarts bodymindarts force-pushed the task/cala-ec-phase1-019d4863 branch from f4b722e to 622a84a Compare April 1, 2026 10:57
…ce rollup

Fix two bugs in NewAccountBuilder that prevented the eventually_consistent
flag from being persisted:
- eventually_consistent() method was setting is_account_set instead of
  eventually_consistent
- into_values() hardcoded eventually_consistent to false instead of using
  the struct field

Add incremental recalculate_balances() to AccountSets service that replays
member balance_history deltas into account set balance snapshots. The replay
uses SQL LAG window function to compute per-entry deltas from leaf member
accounts, applies them to the account set's running balance, and inserts
identical balance_history rows to what the inline path would have created.

Incremental behavior: each recalculate picks up from where it left off
(using sum of latest_versions as offset), making it idempotent and efficient
for repeated calls.

Add NewAccountSet.eventually_consistent field so account sets can opt in
to async balance computation.

Include integration test verifying: multi-transaction batches, incremental
recalculate across batches, snapshot-level parity with inline path (count,
versions, running balances, entry_ids), and idempotency.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@bodymindarts bodymindarts force-pushed the task/cala-ec-phase1-019d4863 branch from 622a84a to adc0788 Compare April 1, 2026 11:03
bodymindarts and others added 3 commits April 1, 2026 13:21
Replace ROW_NUMBER-based offset with entry_id watermark in
fetch_incremental_member_history. The ROW_NUMBER approach broke when
account set membership changed between recalculations because row
numbering shifted. Entry IDs (UUID v7, monotonically ordered) provide
a stable cursor regardless of membership changes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
PostgreSQL doesn't support MAX() on UUID type natively. Replace with
ORDER BY latest_entry_id DESC LIMIT 1 which uses btree ordering and
avoids the unsupported aggregate.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace sqlx::query() with sqlx::query!() in load_account_set_balances
and fetch_incremental_member_history for compile-time SQL verification.
Regenerate .sqlx offline cache.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
bodymindarts and others added 15 commits April 1, 2026 15:19
When adding or removing a member from an eventually-consistent account
set, the balance was never updated because find_for_update filters by
eventually_consistent=FALSE. This adds update_balance_for_account_in_op
which directly locks and updates the EC account's balance via
load_all_for_update, bypassing that filter. The entry_id watermark
naturally prevents double-counting during subsequent recalculations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove redundant SQL round trip in load_account_set_balances — the
entry_id watermark is already available in the BalanceSnapshot JSON
loaded by the first query. Extract it via max_by_key on the loaded
balances instead of a separate cala_balance_history query.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ances

The watermark is just the max entry_id across the loaded balance
snapshots — returning it separately couples the repo method to a
concern of its single caller. Move derivation to the call site in
recalculate_account_set_balances_in_op instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tness

A single global watermark works across all currencies because EntryId
uses UUID v7 (monotonically ordered by creation time). Document this
invariant on fetch_incremental_member_history and at the derivation
site so future maintainers know the coupling.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…g to Uuid

Remove unnecessary watermark.map(uuid::Uuid::from) — sqlx accepts
Option<EntryId> directly via the `as` cast.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add five unit tests covering: first-run version numbering, incremental
delta application on existing balances, multi-delta accumulation,
multi-currency independence, and empty-history no-op.

Also clarify the version: 0 initializer matches Snapshots::new_snapshot
(both start at 0 and increment before persisting).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
UUID v7 entry_id was only monotonic within a single process. In HA
deployments with multiple writers, clock drift between pods could
cause incremental EC recalculation to skip balance_history rows.

Add a BIGSERIAL `seq` column to cala_balance_history that provides
globally-ordered sequencing via PostgreSQL sequences. Track the
high-water `latest_seq` on cala_current_balances for efficient
watermark retrieval during recalculation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…embers

Batch method that loads all sets' balances, fetches the union of member
histories in one query, and replays deltas in a single pass — dispatching
each row to every set that contains that member. This eliminates
redundant balance_history scans when recalculating sibling/ancestor sets
in a hierarchy.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The single-set `recalculate_balances_in_op` now delegates to
`recalculate_balances_batch_in_op(&[id])`, eliminating the duplicate
code path. Removed `load_account_set_balances`,
`fetch_incremental_member_history`, and `replay_member_deltas` in
favour of their batch equivalents. Unit tests ported to exercise
`replay_member_deltas_batch` directly, with two new cases:
shared-member dispatch and watermark-based row skipping.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…hierarchies

Adds recalculate_balances_deep that discovers all descendant account sets
via recursive CTE and includes them in a single batch recalculation call.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… test

The batch_recalculate_shared_members test had recipient_account as a
member of both set_a and set_b. When both sets were added to root,
the transitive cascade tried to add recipient_account to root twice,
causing MemberAlreadyAdded. Give each child set its own exclusive
leaf account instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add 3 new Criterion benchmarks for eventually consistent account sets:
- #9: EC posting only (no inline recalculate)
- #10: EC post + recalculate after every tx (worst case)
- #11: EC 10 posts + 1 batch recalculate (amortized)

Also adds `eventually_consistent` parameter to the
`init_accounts_with_account_sets_depth` helper.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Run `make sqlx-prepare` to regenerate the offline query cache for
new queries in account_set/repo.rs and balance/repo.rs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The benchmark was running 10 transactions inside the timed loop,
making it appear ~10x slower than it actually is. Change to 1 post
+ 1 batch recalculate per iteration, matching the structure of
benchmark #10 for a direct comparison. The batch recalculate still
amortizes across accumulated staleness from prior iterations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
bodymindarts and others added 3 commits April 2, 2026 09:22
Replace `self.accounts.find(target_account_id)` with
`self.accounts.find_all_in_op()` in both `add_member_in_op` and
`remove_member_in_op`. The pool-based `find` bypassed the caller's
transaction, so it couldn't see an account created in the same
uncommitted atomic operation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
recalculate_account_set_balances_batch_in_op only rebuilt
cala_current_balances, leaving cala_cumulative_effective_balances stale
for EC sets. This adds a self-contained effective recalculation path
that mirrors the inline architecture:

- EffectiveBalanceRepo: 4 new methods (fetch_member_effective_history,
  delete_at_or_after, load_latest_before, insert_recalc_snapshots)
- EffectiveBalances: replay_effective_deltas pure-computation sweep
  grouped by effective_date, with per-date version resets
- balance/mod.rs: delegates to effective module when journal has
  effective balances enabled

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove the separate 20260401154215_add_balance_history_seq migration
and fold its changes (seq column, latest_seq column, index) directly
into the initial ledger_setup migration. No backwards compatibility
needed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
bodymindarts and others added 3 commits April 2, 2026 10:27
…tion

When a back-dated transaction is posted between existing effective dates,
the watermark-filtered history fetch missed old member history needed to
rebuild deleted effective rows at later dates. Fix by first discovering
min_effective_date from new rows via watermark, then re-fetching ALL
history from that date onward (with LAG on full partition) so later dates
are correctly rebuilt.

Also adds integration test for EC account set effective balance
recalculation covering idempotency and incremental scenarios.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CI uses standard cargo which only sees the crate-level .sqlx/ directory.
Moved the missing query cache entry to cala-ledger/.sqlx/ and removed
the duplicate root-level .sqlx/ directory entirely.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
HonestMajority and others added 4 commits April 7, 2026 12:43
…sets

Add `AccountSets::list_eventually_consistent_ids` (plus an `_in_op`
variant) which returns the ids of all account sets whose underlying
account is marked `eventually_consistent`.

This replaces the need for downstream consumers to reach into the cala
schema with raw SQL (e.g. `SELECT id FROM cala_accounts WHERE
is_account_set = TRUE AND eventually_consistent = TRUE`) when they want
to drive periodic reconciliation via `recalculate_balances_deep`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Previously this API returned an unbounded Vec<AccountSetId>, which could
grow with the number of eventually consistent account sets. Consumers
(e.g. periodic reconciliation jobs that batch-recalculate balances) now
paginate through the list using the macro-generated AccountSetByIdCursor,
consistent with the PaginatedQueryArgs/PaginatedQueryRet shape used by
the other list endpoints on AccountSets.

BREAKING CHANGE: AccountSets::list_eventually_consistent_ids{,_in_op}
now take es_entity::PaginatedQueryArgs<AccountSetByIdCursor> and return
es_entity::PaginatedQueryRet<AccountSetId, AccountSetByIdCursor>.
The previous return type
`HashMap<(AccountId, Currency), (BalanceSnapshot, i32)>` required a
`#[allow(clippy::type_complexity)]` suppression. Introduce a named
`LatestBeforeEntry` struct with explicit `snapshot` and
`all_time_version` fields so the type is self-documenting and the
suppression can be removed.
Benchmark #10 previously invoked `recalculate_balances_batch` over
`[sender_set_id, recipient_set_id]`, making it identical to benchmark
#11. Use the singular `recalculate_balances` call twice (once per set)
so #10 serves as a meaningful baseline for #11's batched form.
Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Watermark advancement in add_member skips member history
    • For EC account sets, add/remove member now preserves the pre-change set watermark and restores latest_seq after summary snapshot writes so pending member history cannot be skipped.

Create PR

Or push these changes by commenting:

@cursor push e5ccc8171a
Preview (e5ccc8171a)
diff --git a/cala-ledger/src/account_set/mod.rs b/cala-ledger/src/account_set/mod.rs
--- a/cala-ledger/src/account_set/mod.rs
+++ b/cala-ledger/src/account_set/mod.rs
@@ -168,6 +168,15 @@
         member: impl Into<AccountSetMemberId>,
     ) -> Result<AccountSet, AccountSetError> {
         let member = member.into();
+        let target_account_id = AccountId::from(account_set_id);
+        let target_account = self
+            .accounts
+            .find_all_in_op::<Account>(&mut *op, &[target_account_id])
+            .await?
+            .remove(&target_account_id)
+            .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
+        let is_ec = target_account.values().config.eventually_consistent;
+
         let (time, parents, account_set, member_id) = match member {
             AccountSetMemberId::Account(id) => {
                 tracing::Span::current().record("is_account", true);
@@ -207,12 +216,23 @@
             }
         };
 
+        let ec_watermark = if is_ec {
+            self.balances
+                .load_balance_watermark_in_op(
+                    op,
+                    account_set.values().journal_id,
+                    target_account_id,
+                )
+                .await?
+        } else {
+            None
+        };
+
         let balances = self
             .balances
             .find_balances_for_update(op, account_set.values().journal_id, member_id)
             .await?;
 
-        let target_account_id = AccountId::from(&account_set.id());
         let mut entries = Vec::new();
         for balance in balances.into_values() {
             entries_for_add_balance(&mut entries, target_account_id, balance);
@@ -223,13 +243,6 @@
         }
         let entries = self.entries.create_all_in_op(op, entries).await?;
 
-        let target_account = self
-            .accounts
-            .find_all_in_op::<Account>(&mut *op, &[target_account_id])
-            .await?
-            .remove(&target_account_id)
-            .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
-        let is_ec = target_account.values().config.eventually_consistent;
         let entries_for_ec = if is_ec { Some(entries.clone()) } else { None };
 
         let mappings = std::iter::once((target_account_id, parents)).collect();
@@ -252,6 +265,7 @@
                     target_account_id,
                     &ec_entries,
                     time,
+                    ec_watermark,
                 )
                 .await?;
         }
@@ -281,6 +295,15 @@
         member: impl Into<AccountSetMemberId>,
     ) -> Result<AccountSet, AccountSetError> {
         let member = member.into();
+        let target_account_id = AccountId::from(account_set_id);
+        let target_account = self
+            .accounts
+            .find_all_in_op::<Account>(&mut *op, &[target_account_id])
+            .await?
+            .remove(&target_account_id)
+            .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
+        let is_ec = target_account.values().config.eventually_consistent;
+
         let (time, parents, account_set, member_id) = match member {
             AccountSetMemberId::Account(id) => {
                 let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
@@ -314,12 +337,23 @@
             }
         };
 
+        let ec_watermark = if is_ec {
+            self.balances
+                .load_balance_watermark_in_op(
+                    op,
+                    account_set.values().journal_id,
+                    target_account_id,
+                )
+                .await?
+        } else {
+            None
+        };
+
         let balances = self
             .balances
             .find_balances_for_update(op, account_set.values().journal_id, member_id)
             .await?;
 
-        let target_account_id = AccountId::from(&account_set.id());
         let mut entries = Vec::new();
         for balance in balances.into_values() {
             entries_for_remove_balance(&mut entries, target_account_id, balance);
@@ -330,13 +364,6 @@
         }
         let entries = self.entries.create_all_in_op(op, entries).await?;
 
-        let target_account = self
-            .accounts
-            .find_all_in_op::<Account>(&mut *op, &[target_account_id])
-            .await?
-            .remove(&target_account_id)
-            .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
-        let is_ec = target_account.values().config.eventually_consistent;
         let entries_for_ec = if is_ec { Some(entries.clone()) } else { None };
 
         let mappings = std::iter::once((target_account_id, parents)).collect();
@@ -359,6 +386,7 @@
                     target_account_id,
                     &ec_entries,
                     time,
+                    ec_watermark,
                 )
                 .await?;
         }

diff --git a/cala-ledger/src/balance/mod.rs b/cala-ledger/src/balance/mod.rs
--- a/cala-ledger/src/balance/mod.rs
+++ b/cala-ledger/src/balance/mod.rs
@@ -170,6 +170,20 @@
             .await
     }
 
+    pub(crate) async fn load_balance_watermark_in_op(
+        &self,
+        op: &mut impl es_entity::AtomicOperation,
+        journal_id: JournalId,
+        account_id: AccountId,
+    ) -> Result<Option<i64>, BalanceError> {
+        Ok(self
+            .repo
+            .load_account_set_balances_batch(op, journal_id, &[account_id])
+            .await?
+            .remove(&account_id)
+            .and_then(|(_, watermark)| watermark))
+    }
+
     #[instrument(name = "cala_ledger.balance.update_balance_for_account_in_op", skip(self, op, entries), fields(journal_id = %journal_id, account_id = %account_id))]
     pub(crate) async fn update_balance_for_account_in_op(
         &self,
@@ -178,6 +192,7 @@
         account_id: AccountId,
         entries: &[EntryValues],
         created_at: DateTime<Utc>,
+        watermark: Option<i64>,
     ) -> Result<(), BalanceError> {
         let current = self
             .repo
@@ -195,6 +210,9 @@
         self.repo
             .insert_new_snapshots(op, journal_id, new_balances)
             .await?;
+        self.repo
+            .set_latest_seq_for_account(op, journal_id, account_id, watermark)
+            .await?;
         Ok(())
     }
 

diff --git a/cala-ledger/src/balance/repo.rs b/cala-ledger/src/balance/repo.rs
--- a/cala-ledger/src/balance/repo.rs
+++ b/cala-ledger/src/balance/repo.rs
@@ -304,6 +304,31 @@
         Ok(())
     }
 
+    pub(crate) async fn set_latest_seq_for_account(
+        &self,
+        op: &mut impl es_entity::AtomicOperation,
+        journal_id: JournalId,
+        account_id: AccountId,
+        latest_seq: Option<i64>,
+    ) -> Result<(), BalanceError> {
+        if let Some(latest_seq) = latest_seq {
+            sqlx::query(
+                r#"
+                UPDATE cala_current_balances
+                SET latest_seq = LEAST(latest_seq, $3::bigint)
+                WHERE journal_id = $1
+                  AND account_id = $2
+                "#,
+            )
+            .bind(uuid::Uuid::from(journal_id))
+            .bind(uuid::Uuid::from(account_id))
+            .bind(latest_seq)
+            .execute(op.as_executor())
+            .await?;
+        }
+        Ok(())
+    }
+
     #[instrument(
         name = "balance.load_all_for_update",
         skip(self, op),

diff --git a/cala-ledger/tests/account_set.rs b/cala-ledger/tests/account_set.rs
--- a/cala-ledger/tests/account_set.rs
+++ b/cala-ledger/tests/account_set.rs
@@ -1112,6 +1112,113 @@
 }
 
 #[tokio::test]
+async fn eventually_consistent_second_member_add_keeps_existing_member_history(
+) -> anyhow::Result<()> {
+    let btc: Currency = "BTC".parse().unwrap();
+
+    let pool = helpers::init_pool().await?;
+    let cala_config = CalaLedgerConfig::builder()
+        .pool(pool)
+        .exec_migrations(false)
+        .build()?;
+    let cala = CalaLedger::init(cala_config).await?;
+
+    let journal = cala
+        .journals()
+        .create(helpers::test_journal())
+        .await
+        .unwrap();
+
+    let (sender, first_member) = helpers::test_accounts();
+    let (_, second_member) = helpers::test_accounts();
+    let sender_account = cala.accounts().create(sender).await.unwrap();
+    let first_member_account = cala.accounts().create(first_member).await.unwrap();
+    let second_member_account = cala.accounts().create(second_member).await.unwrap();
+
+    let tx_code = Alphanumeric.sample_string(&mut rand::rng(), 32);
+    cala.tx_templates()
+        .create(helpers::currency_conversion_template(&tx_code))
+        .await
+        .unwrap();
+
+    let inline_set = NewAccountSet::builder()
+        .id(AccountSetId::new())
+        .name("Inline Set")
+        .journal_id(journal.id())
+        .build()
+        .unwrap();
+    let inline_set = cala.account_sets().create(inline_set).await.unwrap();
+
+    let ec_set = NewAccountSet::builder()
+        .id(AccountSetId::new())
+        .name("EC Set")
+        .journal_id(journal.id())
+        .eventually_consistent(true)
+        .build()
+        .unwrap();
+    let ec_set = cala.account_sets().create(ec_set).await.unwrap();
+
+    // First member joins with no existing balance.
+    cala.account_sets()
+        .add_member(inline_set.id(), first_member_account.id())
+        .await
+        .unwrap();
+    cala.account_sets()
+        .add_member(ec_set.id(), first_member_account.id())
+        .await
+        .unwrap();
+
+    // Transactions for first member remain unreconciled on the EC set.
+    for _ in 0..2 {
+        let mut params = Params::new();
+        params.insert("journal_id", journal.id().to_string());
+        params.insert("sender", sender_account.id());
+        params.insert("recipient", first_member_account.id());
+        cala.post_transaction(TransactionId::new(), &tx_code, params)
+            .await
+            .unwrap();
+    }
+
+    // Give second member pre-existing balance before joining both sets.
+    for _ in 0..2 {
+        let mut params = Params::new();
+        params.insert("journal_id", journal.id().to_string());
+        params.insert("sender", sender_account.id());
+        params.insert("recipient", second_member_account.id());
+        cala.post_transaction(TransactionId::new(), &tx_code, params)
+            .await
+            .unwrap();
+    }
+
+    cala.account_sets()
+        .add_member(inline_set.id(), second_member_account.id())
+        .await
+        .unwrap();
+    cala.account_sets()
+        .add_member(ec_set.id(), second_member_account.id())
+        .await
+        .unwrap();
+
+    cala.account_sets()
+        .recalculate_balances(ec_set.id())
+        .await
+        .unwrap();
+
+    let inline_bal = cala
+        .balances()
+        .find(journal.id(), inline_set.id(), btc)
+        .await?;
+    let ec_bal = cala.balances().find(journal.id(), ec_set.id(), btc).await?;
+    assert_eq!(
+        inline_bal.settled(),
+        ec_bal.settled(),
+        "EC set should not lose pre-existing member deltas when adding another member",
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
 async fn list_eventually_consistent_ids() -> anyhow::Result<()> {
     let pool = helpers::init_pool().await?;
     let cala_config = CalaLedgerConfig::builder()

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

HonestMajority and others added 3 commits April 9, 2026 11:33
PostgreSQL `nextval` is assigned at INSERT-statement time, not at
COMMIT time, so `cala_balance_history.seq` ordering does not match
commit visibility ordering. A recalc could advance `latest_seq` past
an uncommitted poster's seq, causing that row to be silently skipped
forever once it became visible.

Fix:
- Per-set transaction lock pair in the 2-arg `pg_advisory_xact_lock`
  namespace: posters take SHARED on every account they touch (leaves
  *and* every ancestor, EC and non-EC alike) before any nextval-
  generating insert; recalcs take EXCLUSIVE on every set being
  recalculated at the very top of the batch flow. Shared/shared does
  not contend, so concurrent posters still proceed in parallel;
  exclusive blocks until all in-flight posters touching the locked
  set's members have committed. Holding SHARED on every ancestor
  (not just the EC ones) means a recalc on either an EC or a non-EC
  set is correctly serialized against concurrent posters on its
  members.
- The `latest_seq` watermark is still maintained as a side-effect of
  `insert_new_snapshots` (the `ON CONFLICT DO UPDATE` bumps it to
  `MAX(seq)` of the rows just inserted). With the SHARED lock in
  place there can be no uncommitted poster row whose seq sits between
  a recalc's input max and its output max, so the side-effect's
  "max output seq" value is now safe — every later poster is either
  blocked at the SHARED acquisition until the recalc commits and
  then gets a fresh seq strictly greater than every seq the recalc
  consumed, or already committed before the recalc started reading
  history (and was therefore folded in).
- The poster's lock+load phase is two DB roundtrips instead of three:
  `find_for_update` now takes both poster locks in a single combined
  SQL statement — SHARED on every input row and FOR_UPDATE on
  non-EC rows only (the FOR_UPDATE lock is pointless on EC rows
  because posters never write `cala_current_balances` rows for EC
  accounts at all). Then it runs the existing data fetch query
  unchanged.
- New `tests/ec_recalc_race.rs` stresses the interleaving with 8
  writer + 4 recalc tasks across 4 iterations and asserts the EC set
  balance equals the sum of all posts both incrementally and after a
  final recalc.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Exercises the poster-vs-recalc lock protocol at ancestor depth > 1:
a non-EC parent set contains an EC set which contains N leaves.
Concurrent posters touch random leaves and concurrent recalcs run on
the inner EC set. After the workload, the non-EC parent (built
synchronously via the transitive closure in the poster path) and the
inner EC set (rebuilt via recalc) must have identical balances equal
to the sum of all posts.

This covers the case where fetch_mappings_in_op must walk the full
transitive ancestor chain so posters take shared advisory locks on
every owning set, not just the direct parent.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Folding existing balance into a parent set after the fact is unsafe:
for EC sets, advancing the watermark past the new member's max history
seq can leap past unprocessed history rows of *other* members and
silently drop them from subsequent recalcs. The symmetric remove case
has no safe unfold for EC sets either, since the set's running balance
already reflects the member's past contributions and recalcs only fold
*new* history on top.

Rather than try to make those paths race-free, forbid the operation
universally — for both EC and non-EC parent sets — so the rule is one
sentence and the implementation has no fold path at all.

- `Balances::member_has_balance_history_in_op` takes exclusive locks
  on `{parent, member}` (in the EC-set lock namespace, same as recalcs)
  and runs an `EXISTS` check against `cala_balance_history`. Holding
  the exclusive lock on the member serializes against any in-flight
  poster on it, so the result reflects committed state.
- `add_member_in_op` and `remove_member_in_op` resolve the target
  upfront, run the no-history check via the new private helper
  `assert_member_history_empty_in_op`, and on a non-empty result
  short-circuit with `AccountSetError::MemberHasBalanceHistory`. The
  whole fold path (`find_balances_for_update`,
  `update_balance_for_account_in_op`, `entries_for_add_balance`,
  `entries_for_remove_balance`, the `update_balances_in_op` call for
  synthetic fold entries, the `is_ec` branch, the `entries: Entries`
  field on `AccountSets`) is deleted.
- `Balances::find_balances_for_update`,
  `Balances::update_balance_for_account_in_op` and
  `BalanceRepo::load_all_for_update` had no other callers and are
  removed.
- The pre-existing `eventually_consistent_member_add_with_existing_balance`
  test asserted the now-removed fold-on-add behaviour and is deleted;
  the `balances` test is rewritten to wire its hierarchy up before any
  posts. Two new regression tests assert that `add_member` and
  `remove_member` return `MemberHasBalanceHistory` against any target
  whose candidate member has any row in `cala_balance_history`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@HonestMajority HonestMajority force-pushed the task/cala-ec-phase1-019d4863 branch from d4e4098 to 262a20b Compare April 9, 2026 09:53
Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 262a20b. Configure here.

HonestMajority and others added 3 commits April 9, 2026 12:47
The recalc API has always been documented as a "building block for
periodic reconciliation jobs that need to batch-recalculate balances
for EC account sets" (see the doc-comment on
`list_eventually_consistent_ids`), but the entry points themselves
silently accepted any account set, including non-EC ones. Calling
`recalculate_balances` on a non-EC set would either be a no-op (best
case) or, if the bulk-insert nextval ordering happened to put the
ancestor's row before the leaf's row in the same batch, leave the
ancestor's `latest_seq < leaf's seq`, causing the next recalc to
re-fold the leaf's contribution and double-count.

Rather than fix the underlying within-batch ordering issue (which
would require a behaviour change to `insert_new_snapshots`), enforce
the documented contract at the API surface:

- New `AccountSetError::CannotRecalculateNonEcSet { account_set_id }`.
- `recalculate_balances_batch_in_op` (the lowest public entry point
  that all of `recalculate_balances` / `_batch` / `_deep` funnel
  through) now also fetches each set's underlying `Account` and
  rejects any input whose `eventually_consistent` flag is `false`.
- `find_all_descendant_set_ids` is renamed to
  `find_all_ec_descendant_set_ids` and JOINs `cala_accounts` to
  filter the recursive walk to EC descendants only. So calling
  `recalculate_balances_deep` on an EC root with a mixed hierarchy
  silently skips the non-EC descendants instead of erroring on them
  — the deep walk only ever surfaces sets the recalc can actually
  process.

Two regression tests:

- `recalculate_balances_errors_on_non_ec_set` asserts that both
  `recalculate_balances` and `recalculate_balances_batch` return
  the new error against a non-EC set.
- `recalculate_balances_deep_skips_non_ec_descendants` builds an
  EC root with one EC and one non-EC child, posts to a leaf in each
  subtree, and asserts that a deep recalc folds the EC subtree into
  the EC root (transitively, via both children's leaves) while
  leaving the non-EC child's balance and version untouched.

Reported by Cursor Bugbot in
#697 (review comment).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
`Balances::member_has_balance_history_in_op` (the history-count check
that `add_member_in_op` / `remove_member_in_op` run before mutating
membership) was taking an *exclusive* advisory lock on both the
parent set account and the member account. The parent EXCL turned
out to be the load-bearing edge in a three-way deadlock cycle
observed in a lana-bank 1000-loan stress workflow:

- Tx_A (a multi-call `add_member` transaction) holds EXCL on one
  hot parent set from an earlier call and wants EXCL on another hot
  parent set in a later call, where "hot" means "a parent set every
  concurrent activation touches" and the application picks them in
  application order, not canonical-UUID order.
- Tx_C (a poster running `find_for_update`) holds SHARED on the
  second parent set and wants SHARED on the first, because a leaf
  it's writing is a member of both.
- Tx_B (another `add_member` transaction waiting on the global
  `ADDVISORY_LOCK_ID` CTE serializer) participates as the third
  vertex, giving PostgreSQL a deadlock-detector triangle to kill.

The fix is to recognise that the parent lock in this check only
needs to serialize with a concurrent *recalc* on the same parent
(EXCL/SHARED is incompatible), not with concurrent *posters*. So
take SHARED on the parent and keep EXCLUSIVE on the member. The
member EXCL is what actually makes the `EXISTS` stable — an
in-flight poster on the member holds SHARED on it via
`find_for_update` and blocks against our EXCL, so committed state
is visible by the time the existence check runs. SHARED on the
parent is still incompatible with recalc's EXCL on the same parent,
so the add-vs-recalc serialization is preserved, but SHARED/SHARED
between add_member and poster is compatible, which breaks the
cycle.

Implementation:

- The lock prelude and the `EXISTS` check collapse into a single
  `BalanceRepo::member_has_balance_history_in_op` method that
  issues two SQL statements back-to-back: a canonically-ordered
  lock query using `CASE WHEN v.account_id = $member THEN EXCL
  ELSE SHARED END`, then the existing `EXISTS` query against
  `cala_balance_history`. The Balances wrapper in
  `balance/mod.rs` is now a thin delegate.
- The separate public `account_has_any_balance_history_in_op`
  repo method is gone — it only existed as the non-locking half
  and had no other callers.

Regression test: `tests/ec_recalc_race.rs` gains
`add_member_multi_call_no_deadlock_with_posters`, which reproduces
the exact shape from the report — several tokio tasks each opening
a transaction that does several `add_member_in_op` calls against a
shared pool of EC parent sets in *randomised* (non-canonical) order,
interleaved with poster tasks posting to a single hot leaf that is
a direct member of every parent (so each poster's `find_for_update`
takes SHARED on every parent at once). Verified locally: 10/10
reliably deadlock when the parent lock is restored to EXCLUSIVE,
10/10 reliably pass with the fix.

Also adds `err(level = "warn")` to the `#[instrument]` attributes
on `update_balances_in_op`, `recalculate_account_set_balances_batch_in_op`,
`member_has_balance_history_in_op`, `lock_accounts_exclusive_in_op`,
`add_member_in_op`, `remove_member_in_op`, and the four
`{add,remove}_member_{account,set}_and_return_parents` repo
functions. The original report noted that the deadlock errors were
not surfacing on OTel spans, because `#[instrument]` without
`err(...)` leaves span status unset on error return; adding the
attribute makes this class of problem visible in production traces
without having to read PG logs.

Reported by lana-bank `task/lana-ec-impl-019d4a85` hitting the
1000-loan admin-cli generator against cala `262a20be`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…order

`BalanceRepo::lock_accounts_exclusive_in_op` took a
`HashSet<AccountId>`, collected it into a `Vec<AccountId>` in
HashSet iteration order (per-process-hashed, nondeterministic),
and relied on an SQL-level `ORDER BY account_id` at the tail of the
lock query to get canonical acquisition order.

That reliance is shaky: PG is free to evaluate the per-row
projection — the `pg_advisory_xact_lock` call in the SELECT list —
before the sort node, in which case the locks fire in the Vec's
arbitrary input order. For single-set recalcs this is fine (one
lock, no order matters), but two concurrent
`recalculate_balances_batch_in_op` calls on overlapping sets could
acquire the shared EXCL locks in opposite orders and deadlock.

Fix by sorting at the Rust level after collecting the HashSet,
mirroring the BTreeSet discipline the poster path already uses in
`Balances::update_balances_in_op`. With a sorted input array,
UNNEST emits rows in sorted order, `pg_advisory_xact_lock` fires in
sorted order, and the SQL `ORDER BY` becomes redundant — dropped
as cleanup, because leaving it behind implies the SQL is what
enforces order (it isn't).

No behaviour change for any existing non-overlapping caller, and
no test failure mode that was previously reachable — closing a
latent fragility before a future caller triggers it.

Reported in a PR #697 review comment ("Fix lock ordering in
lock_accounts_exclusive_in_op (Finding #2)").

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants