Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 118 additions & 5 deletions internal/impl/aws/input_kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,22 @@ const (
kiFieldRebalancePeriod = "rebalance_period"
kiFieldStartFromOldest = "start_from_oldest"
kiFieldBatching = "batching"
kiFieldEnhancedFanOut = "enhanced_fan_out"

// Enhanced Fan Out Fields
kiEFOFieldEnabled = "enabled"
kiEFOFieldConsumerName = "consumer_name"
kiEFOFieldConsumerARN = "consumer_arn"
kiEFOFieldRecordBufferCap = "record_buffer_cap"
)

// kiEFOConfig holds the parsed settings of the `enhanced_fan_out` config
// object of the Kinesis input. It is only populated when that object is
// present in the parsed config.
type kiEFOConfig struct {
// Enabled switches the input to Enhanced Fan Out (push-based) consumption.
Enabled bool
// ConsumerName is the consumer name used for EFO registration. The field
// spec states a name is auto-generated when this is empty. It cannot be
// combined with ConsumerARN; the reader constructor rejects configs that
// set both.
ConsumerName string
// ConsumerARN is the ARN of an existing consumer to use. The field spec
// states registration is skipped when this is provided.
ConsumerARN string
// RecordBufferCap is the buffer capacity of the internal per-shard records
// channel. Config parsing rejects negative values; per the field spec, 0
// means an unbuffered channel.
RecordBufferCap int
}

type kiConfig struct {
Streams []string
DynamoDB kiddbConfig
Expand All @@ -47,6 +61,7 @@ type kiConfig struct {
LeasePeriod string
RebalancePeriod string
StartFromOldest bool
EnhancedFanOut *kiEFOConfig
}

func kinesisInputConfigFromParsed(pConf *service.ParsedConfig) (conf kiConfig, err error) {
Expand All @@ -73,6 +88,27 @@ func kinesisInputConfigFromParsed(pConf *service.ParsedConfig) (conf kiConfig, e
if conf.StartFromOldest, err = pConf.FieldBool(kiFieldStartFromOldest); err != nil {
return
}
if pConf.Contains(kiFieldEnhancedFanOut) {
efoConf := &kiEFOConfig{}
efoNs := pConf.Namespace(kiFieldEnhancedFanOut)
if efoConf.Enabled, err = efoNs.FieldBool(kiEFOFieldEnabled); err != nil {
return
}
if efoConf.ConsumerName, err = efoNs.FieldString(kiEFOFieldConsumerName); err != nil {
return
}
if efoConf.ConsumerARN, err = efoNs.FieldString(kiEFOFieldConsumerARN); err != nil {
return
}
if efoConf.RecordBufferCap, err = efoNs.FieldInt(kiEFOFieldRecordBufferCap); err != nil {
return
}
if efoConf.RecordBufferCap < 0 {
err = errors.New("enhanced_fan_out.record_buffer_cap must be at least 0")
return
}
conf.EnhancedFanOut = efoConf
}
return
}

Expand Down Expand Up @@ -141,6 +177,27 @@ Use the `+"`batching`"+` fields to configure an optional [batching policy](/docs
service.NewBoolField(kiFieldStartFromOldest).
Description("Whether to consume from the oldest message when a sequence does not yet exist for the stream.").
Default(true),
service.NewObjectField(kiFieldEnhancedFanOut,
service.NewBoolField(kiEFOFieldEnabled).
Description("Enable Enhanced Fan Out mode for push-based streaming with dedicated throughput.").
Default(false),
service.NewStringField(kiEFOFieldConsumerName).
Description("Consumer name for EFO registration. Auto-generated if empty: bento-clientID.").
Default("").
Optional(),
service.NewStringField(kiEFOFieldConsumerARN).
Description("Existing consumer ARN to use. If provided, skips registration.").
Default("").
Optional().
Advanced(),
service.NewIntField(kiEFOFieldRecordBufferCap).
Description("Buffer capacity for the internal records channel per shard. Lower values reduce memory usage when processing many shards. Set to 0 for unbuffered channel (minimal memory footprint).").
Default(0).
Advanced(),
).
Description("Enhanced Fan Out configuration for push-based streaming. Provides dedicated 2 MB/sec throughput per consumer per shard and lower latency (~70ms). Note: EFO incurs per shard-hour charges.").
Optional().
Advanced(),
).
Fields(config.SessionFields()...).
Field(service.NewBatchPolicyField(kiFieldBatching))
Expand Down Expand Up @@ -174,6 +231,7 @@ type streamInfo struct {
explicitShards []string
id string // Either a name or arn, extracted from config and used for balancing shards
arn string
efoManager *kinesisEFOManager // Enhanced Fan Out manager (if EFO is enabled)
}

type kinesisReader struct {
Expand All @@ -189,6 +247,7 @@ type kinesisReader struct {

svc *kinesis.Client
checkpointer *awsKinesisCheckpointer
efoEnabled bool

streams []*streamInfo

Expand Down Expand Up @@ -319,6 +378,18 @@ func newKinesisReaderFromConfig(conf kiConfig, batcher service.BatchPolicy, sess
if k.rebalancePeriod, err = time.ParseDuration(k.conf.RebalancePeriod); err != nil {
return nil, fmt.Errorf("failed to parse rebalance period string: %v", err)
}

// Check if Enhanced Fan Out is enabled
if k.conf.EnhancedFanOut != nil && k.conf.EnhancedFanOut.Enabled {
k.efoEnabled = true
k.log.Infof("Enhanced Fan Out enabled")

// Validate EFO configuration
if k.conf.EnhancedFanOut.ConsumerName != "" && k.conf.EnhancedFanOut.ConsumerARN != "" {
return nil, errors.New("cannot specify both consumer_name and consumer_arn in enhanced_fan_out config")
}
Comment on lines +388 to +390
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a LintRule for this? It will catch misconfiguration earlier if so.

}

return &k, nil
}

Expand Down Expand Up @@ -658,9 +729,13 @@ func (k *kinesisReader) runBalancedShards() {
for _, info := range k.streams {
allShards, err := collectShards(k.ctx, info.arn, k.svc)
var clientClaims map[string][]awsKinesisClientClaim
var shardsWithCheckpoints map[string]bool
if err == nil {
clientClaims, err = k.checkpointer.AllClaims(k.ctx, info.id)
}
if err == nil {
shardsWithCheckpoints, err = k.checkpointer.AllCheckpoints(k.ctx, info.id)
}
if err != nil {
if k.ctx.Err() != nil {
return
Expand All @@ -672,8 +747,12 @@ func (k *kinesisReader) runBalancedShards() {
totalShards := len(allShards)
unclaimedShards := make(map[string]string, totalShards)
for _, s := range allShards {
if !isShardFinished(s) {
unclaimedShards[*s.ShardId] = ""
// Include shard if:
// 1. It's not finished (still open), OR
// 2. It's finished but has a checkpoint (meaning it hasn't been fully consumed yet)
shardID := *s.ShardId
if !isShardFinished(s) || shardsWithCheckpoints[shardID] {
unclaimedShards[shardID] = ""
}
}
for clientID, claims := range clientClaims {
Expand All @@ -700,7 +779,12 @@ func (k *kinesisReader) runBalancedShards() {
continue
}
wg.Add(1)
if err = k.runConsumer(&wg, *info, shardID, sequence); err != nil {
if k.efoEnabled {
err = k.runEFOConsumer(&wg, *info, shardID, sequence)
} else {
err = k.runConsumer(&wg, *info, shardID, sequence)
}
if err != nil {
k.log.Errorf("Failed to start consumer: %v\n", err)
}
}
Expand Down Expand Up @@ -749,7 +833,12 @@ func (k *kinesisReader) runBalancedShards() {
info.id, randomShard, clientID, k.clientID,
)
wg.Add(1)
if err = k.runConsumer(&wg, *info, randomShard, sequence); err != nil {
if k.efoEnabled {
err = k.runEFOConsumer(&wg, *info, randomShard, sequence)
} else {
err = k.runConsumer(&wg, *info, randomShard, sequence)
}
if err != nil {
k.log.Errorf("Failed to start consumer: %v\n", err)
} else {
// If we successfully stole the shard then that's enough
Expand Down Expand Up @@ -790,7 +879,11 @@ func (k *kinesisReader) runExplicitShards() {
sequence, err := k.checkpointer.Claim(k.ctx, id, shardID, "")
if err == nil {
wg.Add(1)
err = k.runConsumer(&wg, info, shardID, sequence)
if k.efoEnabled {
err = k.runEFOConsumer(&wg, info, shardID, sequence)
} else {
err = k.runConsumer(&wg, info, shardID, sequence)
}
}
if err != nil {
if k.ctx.Err() != nil {
Expand Down Expand Up @@ -868,6 +961,26 @@ func (k *kinesisReader) Connect(ctx context.Context) error {
return err
}

// Initialize Enhanced Fan Out if enabled
if k.efoEnabled {
for _, stream := range k.streams {
// Create EFO manager for this stream
efoMgr, err := newKinesisEFOManager(k.conf.EnhancedFanOut, stream.arn, k.clientID, k.svc, k.log)
if err != nil {
return fmt.Errorf("failed to create EFO manager for stream %s: %w", stream.id, err)
}

// Register consumer and wait for ACTIVE status
consumerARN, err := efoMgr.ensureConsumerRegistered(ctx)
if err != nil {
return fmt.Errorf("failed to register EFO consumer for stream %s: %w", stream.id, err)
}

stream.efoManager = efoMgr
k.log.Infof("Enhanced Fan Out consumer registered for stream %s with ARN: %s", stream.id, consumerARN)
}
}

if len(k.streams[0].explicitShards) > 0 {
go k.runExplicitShards()
} else {
Expand Down
27 changes: 27 additions & 0 deletions internal/impl/aws/input_kinesis_checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,33 @@ type awsKinesisClientClaim struct {
LeaseTimeout time.Time
}

// AllCheckpoints returns a set of all shard IDs that have checkpoint records
// in DynamoDB for the given stream, regardless of whether they are claimed or not.
//
// The table is scanned with a filter on StreamID. A single Scan call returns
// at most one page of results (the filter is applied after the page has been
// read), so the scan is repeated using the returned LastEvaluatedKey until the
// whole table has been covered. Without this, shards whose records fall
// outside the first page would be silently missing from the result.
//
// On any scan error a nil map and the error are returned; partial results are
// never exposed to the caller.
func (k *awsKinesisCheckpointer) AllCheckpoints(ctx context.Context, streamID string) (map[string]bool, error) {
	checkpoints := make(map[string]bool)

	input := &dynamodb.ScanInput{
		TableName:        aws.String(k.conf.Table),
		FilterExpression: aws.String("StreamID = :stream_id"),
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":stream_id": &types.AttributeValueMemberS{
				Value: streamID,
			},
		},
	}

	for {
		scanRes, err := k.svc.Scan(ctx, input)
		if err != nil {
			return nil, err
		}

		for _, item := range scanRes.Items {
			// Items without a string ShardID attribute are ignored.
			if s, ok := item["ShardID"].(*types.AttributeValueMemberS); ok {
				checkpoints[s.Value] = true
			}
		}

		// An empty LastEvaluatedKey signals that the final page was read.
		if len(scanRes.LastEvaluatedKey) == 0 {
			return checkpoints, nil
		}
		input.ExclusiveStartKey = scanRes.LastEvaluatedKey
	}
}

// AllClaims returns a map of client IDs to shards claimed by that client,
// including the lease timeout of the claim.
func (k *awsKinesisCheckpointer) AllClaims(ctx context.Context, streamID string) (map[string][]awsKinesisClientClaim, error) {
Expand Down
Loading