Skip to content

Conversation

@jshearer
Copy link
Contributor

@jshearer jshearer commented Jan 20, 2026

This PR adds test coverage for Dekaf's handling of cross-dataplane migrations.

Previously, the convention was that Dekaf addresses live at dekaf.{plane_fqdn}:9092. In order to test migration locally, I had to manually and temporarily change this logic. In order to support automated testing of migrations, we needed a first-class way to represent the address of a data-plane's Dekaf instance. We already have reactor_address and broker_address in the data_planes table, so I added dekaf_address and dekaf_registry_address.

One wrinkle is that unlike reactors and brokers, data planes do not always run Dekaf. In order to handle this properly, I updated data-plane-controller to actively manage the dekaf_address and dekaf_registry_address columns, such that they're only set if Dekaf is actually deployed in that plane, otherwise they're null. Dekaf will now emit a helpful error message when attempting to serve a redirect to a data-plane that does not have an instance deployed.

Note: As it turns out, our mapping of redirects onto the Kafka protocol isn't perfect. Specifically, since we reuse the same broker ID for the redirect target:

// If the session needs to be redirected, this causes the consumer to
// connect to the correct Dekaf instance by advertising it as the
// only broker in the response. Otherwise advertise ourselves as the broker.
let broker = if let Some((broker_host, broker_port)) = self.get_redirect_address().await? {
MetadataResponseBroker::default()
.with_node_id(messages::BrokerId(1))
.with_host(StrBytes::from_string(broker_host))
.with_port(broker_port)
} else {
MetadataResponseBroker::default()
.with_node_id(messages::BrokerId(1))
.with_host(StrBytes::from_string(self.app.advertise_host.clone()))
.with_port(self.app.advertise_kafka_port as i32)
};

librdkafka gets confused and shuts down its session rather than smoothly connecting to the newly advertised broker. The subsequent connection only sees the new broker and connects to it properly, but I believe this is the source of the periodic error logging we've been seeing. Fixing it is out of scope here, but it involves a mechanism for assigning each data-plane's Dekaf instance(s) a stable numeric identifier that we can use as the broker ID.

Note: I discovered a race condition in Gazette while working on these tests that causes them to have an unfortunately high upper bound on runtime: about 8 minutes.

Briefly, when a journal is migrated, one of the steps is suspending it in the source data-plane. When a journal is suspended, two things happen: its un-persisted fragments are scheduled for immediate upload, and its replication factor is reduced from 3 to 1. The race condition is: if the broker that was the primary for the journal being suspended ends up being picked as the sole remaining broker in the topology, the fragment persistence happens promptly. Otherwise, it falls back to the backup fragment upload pathway which takes up to 3 minutes.

If all of the necessary fragments aren't present when the journal is then published to the destination data-plane, we have to wait out the 5 minute default fragment refresh interval before they get picked up. Hence the 8 minute upper bound on runtime.

The following output is with a temporarily modified faster fragment refresh interval:

 Nextest run ID db351aec-39d7-4872-a37e-e4ef30c2f1fb with nextest profile: dekaf-e2e
    Starting 2 tests across 1 binary (14 tests and 3 binaries skipped)
        SLOW [> 60.000s] dekaf::e2e migration::test_rdkafka_handles_redirect
        SLOW [> 60.000s] dekaf::e2e migration::test_migration_protocol_responses
        SLOW [>120.000s] dekaf::e2e migration::test_migration_protocol_responses
        SLOW [>120.000s] dekaf::e2e migration::test_rdkafka_handles_redirect
        PASS [ 170.702s] dekaf::e2e migration::test_migration_protocol_responses
        SLOW [>180.000s] dekaf::e2e migration::test_rdkafka_handles_redirect
        PASS [ 187.367s] dekaf::e2e migration::test_rdkafka_handles_redirect

@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch 6 times, most recently from 585fc03 to 1fd7e66 Compare January 21, 2026 16:15
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch from 7823aab to 109edbf Compare January 21, 2026 16:15
@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch from 1fd7e66 to cc80924 Compare January 21, 2026 16:58
@jshearer jshearer changed the base branch from dekaf/collection_reset_with_e2e_tests to master January 21, 2026 17:23
@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch 3 times, most recently from bd16cf6 to 9c4b3c0 Compare January 22, 2026 20:32
@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch from 9c4b3c0 to fe0e7a8 Compare January 26, 2026 18:47
@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch from fe0e7a8 to 72ff5e2 Compare January 27, 2026 21:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants