fix: clear destination support for gcp#890
Conversation
destination/parquet/parquet.go
Outdated
var batchErr awserr.Error
if errors.As(err, &batchErr) {
	if origErr := batchErr.OrigErr(); origErr != nil {
		return isRateLimitError(origErr)
Can we eliminate the recursion here?
It will recur at most once; what's the benefit of removing it?
destination/parquet/parquet.go
Outdated
Prefix: aws.String(filtPath),
}, func(page *s3.ListObjectsOutput, _ bool) bool {
	for _, obj := range page.Contents {
		allKeys = append(allKeys, *obj.Key)
Since S3 already provides pagination, there is no reason to load all keys into memory at once. Instead, use utils.Yield with a fetchNextPage function (create one) to stream one page at a time, and utils.ConcurrentC to delete concurrently as keys arrive, keeping memory usage flat.
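A minimal sketch of the streaming shape being suggested, using plain channels in place of the project's utils.Yield/utils.ConcurrentC helpers (whose exact signatures are not shown here); streamDelete, fetchNextPage, and deleteKey are hypothetical names for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// streamDelete keeps only one listing page in memory at a time: a
// producer goroutine fetches pages and feeds keys into a channel, and
// a pool of workers deletes concurrently as keys arrive.
func streamDelete(fetchNextPage func() ([]string, bool), deleteKey func(string), workers int) {
	keys := make(chan string)
	// Producer: stream one page at a time, never the full key list.
	go func() {
		defer close(keys)
		for {
			page, more := fetchNextPage()
			for _, k := range page {
				keys <- k
			}
			if !more {
				return
			}
		}
	}()
	// Consumers: delete concurrently as keys arrive.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for k := range keys {
				deleteKey(k)
			}
		}()
	}
	wg.Wait()
}

func main() {
	pages := [][]string{{"a", "b"}, {"c"}}
	i := 0
	var mu sync.Mutex
	deleted := map[string]bool{}
	streamDelete(
		func() ([]string, bool) {
			p := pages[i]
			i++
			return p, i < len(pages)
		},
		func(k string) { mu.Lock(); deleted[k] = true; mu.Unlock() },
		2)
	fmt.Println(len(deleted))
}
```

Peak memory here is bounded by one page plus the channel buffer, regardless of how many objects live under the prefix.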
destination/parquet/parquet.go
Outdated
func isRateLimitError(err error) bool {
	// Case 1: Direct RequestFailure (from individual DeleteObject calls)
	var reqErr awserr.RequestFailure
	if errors.As(err, &reqErr) {
Better to add "NotImplemented" here as well.
No, these are rate-limiting errors, which can succeed after a certain backoff; NotImplemented will always fail.
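To illustrate the distinction in the reply above, here is a hedged sketch of a rate-limit check that treats throttle codes as retryable and everything else (including NotImplemented) as permanent; apiError and the code strings are stand-ins for awserr.Error, not the PR's actual types:

```go
package main

import (
	"errors"
	"fmt"
)

// apiError is a hypothetical stand-in for a coded SDK error like
// awserr.Error; the field names are assumptions for illustration.
type apiError struct {
	code string
	orig error // original cause, as in awserr.Error's OrigErr()
}

func (e *apiError) Error() string { return e.code }
func (e *apiError) Unwrap() error { return e.orig }

// isRateLimitError reports whether err is a retryable throttle error.
// "NotImplemented" is deliberately excluded: it is a permanent failure
// (the endpoint does not support the operation), while throttle codes
// can succeed after a backoff.
func isRateLimitError(err error) bool {
	var ae *apiError
	if !errors.As(err, &ae) {
		return false
	}
	switch ae.code {
	case "SlowDown", "Throttling", "TooManyRequests":
		return true
	}
	// Batch errors wrap the original cause; recurse one level down.
	if ae.orig != nil {
		return isRateLimitError(ae.orig)
	}
	return false
}

func main() {
	throttle := &apiError{code: "SlowDown"}
	wrapped := &apiError{code: "BatchedDeleteIncomplete", orig: throttle}
	notImpl := &apiError{code: "NotImplemented"}
	fmt.Println(isRateLimitError(throttle), isRateLimitError(wrapped), isRateLimitError(notImpl))
}
```

Note the recursion is bounded by the wrapping depth, which matches the earlier thread's point that it occurs at most once for batch errors.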
destination/parquet/parquet.go
Outdated
// retryOnRateLimit retries the given operation with linear backoff when a
// rate-limit error is encountered. Non-rate-limit errors are returned immediately.
func retryOnRateLimit(ctx context.Context, maxRetries int, operation string, fn func() error) error {
Can we use RetryOnBackoff from utils? If not,
replace this with a shared utility (e.g. utils.RetryWithSkip) that accepts a shouldSkip func(error) bool to skip retries on specific errors.
if err := deleteS3PrefixStandard(s3TablePath); err != nil {

var err error
if strings.Contains(p.config.S3Endpoint, "googleapis.com") {
What about users connecting to GCS through a private proxy?
We use this check only to go straight to the individual-delete function. If the endpoint is different, it uses batch delete first, and if that throws an error it falls back to the individual-delete function.
if err != nil {
	logger.Warnf("batch delete failed for filtPath %s, falling back to individual deletes: %v", filtPath, err)
	if fallbackErr := deleteS3PrefixIndividually(filtPath); fallbackErr != nil {
We should not fall back to individual deletes when the error is rate-limiting related; if batch delete is being throttled, individual deletes would be throttled too.
destination/parquet/parquet.go
Outdated
if err != nil {
	logger.Warnf("batch delete failed for filtPath %s, falling back to individual deletes: %v", filtPath, err)
	if fallbackErr := deleteS3PrefixIndividually(filtPath); fallbackErr != nil {
		return fmt.Errorf("batch delete failed: %v, and fallback individual delete also failed for filtPath %s: %s", err, filtPath, fallbackErr)
The fallback error already contains the file path.
destination/parquet/parquet.go
Outdated
iter := s3manager.NewDeleteListIterator(p.s3Client, &s3.ListObjectsInput{
deleteS3PrefixIndividually := func(filtPath string) error {
	var pageErr error
	err := p.s3Client.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Don't we need to add retry logic here as well?
Description
This PR resolves a "NotImplemented" error occurring during the clear-destination command when using GCP buckets. GCP does not support the S3 DeleteObjects (bulk delete) operation, which caused the Parquet destination to fail during cleanup. It also adds backoff retries (1m → 2m → 3m) specifically for bucket rate limits.
Fixes #877
Type of change
How Has This Been Tested?
Verified using the clear-destination command with a MySQL driver pointing to a GCP S3-compatible bucket.
Screenshots or Recordings
Documentation
Related PR's (If Any):
N/A