
Add support for AWS Kinesis Enhanced Fan-Out as an input #680

Open

matus-tomlein wants to merge 21 commits into warpstreamlabs:main from snowplow-incubator:efo-support

Conversation


@matus-tomlein matus-tomlein commented Jan 30, 2026

Reopening PR #669 from @jbeemster

This pull request introduces a new consumption mode for Kinesis with Enhanced Fan-Out support. It gives Bento a dedicated consumption pipe instead of depending on shared polling.

As much as possible we have tried to re-use the existing Kinesis input architecture: EFO is only added for consuming the events, while checkpointing and batching are left identical. Quite a lot of the logical patterns have also been copied from the polling implementation and then altered to support EFO.


Copilot AI left a comment


Pull request overview

This pull request adds support for AWS Kinesis Enhanced Fan-Out (EFO) as a new consumption mode for the aws_kinesis input. EFO provides dedicated throughput (2 MB/sec per consumer per shard) and lower latency (~70ms) compared to the existing polling-based consumption.

Changes:

  • Added new EFO consumption mode with push-based streaming via SubscribeToShard API
  • Optimized DynamoDB checkpoint queries from Scan to Query for better performance
  • Enhanced shard balancing logic to handle finished shards with remaining checkpoints
  • Added configuration options for EFO consumer registration and tuning

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 10 comments.

  • website/docs/components/inputs/aws_kinesis.md: documentation for the new enhanced_fan_out configuration options
  • internal/impl/aws/input_kinesis.go: added EFO config parsing, validation, and integration with the existing consumer logic
  • internal/impl/aws/input_kinesis_efo.go: new file implementing EFO consumer registration, subscription management, and streaming
  • internal/impl/aws/input_kinesis_checkpointer.go: refactored from Scan to Query for better performance; added a GetCheckpointsAndClaims method
  • internal/impl/aws/input_kinesis_test.go: added unit tests for the isShardFinished helper function


Comment on lines +27 to +49
// newKinesisEFOManager creates a new EFO manager
func newKinesisEFOManager(conf *kiEFOConfig, streamARN, clientID string, svc *kinesis.Client, log *service.Logger) (*kinesisEFOManager, error) {
	if conf == nil {
		return nil, errors.New("enhanced fan out config is nil")
	}

	if conf.ConsumerName != "" && conf.ConsumerARN != "" {
		return nil, errors.New("cannot specify both consumer_name and consumer_arn")
	}

	consumerName := conf.ConsumerName
	if consumerName == "" && conf.ConsumerARN == "" {
		consumerName = "bento-" + clientID
	}

	return &kinesisEFOManager{
		streamARN:    streamARN,
		consumerName: consumerName,
		consumerARN:  conf.ConsumerARN,
		svc:          svc,
		log:          log,
	}, nil
}

Copilot AI Jan 30, 2026


The EFO consumer registration is never cleaned up when the input is closed. While AWS will eventually clean up inactive consumers automatically, it's better practice to explicitly deregister consumers during shutdown to avoid leaving orphaned consumer registrations. Consider adding a cleanup method to kinesisEFOManager that calls DeregisterStreamConsumer, and invoke it from the Close method of kinesisReader. Note that this should only deregister consumers that were registered (not ones that used a provided consumer_arn).
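
For illustration, a minimal sketch of what such a cleanup could look like with the AWS SDK for Go v2, assuming the same imports as the file above. The method name and the registered flag are hypothetical: the flag would only be set when the manager registered the consumer itself, so a user-provided consumer_arn is never deregistered.

// deregisterIfOwned is a hypothetical cleanup helper: it only deregisters a
// consumer that this manager registered, never one supplied via consumer_arn.
func (m *kinesisEFOManager) deregisterIfOwned(ctx context.Context) error {
	if !m.registered { // hypothetical flag set when RegisterStreamConsumer was called
		return nil
	}
	_, err := m.svc.DeregisterStreamConsumer(ctx, &kinesis.DeregisterStreamConsumerInput{
		StreamARN:    &m.streamARN,
		ConsumerName: &m.consumerName,
	})
	return err
}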

Contributor


Consumer deregistration needs to be done very carefully. Consider the case where you have multiple pods in an auto-scaling architecture: if you scale down, do you want to issue a deregistration call and break your other pod that is still actively consuming?

For this reason (and similar to the DynamoDB table), automated creation is fine and is a good helper, but destruction should be controlled by the user or an external process (internally we manage the consumer via Terraform before deploying the pods).

Comment on lines 230 to 241

Copilot AI Jan 30, 2026


The error channel has a buffer of 1, but the subscription goroutine can potentially send multiple errors if the subscription fails repeatedly before the main loop processes the error. This could lead to the subscription goroutine blocking when trying to send an error while the main loop is busy processing records or performing other operations. Consider either:

  1. Making the errorsChan unbuffered and using a non-blocking send with a default case
  2. Increasing the buffer size to match other channels (like resubscribeChan)
  3. Using a select with default case to avoid blocking
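
For reference, option 3 is a standard Go pattern; a minimal sketch (the helper name trySendErr is illustrative):

// trySendErr forwards err to errs without ever blocking the caller and reports
// whether the send succeeded, so a full buffer can be handled (logged, retried
// locally) instead of stalling the subscription goroutine.
func trySendErr(errs chan<- error, err error) bool {
	select {
	case errs <- err:
		return true
	default:
		return false
	}
}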

Author


After analysis, I believe the current implementation is correct.

The subscription goroutine follows this pattern:

  1. Call efoSubscribeAndStream()
  2. If error, send to errorsChan (blocking)
  3. Loop back to for sequence := range subscriptionTrigger and wait for the next sequence

The goroutine cannot send multiple errors in rapid succession because after sending one error, it blocks waiting for a new sequence from subscriptionTrigger. A new sequence is only sent after the main loop processes the error and schedules a resubscription via time.AfterFunc.

The blocking send on errorsChan is intentional - it provides backpressure so the subscription goroutine waits until the error is processed before proceeding. This ensures errors are not lost and are handled in order.

With the buffer of 1, the flow is:

  1. Error occurs → sent to buffer (non-blocking if buffer empty)
  2. Goroutine waits for next sequence
  3. Main loop processes error, schedules resubscription
  4. New sequence sent → goroutine proceeds
  5. Next error (if any) can use the now-empty buffer

The buffer of 1 is sufficient here and prevents unnecessary blocking on the happy path while maintaining correct error handling semantics.
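
To make the described flow concrete, here is a compact, self-contained sketch of that coordination. The channel names follow the discussion, subscribe stands in for efoSubscribeAndStream, shutdown of the inner goroutine is elided, and the one-second delay is a placeholder for the real backoff.

package efosketch

import (
	"context"
	"time"
)

// runSubscriptionLoop models the flow described above: one subscription
// attempt per trigger, a blocking error send, and resubscription scheduled by
// the main loop only after the error has been consumed.
func runSubscriptionLoop(ctx context.Context, subscribe func(ctx context.Context, seq string) error) {
	subscriptionTrigger := make(chan string, 1)
	errorsChan := make(chan error, 1)

	// Subscription goroutine: exactly one attempt per trigger, then park.
	go func() {
		for seq := range subscriptionTrigger {
			if err := subscribe(ctx, seq); err != nil {
				// At most one error can be outstanding, so the single
				// buffer slot never blocks on the happy path.
				errorsChan <- err
			}
		}
	}()

	subscriptionTrigger <- "" // initial subscription

	for {
		select {
		case <-ctx.Done():
			return
		case err := <-errorsChan:
			_ = err // the real implementation logs the failure
			// Only after this timer fires can the goroutine run (and fail)
			// again, which is what keeps the buffer of 1 sufficient.
			time.AfterFunc(time.Second, func() {
				subscriptionTrigger <- "last-known-sequence" // placeholder
			})
		}
	}
}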

Comment on lines +415 to +417
// Continue from last sequence
startingPosition = &types.StartingPosition{
Type: types.ShardIteratorTypeAfterSequenceNumber,

Copilot AI Jan 30, 2026


According to AWS Kinesis Enhanced Fan-Out documentation, when using a ContinuationSequenceNumber from a previous SubscribeToShard response, it should be paired with ShardIteratorType AT_SEQUENCE_NUMBER, not AFTER_SEQUENCE_NUMBER. The current implementation always uses AFTER_SEQUENCE_NUMBER for non-empty sequences (line 417), which means when resubscribing with a continuation sequence, the first record at that sequence number will be skipped. This could lead to data loss during normal resubscriptions. The code should distinguish between continuation sequences (use AT_SEQUENCE_NUMBER) and checkpointed sequences (use AFTER_SEQUENCE_NUMBER).

Suggested change
- // Continue from last sequence
- startingPosition = &types.StartingPosition{
- 	Type: types.ShardIteratorTypeAfterSequenceNumber,
+ // Continue from last sequence (use AT_SEQUENCE_NUMBER to avoid skipping the record)
+ startingPosition = &types.StartingPosition{
+ 	Type: types.ShardIteratorTypeAtSequenceNumber,

Author


After investigation, the current implementation using AFTER_SEQUENCE_NUMBER appears to be correct.

The official AWS KCL (Kinesis Client Library) uses AFTER_SEQUENCE_NUMBER when resubscribing with a ContinuationSequenceNumber. From IteratorBuilder.java:

public static StartingPosition.Builder reconnectRequest(...) {
  return apply(..., ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}

According to the AWS documentation, ContinuationSequenceNumber "captures your shard progress even when no data is written to the shard" - meaning it represents the last position processed, not the next record to read. Using AFTER_SEQUENCE_NUMBER ensures we start reading after that position, avoiding re-processing.

Using AT_SEQUENCE_NUMBER would cause the record at ContinuationSequenceNumber to be re-delivered, potentially causing duplicates.
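
For context, this is roughly what a resubscription call looks like with the AWS SDK for Go v2, following the KCL behaviour quoted above. This is a sketch; the function and variable names are illustrative, not the PR's code.

package efosketch

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

// resubscribeAfter reopens a shard subscription strictly after the given
// continuation sequence number, mirroring the KCL reconnect behaviour.
func resubscribeAfter(ctx context.Context, svc *kinesis.Client, consumerARN, shardID, seq string) (*kinesis.SubscribeToShardOutput, error) {
	return svc.SubscribeToShard(ctx, &kinesis.SubscribeToShardInput{
		ConsumerARN: aws.String(consumerARN),
		ShardId:     aws.String(shardID),
		StartingPosition: &types.StartingPosition{
			Type:           types.ShardIteratorTypeAfterSequenceNumber,
			SequenceNumber: aws.String(seq),
		},
	})
}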

@jem-davies jem-davies self-assigned this Jan 30, 2026
  • Fields with Default() values don't need Optional() as well. This aligns with how other config fields in the codebase are marked.
  • The validation for conflicting consumer_name and consumer_arn is already performed in newKinesisEFOManager and via LintRule, making this check redundant.
  • Capture sequence before the time.AfterFunc closure to avoid accessing recordBatcher after the consumer has cleaned up. Also add a context cancellation check to prevent attempting resubscription during shutdown.
  • Check ctx.Done() after returning from efoSubscribeAndStream and before attempting to send on any channels. This prevents goroutine leaks when the context is cancelled during shutdown (see the sketch after this list).
  • When ContinuationSequenceNumber is not provided, fall back to the last received record's sequence number instead of the last acknowledged sequence. This avoids re-delivering in-flight records that have been received but not yet acknowledged.
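
The context-cancellation checks above boil down to guarding every channel send with ctx.Done(); a minimal sketch of that pattern, assuming the standard context package is imported (the helper name is illustrative):

// ctxSend delivers v on ch unless ctx is cancelled first. Returning false lets
// the caller exit cleanly instead of leaking a goroutine blocked on the send.
func ctxSend[T any](ctx context.Context, ch chan<- T, v T) bool {
	select {
	case ch <- v:
		return true
	case <-ctx.Done():
		return false
	}
}
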
@jem-davies
Collaborator

@matus-tomlein - could you mark the PR as draft if changes are still happening? Also does this PR (#680) include #644?

Could you add #644 to this PR?

@matus-tomlein matus-tomlein marked this pull request as draft February 3, 2026 13:47
@matus-tomlein
Author

hey @jem-davies, sorry, we just came across an issue with some limited throughput due to the back-pressure mechanism when running this under load, so doing some small changes. Have marked it as draft and will let you know once we figure it out.

Yes, #644 is included in this PR.

@jem-davies
Collaborator

> hey @jem-davies, sorry, we just came across an issue with some limited throughput due to the back-pressure mechanism when running this under load, so doing some small changes. Have marked it as draft and will let you know once we figure it out.
>
> Yes, #644 is included in this PR.

Ok great - I'm still going to look at this whilst you do that - want to get this in for the next release - thanks for the effort again 🙏

@jem-davies jem-davies removed their assignment Feb 10, 2026
@jem-davies
Collaborator

Would it be possible to do something like this for an integration test?

jem-davies and others added 2 commits February 12, 2026 14:01
Signed-off-by: Jem Davies <jemsot@gmail.com>
…re robust EFO retry handling (#1)

* Update backpressure mechanism to build on the pending buffer

* Add global pending pool

* Remove default case to ensure resubscription triggers aren't dropped

* Add a timeout to WaitForSpace so that if backpressure takes too long, we proactively close and resubscribe rather than waiting for AWS to reset the connection

* fix: Add backoff for backpressure timeouts to prevent resubscription loops

When backpressure causes WaitForSpace to timeout, we were returning
nil error which triggered immediate resubscription. But since the
backpressure condition hasn't resolved, this creates a loop of:
Subscribe → Wait 30s → Timeout → Resubscribe immediately → Repeat

Now we return errBackpressureTimeout which is treated as a retryable
error, triggering exponential backoff before the next subscription
attempt. This gives the system time to drain pending records.

Also adds distinct debug logging for backpressure timeouts vs network
errors to aid in troubleshooting.

* fix: Prevent deadlock when error channel is full in EFO subscription

When multiple errors occur in rapid succession, the subscription
goroutine could block trying to send to errorsChan (buffer size 1).
This prevented it from receiving resubscription triggers, causing
a deadlock where:
1. Subscription goroutine blocks on errorsChan <- err
2. time.AfterFunc blocks on subscriptionTrigger <- sequence
3. Neither can proceed

Fix:
- Increased errorsChan buffer to 8
- Use non-blocking send to errorsChan
- If channel is full, handle retry locally with a small delay
  and re-queue on subscriptionTrigger

This ensures the subscription goroutine never blocks indefinitely
and can always accept resubscription triggers.

* fix: Make subscription goroutine self-sufficient for retry handling

Previously, retry coordination was split between the subscription
goroutine and the main loop:
1. Subscription goroutine sent errors to errorsChan
2. Main loop received errors and scheduled resubscription via time.AfterFunc
3. time.AfterFunc sent to subscriptionTrigger
4. Subscription goroutine received from subscriptionTrigger

This created multiple potential deadlock/starvation scenarios:
- If errorsChan was full, subscription goroutine could block
- If subscriptionTrigger was full, time.AfterFunc could block
- Complex coordination made it easy for resubscription triggers to be lost

New design makes subscription goroutine fully autonomous:
- Has its own backoff (300ms initial, 5s max, never stops)
- Retries directly in an inner loop until success
- Sends to errorsChan non-blocking (for logging only)
- Main loop just logs errors, doesn't trigger resubscription

This eliminates all coordination issues since the subscription
goroutine never waits for the main loop to tell it to retry.
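
A self-contained sketch, under the assumptions above, of what such an autonomous retry loop can look like: the goroutine owns its backoff (300 ms initial, 5 s cap), treats the two AWS error types named in these commits as fatal, and otherwise retries forever. subscribe stands in for the real subscribe-and-stream call and fatal is a hypothetical callback used to notify the main loop.

package efosketch

import (
	"context"
	"errors"
	"time"

	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

// subscribeForever keeps a shard subscription alive with its own backoff,
// independent of the main consumer loop.
func subscribeForever(ctx context.Context, subscribe func(context.Context) error, fatal func(error)) {
	backoff := 300 * time.Millisecond
	const maxBackoff = 5 * time.Second

	for {
		if ctx.Err() != nil {
			return
		}

		err := subscribe(ctx)
		if err == nil {
			// A clean rotation (e.g. the subscription expired normally):
			// reset the backoff and subscribe again immediately.
			backoff = 300 * time.Millisecond
			continue
		}

		var notFound *types.ResourceNotFoundException
		var badArg *types.InvalidArgumentException
		if errors.As(err, &notFound) || errors.As(err, &badArg) {
			fatal(err) // non-retryable: notify the main loop so it can shut down
			return
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}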

* refactor: Clean up EFO subscription code after self-sufficient retry redesign

Simplifications after making subscription goroutine self-sufficient:

1. Remove unused `boff` (backoff) from main consumer loop
   - Was previously used for scheduling resubscription via time.AfterFunc
   - No longer needed since subscription goroutine has its own backoff

2. Reduce errorsChan buffer from 8 to 1
   - Only used for logging/monitoring now, not control flow
   - Non-blocking sends handle overflow gracefully

3. Fix non-retryable error handling in subscription goroutine
   - Now checks for ResourceNotFoundException and InvalidArgumentException
   - Stops retrying and notifies main loop for proper shutdown
   - Previously would have kept retrying forever

4. Update comments to reflect new design

* fix: Address PR review feedback for EFO global pending pool

High priority fixes:

1. Replace busy-polling with proper cond.Wait() signaling
   - Acquire and WaitForSpace now use sync.Cond.Wait() instead of
     time.After(10ms) polling loop
   - Eliminates unnecessary CPU/GC overhead under sustained backpressure
   - Context cancellation and timeout handled via goroutine that
     broadcasts to wake up waiters

2. Handle count > max in Acquire
   - Return false immediately if requested count exceeds pool max
   - Prevents indefinite blocking when Kinesis returns a batch larger
     than max_pending_records

3. Use blocking send for non-retryable errors
   - Fatal errors (ResourceNotFoundException, InvalidArgumentException)
     now use blocking send to errorsChan with context guard
   - Ensures main loop always receives fatal errors for proper shutdown
   - Prevents shard from getting stuck without active subscription

Medium priority fixes:

4. Drain recordsChan on shutdown
   - Added drainRecordsChan() helper called after subscriptionWg.Wait()
   - Releases pool capacity for any records buffered in recordsChan
   - Prevents leaking pool capacity when consumer exits with buffered records

5. Add unit tests for globalPendingPool
   - Tests for Acquire/Release basic operations
   - Tests for Acquire blocking until space available
   - Tests for Acquire with count > max (immediate failure)
   - Tests for context cancellation
   - Tests for WaitForSpace with timeout
   - Tests for concurrent access
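
A minimal sketch of a global pending pool along the lines described above, using sync.Cond rather than polling; the names are illustrative, and WaitForSpace plus its timeout handling are omitted for brevity.

package efosketch

import (
	"context"
	"sync"
)

// pendingPool is a shared capacity counter that all shard subscriptions draw
// from; waiters park on a sync.Cond instead of busy-polling.
type pendingPool struct {
	mu   sync.Mutex
	cond *sync.Cond
	used int
	max  int
}

func newPendingPool(max int) *pendingPool {
	p := &pendingPool{max: max}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// Acquire blocks until count slots are free or ctx ends. A request larger than
// the pool can ever satisfy fails immediately instead of blocking forever.
func (p *pendingPool) Acquire(ctx context.Context, count int) bool {
	if count > p.max {
		return false
	}

	// Broadcast on cancellation so parked waiters re-check ctx (the same
	// wake-up approach described in the commit above).
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			p.mu.Lock()
			p.cond.Broadcast()
			p.mu.Unlock()
		case <-done:
		}
	}()

	p.mu.Lock()
	defer p.mu.Unlock()
	for p.used+count > p.max {
		if ctx.Err() != nil {
			return false
		}
		p.cond.Wait()
	}
	p.used += count
	return true
}

// Release returns count slots and wakes waiters so they can retry.
func (p *pendingPool) Release(count int) {
	p.mu.Lock()
	p.used -= count
	p.mu.Unlock()
	p.cond.Broadcast()
}
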
@matus-tomlein matus-tomlein marked this pull request as ready for review February 12, 2026 13:23
@matus-tomlein
Author

hi @jem-davies, we have done some more load testing of the EFO implementation to work out some issues. In particular, we have made the changes mentioned in this PR around introducing a new global pending pool and self-sufficient error handling: snowplow-incubator#1

This PR should now be ready for review! We don't have any other issues that we are working on now; based on our testing it seems to be working well (we are currently running it on a couple of deployments).

@jem-davies
Collaborator

jem-davies commented Feb 16, 2026

@matus-tomlein This PR should fix the CI: snowplow-incubator#3 - keen to get this in!

Signed-off-by: Jem Davies <jemsot@gmail.com>