Skip to content

Commit 746f171

Browse files
committed
util/ioctx: add RandomAccessReader for cloud storage
Add a general-purpose wrapper that provides ReadAt/Seek support for any cloud storage reader using an opener factory function pattern. This enables random access (required by formats like Parquet) without requiring the cloud storage implementation itself to natively support ReadAt/Seek. Instead, it opens new readers at specific offsets for each ReadAt call.

Key features:
- Thread-safe ReadAt: Each call opens a new connection, allowing concurrent reads from different offsets
- Seek support: Tracks position logically without seeking underlying readers
- Generic: Works with any ReadCloserCtx via OpenerAtFunc factory

Release note: None
Epic: CRDB-23802
1 parent d1a7838 commit 746f171

File tree

3 files changed

+436
-2
lines changed

3 files changed

+436
-2
lines changed

pkg/util/ioctx/BUILD.bazel

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "ioctx",
5-
srcs = ["reader.go"],
5+
srcs = [
6+
"random_access_reader.go",
7+
"reader.go",
8+
],
69
importpath = "github.com/cockroachdb/cockroach/pkg/util/ioctx",
710
visibility = ["//visibility:public"],
811
deps = ["@com_github_cockroachdb_errors//:errors"],
@@ -11,7 +14,10 @@ go_library(
1114
go_test(
1215
name = "ioctx_test",
1316
size = "small",
14-
srcs = ["reader_test.go"],
17+
srcs = [
18+
"random_access_reader_test.go",
19+
"reader_test.go",
20+
],
1521
embed = [":ioctx"],
1622
deps = [
1723
"@com_github_cockroachdb_errors//:errors",
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package ioctx
7+
8+
import (
9+
"context"
10+
"io"
11+
12+
"github.com/cockroachdb/errors"
13+
)
14+
15+
// OpenerAtFunc is a factory function that opens a reader at a specific offset
// and returns it as a ReadCloserCtx, or an error if the reader cannot be opened.
16+
// endHint can be used to optimize the read if the end position is known.
// Callers in this file pass the offset at which they expect to stop reading:
// the total size for sequential reads and the end of the requested range for
// ReadAt.
17+
type OpenerAtFunc func(ctx context.Context, offset int64, endHint int64) (ReadCloserCtx, error)
18+
19+
// randomAccessReader wraps any cloud storage reader and adds ReadAt/Seek support
20+
// using a factory function that can open readers at specific offsets.
21+
//
22+
// This provides random access capabilities without requiring the cloud storage
23+
// implementation itself to support seeking. Each ReadAt call opens a new reader
24+
// at the requested offset.
25+
//
26+
// Thread Safety:
27+
// - ReadAt: Safe for concurrent calls. Each call opens its own reader.
28+
// - Read/Seek: NOT safe for concurrent calls. Should only be used from a single goroutine.
29+
type randomAccessReader struct {
30+
// ctx is stored because the io.Reader/io.ReaderAt/io.Seeker/io.Closer
// methods take no context; it is forwarded to openAt and to every Read and
// Close call on the underlying readers.
ctx context.Context
31+
// size is the total size of the object. It bounds ReadAt, is the base for
// io.SeekEnd, and is passed as the end hint when opening a sequential reader.
size int64
32+
// openAt opens a new underlying reader at a given offset.
openAt OpenerAtFunc
33+
34+
// Current sequential reader (for Read operations). It is opened lazily by
// Read, and closed by Seek when the position changes and by Close.
35+
current ReadCloserCtx
36+
// pos is the logical offset of the next sequential Read. Read advances it by
// the number of bytes returned; Seek overwrites it.
pos int64
37+
}
38+
39+
var _ io.ReaderAt = &randomAccessReader{}
40+
var _ io.ReadSeekCloser = &randomAccessReader{}
41+
42+
// NewRandomAccessReader creates a reader that supports ReadAt and Seek
43+
// by using the provided opener function to create readers at specific offsets.
44+
//
45+
// Parameters:
46+
// - ctx: Context for operations
47+
// - size: Total size of the file
48+
// - openAt: Factory function that opens a reader at a given offset
49+
func NewRandomAccessReader(
50+
ctx context.Context, size int64, openAt OpenerAtFunc,
51+
) *randomAccessReader {
52+
return &randomAccessReader{
53+
ctx: ctx,
54+
size: size,
55+
openAt: openAt,
56+
}
57+
}
58+
59+
// Read implements io.Reader using sequential access
60+
func (r *randomAccessReader) Read(p []byte) (int, error) {
61+
if r.current == nil {
62+
// Open reader at current position
63+
opened, err := r.openAt(r.ctx, r.pos, r.size)
64+
if err != nil {
65+
return 0, err
66+
}
67+
r.current = opened
68+
}
69+
70+
n, err := r.current.Read(r.ctx, p)
71+
r.pos += int64(n)
72+
return n, err
73+
}
74+
75+
// ReadAt implements io.ReaderAt - safe for concurrent calls.
76+
// Each call opens a new reader at the requested offset, so multiple
77+
// goroutines can call ReadAt simultaneously.
78+
//
79+
// Note: This opens a new connection (HTTP request) for every ReadAt call.
80+
// For cloud storage, this means each random access read results in a new
81+
// range request to the storage service.
82+
func (r *randomAccessReader) ReadAt(p []byte, off int64) (int, error) {
83+
if off >= r.size {
84+
return 0, io.EOF
85+
}
86+
87+
// Open a new reader at the requested offset
88+
reader, err := r.openAt(r.ctx, off, off+int64(len(p)))
89+
if err != nil {
90+
return 0, err
91+
}
92+
defer reader.Close(r.ctx)
93+
94+
// Read exactly len(p) bytes or until EOF
95+
return io.ReadFull(ReaderCtxAdapter(r.ctx, reader), p)
96+
}
97+
98+
// Seek implements io.Seeker
99+
func (r *randomAccessReader) Seek(offset int64, whence int) (int64, error) {
100+
var newPos int64
101+
switch whence {
102+
case io.SeekStart:
103+
newPos = offset
104+
case io.SeekCurrent:
105+
newPos = r.pos + offset
106+
case io.SeekEnd:
107+
newPos = r.size + offset
108+
default:
109+
return 0, errors.Newf("invalid whence: %d", whence)
110+
}
111+
112+
if newPos < 0 {
113+
return 0, errors.New("negative position")
114+
}
115+
116+
// Close current reader if position changed
117+
if r.current != nil && newPos != r.pos {
118+
r.current.Close(r.ctx)
119+
r.current = nil
120+
}
121+
122+
r.pos = newPos
123+
return newPos, nil
124+
}
125+
126+
// Close closes any open reader
127+
func (r *randomAccessReader) Close() error {
128+
if r.current != nil {
129+
return r.current.Close(r.ctx)
130+
}
131+
return nil
132+
}

0 commit comments

Comments
 (0)