
fix: clear destination support for gcp #890

Open
vaibhav-datazip wants to merge 3 commits into staging from fix/gcs-clear-destination

Conversation

@vaibhav-datazip
Collaborator

Description

This PR resolves a "NotImplemented" error occurring during the clear-destination command when using GCP buckets. GCP does not support the S3 DeleteObjects (bulk delete) operation, which caused the Parquet destination to fail during cleanup.

Added backoff retry (1m → 2m → 3m) specifically for bucket rate limits

Fixes #877

Type of change

  • Bug fix (non-breaking change which fixes an issue)

How Has This Been Tested?

Verified using the clear-destination command with a MySQL driver pointing to a GCP S3-compatible bucket.

  • GCP Compatibility: Verified that the system detects GCP endpoints and uses parallelized individual deletions to avoid the unsupported batch API.

Screenshots or Recordings

Documentation

  • Documentation Link: [link to README, olake.io/docs, or olake-docs]
  • N/A (bug fix, refactor, or test changes only)

Related PRs (If Any):

N/A

var batchErr awserr.Error
if errors.As(err, &batchErr) {
	if origErr := batchErr.OrigErr(); origErr != nil {
		return isRateLimitError(origErr)
Collaborator

can we eliminate recursion here?

Collaborator Author

It will recurse just once; what's the benefit of removing it?

	Prefix: aws.String(filtPath),
}, func(page *s3.ListObjectsOutput, _ bool) bool {
	for _, obj := range page.Contents {
		allKeys = append(allKeys, *obj.Key)
Collaborator

Since S3 already provides pagination, there is no reason to load all keys into memory at once.
Instead, use utils.Yield + fetchNextPage (create a fn) to stream one page at a time,
and utils.ConcurrentC to delete concurrently as keys arrive, keeping memory usage flat.

func isRateLimitError(err error) bool {
	// Case 1: Direct RequestFailure (from individual DeleteObject calls)
	var reqErr awserr.RequestFailure
	if errors.As(err, &reqErr) {
Collaborator

Better to add "NotImplemented" here as well.

Collaborator Author

No, this is a rate-limiting error, which can succeed after a backoff; NotImplemented will always fail.
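The transient-vs-permanent distinction can be sketched with a minimal classifier. `apiError` stands in for `awserr.RequestFailure` (code plus HTTP status), and the specific codes checked (S3's `SlowDown`/503, HTTP 429) are assumptions, not the PR's exact list:

```go
package main

import "fmt"

// apiError is a stand-in for awserr.RequestFailure: an error code plus an
// HTTP status, as returned by S3-compatible stores.
type apiError struct {
	code   string
	status int
}

func (e apiError) Error() string { return e.code }

// isRateLimitError treats throttling responses as retryable. NotImplemented
// (501) is deliberately excluded: retrying it would never succeed.
func isRateLimitError(err error) bool {
	ae, ok := err.(apiError)
	if !ok {
		return false
	}
	return ae.code == "SlowDown" || ae.status == 429 || ae.status == 503
}

func main() {
	fmt.Println(isRateLimitError(apiError{"SlowDown", 503}))       // retryable
	fmt.Println(isRateLimitError(apiError{"NotImplemented", 501})) // not retryable
}
```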


// retryOnRateLimit retries the given operation with linear backoff when a
// rate-limit error is encountered. Non-rate-limit errors are returned immediately.
func retryOnRateLimit(ctx context.Context, maxRetries int, operation string, fn func() error) error {
Collaborator

Can we use RetryOnBackoff from utils? If not,
replace this with a shared utility (e.g. utils.RetryWithSkip) that accepts a shouldSkip func(error) bool to skip retries on specific errors.

if err := deleteS3PrefixStandard(s3TablePath); err != nil {

var err error
if strings.Contains(p.config.S3Endpoint, "googleapis.com") {
Collaborator

What if users connect to GCS through a private proxy?

Collaborator Author

We use this check to go straight to the individual-delete function. If the endpoint is different, it uses batch delete, and if that throws an error it falls back to the individual-delete function.
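The decision flow described here can be sketched as follows. `clearPrefix` and the injected `batchDelete`/`individualDelete` funcs are illustrative names, not the PR's actual signatures; only the `googleapis.com` endpoint check comes from the diff:

```go
package main

import (
	"fmt"
	"strings"
)

// clearPrefix routes GCS endpoints straight to individual deletes (GCS does
// not implement S3's bulk DeleteObjects); other endpoints try batch delete
// first and fall back to individual deletes on failure. (Per the review, a
// refinement would be to skip the fallback for rate-limit errors.)
func clearPrefix(endpoint string, batchDelete, individualDelete func() error) error {
	if strings.Contains(endpoint, "googleapis.com") {
		return individualDelete()
	}
	if err := batchDelete(); err != nil {
		return individualDelete()
	}
	return nil
}

func main() {
	batch, indiv := false, false
	_ = clearPrefix("https://storage.googleapis.com",
		func() error { batch = true; return nil },
		func() error { indiv = true; return nil })
	fmt.Println(batch, indiv) // GCS endpoint: batch delete never attempted
}
```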


if err != nil {
	logger.Warnf("batch delete failed for filtPath %s, falling back to individual deletes: %v", filtPath, err)
	if fallbackErr := deleteS3PrefixIndividually(filtPath); fallbackErr != nil {
Collaborator

We should not fall back to individual deletes if the error is rate-limiting related; if batch delete is being throttled, individual deletes would be throttled too.

if err != nil {
	logger.Warnf("batch delete failed for filtPath %s, falling back to individual deletes: %v", filtPath, err)
	if fallbackErr := deleteS3PrefixIndividually(filtPath); fallbackErr != nil {
		return fmt.Errorf("batch delete failed: %v, and fallback individual delete also failed for filtPath %s: %s", err, filtPath, fallbackErr)
Collaborator

The fallback error already contains the file path.

iter := s3manager.NewDeleteListIterator(p.s3Client, &s3.ListObjectsInput{
deleteS3PrefixIndividually := func(filtPath string) error {
	var pageErr error
	err := p.s3Client.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Collaborator

don't we need to add retry logic here as well?

