2 changes: 1 addition & 1 deletion flow/activities/flowable.go
@@ -659,7 +659,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
}

return func(partition *protos.QRepPartition) error {
stream := model.NewQObjectStream(shared.QRepChannelSize)
stream := model.NewQObjectStream(0) // unbuffered so each pulled object is consumed immediately, avoiding expiration of objects or short-lived tokens

return replicateQRepPartition(ctx, a, srcConn, destConn, dstPeer.Type, config, partition, runUUID, stream, stream,
connectors.QRepPullObjectsConnector.PullQRepObjects,
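The switch from shared.QRepChannelSize to 0 leans on Go's unbuffered-channel handoff: a send blocks until the consumer receives, so a freshly pulled object is processed right away instead of waiting in a buffer while its short-lived credentials age. A minimal, self-contained illustration of that behavior (object names are invented for the example, not taken from this PR):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Capacity 0: each send blocks until the consumer is ready to receive,
	// so nothing sits queued between producer and consumer.
	objects := make(chan string)

	go func() {
		defer close(objects)
		for _, name := range []string{"part-0001.parquet", "part-0002.parquet"} {
			fmt.Println("pulled", name)
			objects <- name // blocks here until the consumer picks it up
		}
	}()

	for name := range objects {
		fmt.Println("loading", name)
		time.Sleep(10 * time.Millisecond) // stand-in for the load work
	}
}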
157 changes: 0 additions & 157 deletions flow/connectors/bigquery/avro.go

This file was deleted.

18 changes: 0 additions & 18 deletions flow/connectors/bigquery/avro_test.go

This file was deleted.

8 changes: 8 additions & 0 deletions flow/connectors/bigquery/bigquery.go
@@ -999,6 +999,14 @@ func (d *datasetTable) string() string {
return fmt.Sprintf("%s.%s.%s", d.project, d.dataset, d.table)
}

func (d *datasetTable) stringQuoted() string {
if d.project == "" {
return fmt.Sprintf("%s.%s", quotedIdentifier(d.dataset), quotedIdentifier(d.table))
}

return fmt.Sprintf("%s.%s.%s", quotedIdentifier(d.project), quotedIdentifier(d.dataset), quotedIdentifier(d.table))
}

func (c *BigQueryConnector) convertToDatasetTable(tableName string) (datasetTable, error) {
parts := strings.Split(tableName, ".")
if len(parts) == 1 {
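The quotedIdentifier helper that stringQuoted relies on is not part of this diff. A minimal sketch consistent with the expectations in the new bigquery_test.go below (backtick-quoted identifiers, with embedded backticks escaped by a backslash) would be:

package connbigquery

import "strings"

// Sketch only: wrap the name in backticks and backslash-escape any embedded
// backticks, matching GoogleSQL quoted-identifier syntax and the test cases.
func quotedIdentifier(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "\\`") + "`"
}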
50 changes: 50 additions & 0 deletions flow/connectors/bigquery/bigquery_test.go
@@ -0,0 +1,50 @@
package connbigquery

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDatasetTableStringQuoted(t *testing.T) {
testCases := []struct {
datasetTable datasetTable
expected string
}{
{
datasetTable: datasetTable{
dataset: "my_dataset",
table: "my_table",
},
expected: "`my_dataset`.`my_table`",
},
{
datasetTable: datasetTable{
dataset: `data"set`,
table: `ta"ble`,
},
expected: "`data\"set`.`ta\"ble`",
},
{
datasetTable: datasetTable{
dataset: "data`set",
table: "ta`ble",
},
expected: "`data\\`set`.`ta\\`ble`",
},
{
datasetTable: datasetTable{
project: "my_project`; DROP TABLE users;--",
dataset: "my_dataset",
table: "my_table",
},
expected: "`my_project\\`; DROP TABLE users;--`.`my_dataset`.`my_table`",
},
}

for _, tc := range testCases {
t.Run(tc.expected, func(t *testing.T) {
assert.Equal(t, tc.expected, tc.datasetTable.stringQuoted())
})
}
}
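As a hypothetical usage example (not part of the diff), interpolating stringQuoted into generated SQL keeps even the hostile table name from the last test case confined to a quoted identifier rather than terminating the statement:

package connbigquery

import "fmt"

// Hypothetical helper, for illustration only.
func exampleCountQuery(dt datasetTable) string {
	// e.g. SELECT COUNT(*) FROM `my_project`.`my_dataset`.`my_table`
	return fmt.Sprintf("SELECT COUNT(*) FROM %s", dt.stringQuoted())
}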
135 changes: 135 additions & 0 deletions flow/connectors/bigquery/data_format_utils.go
@@ -0,0 +1,135 @@
package connbigquery

import (
"context"
"fmt"
"io"
"log/slog"

"cloud.google.com/go/storage"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/file"
"go.temporal.io/sdk/log"
)

const defaultCompressedRowSize = 512 // bytes. Should be a reasonably low default for analytical data

// gcsObjectSeeker implements io.ReadSeeker for a GCS object using range reads.
// Designed for quickly reading specific parts of a Parquet file, such as the footer metadata.
// Not efficient for many small reads/seeks.
type gcsObjectSeeker struct {
ctx context.Context //nolint:containedctx // context for GCS operations

object *storage.ObjectHandle
fileSize int64
offset int64
}

func (r *gcsObjectSeeker) Read(p []byte) (int, error) {
length := int64(len(p))

if r.offset < 0 || length <= 0 || r.offset+length > r.fileSize {
return 0, fmt.Errorf(
"invalid offset/length for reading GCS object chunk: offset=%d, length=%d, fileSize=%d",
r.offset,
length,
r.fileSize,
)
}

reader, err := r.object.NewRangeReader(r.ctx, r.offset, length)
if err != nil {
return 0, err
}

defer reader.Close()

n, err := io.ReadFull(reader, p)
if err != nil {
return n, err
}
r.offset += int64(n)
return n, nil
}

func (r *gcsObjectSeeker) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = r.offset + offset
case io.SeekEnd:
newOffset = r.fileSize + offset
default:
return 0, fmt.Errorf("invalid whence: %d", whence)
}

if newOffset < 0 || newOffset > r.fileSize {
return 0, fmt.Errorf("invalid seek offset: %d", newOffset)
}

r.offset = newOffset
return r.offset, nil
}

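// ReadAt reads len(p) bytes at absolute offset off by seeking there, reading,
// and then restoring the previous offset, so interleaved Read calls are not
// disturbed. Because it mutates the shared offset, it is not safe for
// concurrent callers.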
func (r *gcsObjectSeeker) ReadAt(p []byte, off int64) (int, error) {
currentOffset := r.offset
if _, err := r.Seek(off, io.SeekStart); err != nil {
return 0, err
}
n, err := r.Read(p)
if _, seekErr := r.Seek(currentOffset, io.SeekStart); seekErr != nil {
return n, seekErr
}
return n, err
}

var _ parquet.ReaderAtSeeker = (*gcsObjectSeeker)(nil)

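// readParquetTotalRows opens the file's Parquet metadata through the provided
// reader and returns the total row count recorded there.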
func readParquetTotalRows(r parquet.ReaderAtSeeker) (int64, error) {
pr, err := file.NewParquetReader(r)
if err != nil {
return 0, fmt.Errorf("failed to create parquet reader: %w", err)
}

return pr.NumRows(), nil
}

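// parquetObjectAverageRowSize estimates the average compressed bytes per row
// of a Parquet object in GCS by dividing the object's size by its row count.
// On any error, or when the file has no rows, it falls back to
// defaultCompressedRowSize.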
func parquetObjectAverageRowSize(ctx context.Context, logger log.Logger, object *storage.ObjectHandle) uint64 {
attrs, err := object.Attrs(ctx)
if err != nil {
logger.Error("failed to get object attrs for parquet object, using default compressed row size",
slog.String("object", object.ObjectName()),
slog.Int("default_size", defaultCompressedRowSize),
slog.Any("error", err))

return defaultCompressedRowSize
}

r := &gcsObjectSeeker{
ctx: ctx,
object: object,
fileSize: attrs.Size,
offset: 0,
}

n, err := readParquetTotalRows(r)
if err != nil {
logger.Error("failed to read parquet object metadata, using default compressed row size",
slog.String("object", object.ObjectName()),
slog.Int("default_size", defaultCompressedRowSize),
slog.Any("error", err))

return defaultCompressedRowSize
}

if n == 0 {
logger.Info("parquet object has zero rows, using default compressed row size",
slog.String("object", object.ObjectName()))

return defaultCompressedRowSize
}

return uint64(attrs.Size) / uint64(n)
}
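A hypothetical illustration (not part of this PR) of how such an estimate could be used within this package: deriving a row budget from a byte budget when deciding how much to load at a time. The budget constant and function name below are invented for the example.

// Hypothetical, illustrative only: names and the byte budget are invented.
const loadBudgetBytes uint64 = 256 << 20 // assumed 256 MiB per load batch

func rowsForBudget(avgRowSize uint64) uint64 {
	if avgRowSize == 0 {
		avgRowSize = defaultCompressedRowSize // fall back to the package default
	}
	if rows := loadBudgetBytes / avgRowSize; rows > 0 {
		return rows
	}
	return 1
}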