Add support for AWS Kinesis Enhanced Fan-Out as an input #680

matus-tomlein wants to merge 21 commits into warpstreamlabs:main
Conversation
…namodb and use pagination rather than the scan query
Pull request overview
This pull request adds support for AWS Kinesis Enhanced Fan-Out (EFO) as a new consumption mode for the aws_kinesis input. EFO provides dedicated throughput (2 MB/sec per consumer per shard) and lower latency (~70ms) compared to the existing polling-based consumption.
Changes:
- Added new EFO consumption mode with push-based streaming via SubscribeToShard API (see the sketch below)
- Optimized DynamoDB checkpoint queries from Scan to Query for better performance
- Enhanced shard balancing logic to handle finished shards with remaining checkpoints
- Added configuration options for EFO consumer registration and tuning
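For context, below is a minimal sketch of how push-based SubscribeToShard streaming works with aws-sdk-go-v2. It is illustrative only and simplified relative to the PR's input_kinesis_efo.go; the subscribeOnce helper, the LATEST starting position, and the placeholder ARN/shard ID are assumptions for the example, not the PR's actual behaviour.

```go
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

// subscribeOnce consumes a single EFO subscription for one shard. AWS closes
// each subscription after roughly five minutes, so a real consumer resubscribes
// from the last ContinuationSequenceNumber (handled elsewhere in this PR).
func subscribeOnce(ctx context.Context, svc *kinesis.Client, consumerARN, shardID string) error {
	out, err := svc.SubscribeToShard(ctx, &kinesis.SubscribeToShardInput{
		ConsumerARN:      &consumerARN,
		ShardId:          &shardID,
		StartingPosition: &types.StartingPosition{Type: types.ShardIteratorTypeLatest},
	})
	if err != nil {
		return err
	}
	stream := out.GetStream()
	defer stream.Close()

	// Events are pushed over the long-lived HTTP/2 connection.
	for event := range stream.Events() {
		if ev, ok := event.(*types.SubscribeToShardEventStreamMemberSubscribeToShardEvent); ok {
			for _, rec := range ev.Value.Records {
				_ = rec // hand off to batching/checkpointing here
			}
		}
	}
	return stream.Err()
}

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	// The consumer ARN and shard ID below are placeholders.
	_ = subscribeOnce(context.Background(), kinesis.NewFromConfig(cfg),
		"arn:aws:kinesis:...:consumer/...", "shardId-000000000000")
}
```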
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| website/docs/components/inputs/aws_kinesis.md | Documentation for new enhanced_fan_out configuration options |
| internal/impl/aws/input_kinesis.go | Added EFO config parsing, validation, and integration with existing consumer logic |
| internal/impl/aws/input_kinesis_efo.go | New file implementing EFO consumer registration, subscription management, and streaming |
| internal/impl/aws/input_kinesis_checkpointer.go | Refactored from Scan to Query for better performance (see the sketch below); added GetCheckpointsAndClaims method |
| internal/impl/aws/input_kinesis_test.go | Added unit tests for isShardFinished helper function |
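As a rough illustration of the Scan-to-Query refactor noted above, the hedged sketch below queries checkpoints by partition key and pages through the results instead of scanning the whole table. The listCheckpoints helper and the StreamID attribute name are assumptions for this example; the checkpointer's real schema and the GetCheckpointsAndClaims signature may differ.

```go
package checkpointsketch

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// listCheckpoints fetches all checkpoint items for one stream by partition key,
// paginating instead of scanning. Table and attribute names are assumptions.
func listCheckpoints(ctx context.Context, db *dynamodb.Client, table, streamID string) ([]map[string]ddbtypes.AttributeValue, error) {
	paginator := dynamodb.NewQueryPaginator(db, &dynamodb.QueryInput{
		TableName:              &table,
		KeyConditionExpression: aws.String("StreamID = :sid"),
		ExpressionAttributeValues: map[string]ddbtypes.AttributeValue{
			":sid": &ddbtypes.AttributeValueMemberS{Value: streamID},
		},
	})

	var items []map[string]ddbtypes.AttributeValue
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, err
		}
		items = append(items, page.Items...)
	}
	return items, nil
}
```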
```go
// newKinesisEFOManager creates a new EFO manager
func newKinesisEFOManager(conf *kiEFOConfig, streamARN, clientID string, svc *kinesis.Client, log *service.Logger) (*kinesisEFOManager, error) {
	if conf == nil {
		return nil, errors.New("enhanced fan out config is nil")
	}

	if conf.ConsumerName != "" && conf.ConsumerARN != "" {
		return nil, errors.New("cannot specify both consumer_name and consumer_arn")
	}

	consumerName := conf.ConsumerName
	if consumerName == "" && conf.ConsumerARN == "" {
		consumerName = "bento-" + clientID
	}

	return &kinesisEFOManager{
		streamARN:    streamARN,
		consumerName: consumerName,
		consumerARN:  conf.ConsumerARN,
		svc:          svc,
		log:          log,
	}, nil
}
```
The EFO consumer registration is never cleaned up when the input is closed. While AWS will eventually clean up inactive consumers automatically, it's better practice to explicitly deregister consumers during shutdown to avoid leaving orphaned consumer registrations. Consider adding a cleanup method to kinesisEFOManager that calls DeregisterStreamConsumer, and invoke it from the Close method of kinesisReader. Note that this should only deregister consumers that were registered (not ones that used a provided consumer_arn).
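For illustration, a cleanup along the lines suggested above could look roughly like the following, building on the kinesisEFOManager shown earlier. The registeredByUs flag and the deregisterIfOwned name are hypothetical additions for this sketch, and the next comment explains why automatic deregistration may be undesirable in practice.

```go
// Hypothetical sketch of the suggested cleanup. registeredByUs would be a new
// field set only when RegisterStreamConsumer was called by this instance, so a
// consumer supplied via consumer_arn is never deregistered.
func (k *kinesisEFOManager) deregisterIfOwned(ctx context.Context) error {
	if !k.registeredByUs {
		return nil
	}
	_, err := k.svc.DeregisterStreamConsumer(ctx, &kinesis.DeregisterStreamConsumerInput{
		StreamARN:    &k.streamARN,
		ConsumerName: &k.consumerName,
	})
	return err
}
```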
Consumer deregistration needs to be done very carefully. Consider the case where you have multiple pods in an auto-scaling architecture: if you scale down, do you want to issue a deregistration call and break your other pod that is still actively consuming?
For this reason (and similar to the DynamoDB table), automated creation is fine and is a good helper, but destruction should be controlled by the user or an external process (internally we manage the consumer via Terraform before deploying the pods).
The error channel has a buffer of 1, but the subscription goroutine can potentially send multiple errors if the subscription fails repeatedly before the main loop processes the error. This could lead to the subscription goroutine blocking when trying to send an error while the main loop is busy processing records or performing other operations. Consider either:
- Making the errorsChan unbuffered and using a non-blocking send with a default case
- Increasing the buffer size to match other channels (like resubscribeChan)
- Using a select with default case to avoid blocking
After analysis, I believe the current implementation is correct.
The subscription goroutine follows this pattern:
- Call `efoSubscribeAndStream()`
- If error, send to `errorsChan` (blocking)
- Loop back to `for sequence := range subscriptionTrigger` and wait for the next sequence
The goroutine cannot send multiple errors in rapid succession because after sending one error, it blocks waiting for a new sequence from subscriptionTrigger. A new sequence is only sent after the main loop processes the error and schedules a resubscription via time.AfterFunc.
The blocking send on errorsChan is intentional - it provides backpressure so the subscription goroutine waits until the error is processed before proceeding. This ensures errors are not lost and are handled in order.
With the buffer of 1, the flow is:
- Error occurs → sent to buffer (non-blocking if buffer empty)
- Goroutine waits for next sequence
- Main loop processes error, schedules resubscription
- New sequence sent → goroutine proceeds
- Next error (if any) can use the now-empty buffer
The buffer of 1 is sufficient here and prevents unnecessary blocking on the happy path while maintaining correct error handling semantics.
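For readers following along, here is a condensed, self-contained illustration of the coordination described above. The names mirror the discussion (errorsChan, subscriptionTrigger, efoSubscribeAndStream), but the bodies are stand-ins written for this example, not the PR's code.

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// efoSubscribeAndStream stands in for the real subscription call; here it
// always fails so the retry path is exercised.
func efoSubscribeAndStream(ctx context.Context, fromSequence string) error {
	return errors.New("simulated subscription failure")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	errorsChan := make(chan error, 1)           // buffer of 1, as discussed
	subscriptionTrigger := make(chan string, 1) // resubscription triggers
	lastSequence := ""                          // placeholder checkpoint

	// Subscription goroutine: one attempt per trigger, then wait for the next.
	go func() {
		for sequence := range subscriptionTrigger {
			if err := efoSubscribeAndStream(ctx, sequence); err != nil {
				errorsChan <- err // may block, but only until the main loop drains it
			}
		}
	}()

	subscriptionTrigger <- lastSequence
	for {
		select {
		case err := <-errorsChan:
			log.Printf("subscription error: %v", err)
			// Only now is a new trigger scheduled, so at most one error is in flight.
			time.AfterFunc(300*time.Millisecond, func() { subscriptionTrigger <- lastSequence })
		case <-ctx.Done():
			return
		}
	}
}
```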
```go
// Continue from last sequence
startingPosition = &types.StartingPosition{
	Type: types.ShardIteratorTypeAfterSequenceNumber,
```
According to AWS Kinesis Enhanced Fan-Out documentation, when using a ContinuationSequenceNumber from a previous SubscribeToShard response, it should be paired with ShardIteratorType AT_SEQUENCE_NUMBER, not AFTER_SEQUENCE_NUMBER. The current implementation always uses AFTER_SEQUENCE_NUMBER for non-empty sequences (line 417), which means when resubscribing with a continuation sequence, the first record at that sequence number will be skipped. This could lead to data loss during normal resubscriptions. The code should distinguish between continuation sequences (use AT_SEQUENCE_NUMBER) and checkpointed sequences (use AFTER_SEQUENCE_NUMBER).
Suggested change:

```diff
-// Continue from last sequence
-startingPosition = &types.StartingPosition{
-	Type: types.ShardIteratorTypeAfterSequenceNumber,
+// Continue from last sequence (use AT_SEQUENCE_NUMBER to avoid skipping the record)
+startingPosition = &types.StartingPosition{
+	Type: types.ShardIteratorTypeAtSequenceNumber,
```
After investigation, the current implementation using AFTER_SEQUENCE_NUMBER appears to be correct.
The official AWS KCL (Kinesis Client Library) uses AFTER_SEQUENCE_NUMBER when resubscribing with a ContinuationSequenceNumber. From IteratorBuilder.java:
```java
public static StartingPosition.Builder reconnectRequest(...) {
    return apply(..., ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}
```

According to the AWS documentation, ContinuationSequenceNumber "captures your shard progress even when no data is written to the shard" - meaning it represents the last position processed, not the next record to read. Using AFTER_SEQUENCE_NUMBER ensures we start reading after that position, avoiding re-processing.
Using AT_SEQUENCE_NUMBER would cause the record at ContinuationSequenceNumber to be re-delivered, potentially causing duplicates.
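In aws-sdk-go-v2 terms, the behaviour being defended roughly comes down to the sketch below. It is illustrative only: startingPositionFor is a made-up helper, and the TRIM_HORIZON fallback for an empty sequence is an assumption rather than necessarily what the PR does.

```go
// startingPositionFor picks a StartingPosition from the last known sequence.
// types is github.com/aws/aws-sdk-go-v2/service/kinesis/types.
func startingPositionFor(lastSequence string) *types.StartingPosition {
	if lastSequence == "" {
		// No checkpoint or continuation sequence yet; the fallback iterator
		// type here is an assumption for the example.
		return &types.StartingPosition{Type: types.ShardIteratorTypeTrimHorizon}
	}
	// Both a ContinuationSequenceNumber and a checkpointed sequence refer to
	// the last record already handled, so start AFTER it to avoid re-delivery.
	return &types.StartingPosition{
		Type:           types.ShardIteratorTypeAfterSequenceNumber,
		SequenceNumber: &lastSequence,
	}
}
```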
…d consumer_arn are specified
Fields with Default() values don't need Optional() as well. This aligns with how other config fields in the codebase are marked.
The validation for conflicting consumer_name and consumer_arn is already performed in newKinesisEFOManager and via LintRule, making this check redundant.
Capture sequence before the time.AfterFunc closure to avoid accessing recordBatcher after the consumer has cleaned up. Also add context cancellation check to prevent attempting resubscription during shutdown.
Check ctx.Done() after returning from efoSubscribeAndStream and before attempting to send on any channels. This prevents goroutine leaks when the context is cancelled during shutdown.
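These two commits follow a common Go pattern: copy the needed value before handing it to a deferred closure, and pair every channel send with a ctx.Done() case. A generic sketch is below; the scheduleResubscribe helper is hypothetical and assumes the "context" and "time" imports.

```go
// Hypothetical helper showing both fixes together.
func scheduleResubscribe(ctx context.Context, trigger chan<- string, lastSequence string, backoff time.Duration) {
	// lastSequence is captured by value here, before the timer fires, so the
	// closure never reads from the batcher after the consumer has cleaned up.
	time.AfterFunc(backoff, func() {
		select {
		case trigger <- lastSequence:
		case <-ctx.Done(): // shutting down: don't block on the send
		}
	})
}
```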
Force-pushed from 90b6dd7 to 1dd27da.
When ContinuationSequenceNumber is not provided, fall back to the last received record's sequence number instead of the last acknowledged sequence. This avoids re-delivering in-flight records that have been received but not yet acknowledged.
@matus-tomlein - could you mark the PR as draft if changes are still happening? Also does this PR (#680) include #644? Could you add 644 to this PR?

hey @jem-davies, sorry, we just came across an issue with some limited throughput due to the back-pressure mechanism when running this under load, so doing some small changes. Have marked it as draft and will let you know once we figure it out. Yes, #644 is included in this PR.

Ok great - still going to look at this - whilst you do that - want to get this in for the next release - thanks for the effort again 🙏
Force-pushed from 6c69cba to f391410.
Would it be possible to do something like this for an int test?
Signed-off-by: Jem Davies <jemsot@gmail.com>
…re robust EFO retry handling (#1)

* Update backpressure mechanism to build on the pending buffer
* Add global pending pool
* Remove default case to ensure resubscription triggers aren't dropped
* Add a timeout to WaitForSpace so that if backpressure takes too long, we proactively close and resubscribe rather than waiting for AWS to reset the connection
* fix: Add backoff for backpressure timeouts to prevent resubscription loops

  When backpressure causes WaitForSpace to time out, we were returning a nil error, which triggered immediate resubscription. But since the backpressure condition hasn't resolved, this creates a loop of: Subscribe → Wait 30s → Timeout → Resubscribe immediately → Repeat. Now we return errBackpressureTimeout, which is treated as a retryable error, triggering exponential backoff before the next subscription attempt. This gives the system time to drain pending records. Also adds distinct debug logging for backpressure timeouts vs network errors to aid in troubleshooting.

* fix: Prevent deadlock when error channel is full in EFO subscription

  When multiple errors occur in rapid succession, the subscription goroutine could block trying to send to errorsChan (buffer size 1). This prevented it from receiving resubscription triggers, causing a deadlock where:
  1. Subscription goroutine blocks on errorsChan <- err
  2. time.AfterFunc blocks on subscriptionTrigger <- sequence
  3. Neither can proceed

  Fix:
  - Increased errorsChan buffer to 8
  - Use non-blocking send to errorsChan
  - If the channel is full, handle retry locally with a small delay and re-queue on subscriptionTrigger

  This ensures the subscription goroutine never blocks indefinitely and can always accept resubscription triggers.

* fix: Make subscription goroutine self-sufficient for retry handling

  Previously, retry coordination was split between the subscription goroutine and the main loop:
  1. Subscription goroutine sent errors to errorsChan
  2. Main loop received errors and scheduled resubscription via time.AfterFunc
  3. time.AfterFunc sent to subscriptionTrigger
  4. Subscription goroutine received from subscriptionTrigger

  This created multiple potential deadlock/starvation scenarios:
  - If errorsChan was full, the subscription goroutine could block
  - If subscriptionTrigger was full, time.AfterFunc could block
  - Complex coordination made it easy for resubscription triggers to be lost

  The new design makes the subscription goroutine fully autonomous:
  - Has its own backoff (300ms initial, 5s max, never stops)
  - Retries directly in an inner loop until success
  - Sends to errorsChan non-blocking (for logging only)
  - Main loop just logs errors, doesn't trigger resubscription

  This eliminates all coordination issues since the subscription goroutine never waits for the main loop to tell it to retry.

* refactor: Clean up EFO subscription code after self-sufficient retry redesign

  Simplifications after making the subscription goroutine self-sufficient:
  1. Remove unused `boff` (backoff) from the main consumer loop
     - Was previously used for scheduling resubscription via time.AfterFunc
     - No longer needed since the subscription goroutine has its own backoff
  2. Reduce errorsChan buffer from 8 to 1
     - Only used for logging/monitoring now, not control flow
     - Non-blocking sends handle overflow gracefully
  3. Fix non-retryable error handling in the subscription goroutine
     - Now checks for ResourceNotFoundException and InvalidArgumentException
     - Stops retrying and notifies the main loop for proper shutdown
     - Previously would have kept retrying forever
  4. Update comments to reflect the new design

* fix: Address PR review feedback for EFO global pending pool

  High priority fixes:
  1. Replace busy-polling with proper cond.Wait() signaling
     - Acquire and WaitForSpace now use sync.Cond.Wait() instead of a time.After(10ms) polling loop
     - Eliminates unnecessary CPU/GC overhead under sustained backpressure
     - Context cancellation and timeout handled via a goroutine that broadcasts to wake up waiters
  2. Handle count > max in Acquire
     - Return false immediately if the requested count exceeds the pool max
     - Prevents indefinite blocking when Kinesis returns a batch larger than max_pending_records
  3. Use blocking send for non-retryable errors
     - Fatal errors (ResourceNotFoundException, InvalidArgumentException) now use a blocking send to errorsChan with a context guard
     - Ensures the main loop always receives fatal errors for proper shutdown
     - Prevents a shard from getting stuck without an active subscription

  Medium priority fixes:
  4. Drain recordsChan on shutdown
     - Added drainRecordsChan() helper called after subscriptionWg.Wait()
     - Releases pool capacity for any records buffered in recordsChan
     - Prevents leaking pool capacity when the consumer exits with buffered records
  5. Add unit tests for globalPendingPool
     - Tests for Acquire/Release basic operations
     - Tests for Acquire blocking until space is available
     - Tests for Acquire with count > max (immediate failure)
     - Tests for context cancellation
     - Tests for WaitForSpace with timeout
     - Tests for concurrent access
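To make the pending-pool description above concrete, here is an assumption-laden sketch of how such a pool can be built around sync.Cond. The type and field names, and the use of context.AfterFunc (Go 1.21+) for cancellation wake-ups, are choices made for this example rather than the PR's actual implementation.

```go
package pendingpool

import (
	"context"
	"sync"
)

// globalPendingPool bounds the number of records held in memory across shards.
type globalPendingPool struct {
	mu    sync.Mutex
	cond  *sync.Cond
	max   int
	inUse int
}

func newGlobalPendingPool(max int) *globalPendingPool {
	p := &globalPendingPool{max: max}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// Acquire reserves count slots, blocking via cond.Wait until space frees up.
// It fails fast if count can never fit, and aborts when ctx is cancelled.
func (p *globalPendingPool) Acquire(ctx context.Context, count int) bool {
	if count > p.max {
		return false // a batch larger than the pool can never fit
	}
	// Wake any waiter when the context is cancelled; take the lock so the
	// broadcast cannot slip in between the ctx check and cond.Wait below.
	stop := context.AfterFunc(ctx, func() {
		p.mu.Lock()
		defer p.mu.Unlock()
		p.cond.Broadcast()
	})
	defer stop()

	p.mu.Lock()
	defer p.mu.Unlock()
	for p.inUse+count > p.max {
		if ctx.Err() != nil {
			return false
		}
		p.cond.Wait()
	}
	p.inUse += count
	return true
}

// Release returns count slots to the pool and wakes any waiters.
func (p *globalPendingPool) Release(count int) {
	p.mu.Lock()
	p.inUse -= count
	p.mu.Unlock()
	p.cond.Broadcast()
}
```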
hi @jem-davies, we have done some more load testing of the EFO implementation to work out some issues. In particular, we have made the changes mentioned in this PR around introducing a new global pending pool and self-sufficient error handling: snowplow-incubator#1

This PR should now be ready for review! We don't have any other issues that we are working on now; based on our testing it seems to be working well (we are currently running it on a couple of deployments).

@matus-tomlein This PR should fix the CI: snowplow-incubator#3 - keen to get this in!
Signed-off-by: Jem Davies <jemsot@gmail.com>
Reopening PR #669 from @jbeemster
This pull request introduces a new consumption mode for Kinesis with Enhanced Fan-Out support, giving Bento a dedicated consumption pipe instead of depending on shared polling.
As much as possible we have re-used the existing Kinesis input architecture: EFO is only added for consuming the events, while checkpointing and batching are left identical. Many of the logical patterns have also been copied from the polling implementation and then adapted to support EFO.