Merged
94 changes: 88 additions & 6 deletions destination/parquet/parquet.go
@@ -3,6 +3,7 @@ package parquet
import (
"context"
"encoding/json"
"errors"
"fmt"
"maps"
"os"
@@ -14,6 +15,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
@@ -569,15 +571,87 @@ func (p *Parquet) clearLocalFiles(paths []string) error {
return nil
}

// isRateLimitError checks if the error is a rate-limit/throttle response from S3 or GCP.
// AWS S3 returns HTTP 503 for throttling (SlowDown / ServiceUnavailable).
// GCP Cloud Storage returns HTTP 429 (Too Many Requests).
//
// For batch delete operations, errors are wrapped in s3manager.BatchError which does NOT
// implement awserr.RequestFailure directly. The actual RequestFailure is nested inside
// BatchError.Errors[].OrigErr, so we must inspect those inner errors as well.
func isRateLimitError(err error) bool {
isThrottled := func(target error) bool {
var rf awserr.RequestFailure
return errors.As(target, &rf) && (rf.StatusCode() == 429 || rf.StatusCode() == 503)
}
if isThrottled(err) {
return true
}
// AWS SDK v1 batch errors don't implement Unwrap(), so we peel one layer manually.
var batchErr awserr.Error
if errors.As(err, &batchErr) {
return isThrottled(batchErr.OrigErr())
}
return false
}
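
A quick sanity check of the classifier (a hypothetical sketch, not part of this PR): the errors are built by hand with `awserr.NewRequestFailure`, which is how the SDK attaches HTTP status codes to service errors.

```go
// Hypothetical: SlowDown/503 mirrors an S3 throttle; AccessDenied/403 does not.
throttled := awserr.NewRequestFailure(
	awserr.New("SlowDown", "Please reduce your request rate.", nil),
	503, "req-id",
)
denied := awserr.NewRequestFailure(
	awserr.New("AccessDenied", "Access Denied", nil),
	403, "req-id",
)

fmt.Println(isRateLimitError(throttled)) // true  -> retried
fmt.Println(isRateLimitError(denied))    // false -> surfaced immediately
```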

func (p *Parquet) clearS3Files(ctx context.Context, paths []string) error {
deleteS3PrefixIndividually := func(filtPath string) error {
var pageErr error
listErr := utils.RetryWithSkip(ctx, 3, time.Minute, isRateLimitError, func(_ context.Context) error {
pageErr = nil
return p.s3Client.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(p.config.Bucket),
Prefix: aws.String(filtPath),
}, func(page *s3.ListObjectsOutput, _ bool) bool {
pageKeys := make([]string, 0, len(page.Contents))
for _, obj := range page.Contents {
pageKeys = append(pageKeys, *obj.Key)
}
if len(pageKeys) == 0 {
return true
}

logger.Debugf("individual delete: found %d objects under %s, deleting", len(pageKeys), filtPath)

// GCP allows 5000 mutations per second per bucket
concurrency := min(runtime.GOMAXPROCS(0)*4, len(pageKeys))
if pageErr = utils.Concurrent(ctx, pageKeys, concurrency, func(_ context.Context, key string, _ int) error {
return utils.RetryWithSkip(ctx, 3, time.Minute, isRateLimitError, func(_ context.Context) error {
				_, err := p.s3Client.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
					Bucket: aws.String(p.config.Bucket),
					Key:    aws.String(key),
				})
				return err
})
}); pageErr != nil {
return false
}
return true
})
})
if listErr != nil {
return fmt.Errorf("failed to list objects for prefix %s: %v", filtPath, listErr)
}
return pageErr
}
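
`utils.Concurrent` is not shown in this diff; inferred from the call site above, a compatible bounded fan-out helper might look like this sketch (the repo's real implementation may differ).

```go
// Sketch of a Concurrent helper matching the call site's signature: run fn
// over items with at most `concurrency` goroutines, stopping on first error.
// Assumes Go 1.22+ per-iteration loop variables (the codebase already relies
// on 1.22 features such as range-over-int).
func Concurrent[T any](ctx context.Context, items []T, concurrency int, fn func(ctx context.Context, item T, idx int) error) error {
	g, gctx := errgroup.WithContext(ctx) // golang.org/x/sync/errgroup
	g.SetLimit(concurrency)
	for i, item := range items {
		g.Go(func() error { return fn(gctx, item, i) })
	}
	return g.Wait()
}
```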

	deleteS3PrefixStandard := func(filtPath string) error {
-		iter := s3manager.NewDeleteListIterator(p.s3Client, &s3.ListObjectsInput{
-			Bucket: aws.String(p.config.Bucket),
-			Prefix: aws.String(filtPath),
-		})
+		err := utils.RetryWithSkip(ctx, 3, time.Minute, isRateLimitError, func(_ context.Context) error {
+			iter := s3manager.NewDeleteListIterator(p.s3Client, &s3.ListObjectsInput{
+				Bucket: aws.String(p.config.Bucket),
+				Prefix: aws.String(filtPath),
+			})
+			return s3manager.NewBatchDeleteWithClient(p.s3Client).Delete(ctx, iter)
+		})

-		if err := s3manager.NewBatchDeleteWithClient(p.s3Client).Delete(ctx, iter); err != nil {
-			return fmt.Errorf("batch delete failed for filtPath %s: %s", filtPath, err)
+		if err != nil {
+			logger.Warnf("batch delete failed for filtPath %s, falling back to individual deletes: %v", filtPath, err)
+			if fallbackErr := deleteS3PrefixIndividually(filtPath); fallbackErr != nil {
+				return fmt.Errorf("batch delete failed: %v, fallback individual delete also failed: %s", err, fallbackErr)
+			}
		}
		return nil
	}
@@ -592,7 +666,15 @@ func (p *Parquet) clearS3Files(ctx context.Context, paths []string) error {
s3TablePath := filepath.Join(prefix, namespace, tableName, "/")

logger.Debugf("clearing S3 prefix: s3://%s/%s", p.config.Bucket, s3TablePath)
-		if err := deleteS3PrefixStandard(s3TablePath); err != nil {
+
+		var err error
+		if strings.Contains(p.config.S3Endpoint, "googleapis.com") {
+			err = deleteS3PrefixIndividually(s3TablePath)
+		} else {
+			err = deleteS3PrefixStandard(s3TablePath)
+		}
+
+		if err != nil {
return fmt.Errorf("failed to clear S3 prefix %s: %s", s3TablePath, err)
}

32 changes: 32 additions & 0 deletions utils/utils.go
@@ -455,6 +455,38 @@ func SplitAndTrim(s string) []string {
return result
}

// RetryWithSkip calls f once and retries up to maxRetries more times on error,
// using linear backoff ((retry+1)*sleep) between each attempt.
// shouldRetry is called on each non-nil error: return true to keep retrying, false to surface the
// error immediately without waiting. A nil shouldRetry retries all errors.
func RetryWithSkip(ctx context.Context, maxRetries int, sleep time.Duration, shouldRetry func(error) bool, f func(ctx context.Context) error) (err error) {
for cur := range maxRetries + 1 {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err = f(ctx); err == nil {
return nil
}
}

if shouldRetry != nil && !shouldRetry(err) {
return err
}

if cur != maxRetries {
backoff := time.Duration(cur+1) * sleep
logger.Infof("retry attempt[%d/%d], retrying after %.2f seconds due to err: %s", cur+1, maxRetries, backoff.Seconds(), err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoff):
}
}
}
return err
}
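
A usage sketch (hypothetical: `client` and the bucket name are illustrative, and `isRateLimitError` comes from the parquet destination above).

```go
// Retries a throttle-prone call up to 3 extra times with linear backoff
// (1m, 2m, 3m); any non-rate-limit error is returned immediately.
err := utils.RetryWithSkip(ctx, 3, time.Minute, isRateLimitError, func(ctx context.Context) error {
	_, err := client.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
		Bucket: aws.String("example-bucket"),
	})
	return err
})
```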

// RetryOnBackoff retries the function f up to attempts times with a backoff sleep between attempts.
func RetryOnBackoff(ctx context.Context, attempts int, sleep time.Duration, f func(ctx context.Context) error) (err error) {
for cur := range attempts {