Skip to content

Commit acdb2eb

Browse files
authored
New committer (#271)
* Initial kafka committer
* Update config
* Error on uninitialized brokers
* Update queries
* Option to disable TLS for kafka
* Add projection mode in blocks
* Fix publish parallel mode
* Gofmt
* Update schema
* Fix backfill table
* Update kafka storage producer
* Kafka + Redis
* Update schema payload
* Update kafka-postgres -> kafka-redis config
* Update schema to use replacing merge tree
* Fix schema
* Badger & S3
* Until block for committer
* terminate when poller or committer exits
* Fix commit until block
* Don't cancel active tasks in poller
* migrate with destination storage
* Remove RPC batch config in migrate
* Cleanup
* Add from_address, to_address to schema
* Retry with RPC batch size reduction
* Shuffle Orchestrator and Staging interface
* store poller in committer
* Simplified storage. Split kafka and redis
* kafka requires orchestrator
* Fix orchestrator flag
* Fix badger keys
* Fix backfill missing blocks in staging
* Revert "Fix backfill missing blocks in staging" This reverts commit 6233232.
* block buffer
* Poller S3 support
* Fix boundaries for migration
* Badger for caching in s3 connector
* optimize s3 insertion
* redis tls. erc1155 batch mv
* fix projections, use _part_offset projections
* gofmt
* Fix test
1 parent 3a03423 commit acdb2eb

File tree

55 files changed

+7505
-925
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

55 files changed

+7505
-925
lines changed

cmd/migrate_valid.go

Lines changed: 344 additions & 105 deletions
Large diffs are not rendered by default.

cmd/orchestrator.go

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,12 +32,19 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
3232
if err != nil {
3333
log.Fatal().Err(err).Msg("Failed to create orchestrator")
3434
}
35+
3536
// Start Prometheus metrics server
3637
log.Info().Msg("Starting Metrics Server on port 2112")
3738
go func() {
3839
http.Handle("/metrics", promhttp.Handler())
39-
http.ListenAndServe(":2112", nil)
40+
if err := http.ListenAndServe(":2112", nil); err != nil {
41+
log.Error().Err(err).Msg("Metrics server error")
42+
}
4043
}()
4144

45+
// Start orchestrator (blocks until shutdown)
46+
// The orchestrator handles signals internally and coordinates shutdown
4247
orchestrator.Start()
48+
49+
log.Info().Msg("Shutdown complete")
4350
}

cmd/root.go

Lines changed: 174 additions & 0 deletions
Large diffs are not rendered by default.

configs/config.go

Lines changed: 97 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"strings"
8+
"time"
89

910
"github.com/rs/zerolog/log"
1011
"github.com/spf13/viper"
@@ -16,20 +17,22 @@ type LogConfig struct {
1617
}
1718

1819
type PollerConfig struct {
19-
Enabled bool `mapstructure:"enabled"`
20-
Interval int `mapstructure:"interval"`
21-
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
22-
FromBlock int `mapstructure:"fromBlock"`
23-
ForceFromBlock bool `mapstructure:"forceFromBlock"`
24-
UntilBlock int `mapstructure:"untilBlock"`
25-
ParallelPollers int `mapstructure:"parallelPollers"`
20+
Enabled bool `mapstructure:"enabled"`
21+
Interval int `mapstructure:"interval"`
22+
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
23+
FromBlock int `mapstructure:"fromBlock"`
24+
ForceFromBlock bool `mapstructure:"forceFromBlock"`
25+
UntilBlock int `mapstructure:"untilBlock"`
26+
ParallelPollers int `mapstructure:"parallelPollers"`
27+
S3 *S3SourceConfig `mapstructure:"s3"`
2628
}
2729

2830
type CommitterConfig struct {
2931
Enabled bool `mapstructure:"enabled"`
3032
Interval int `mapstructure:"interval"`
3133
BlocksPerCommit int `mapstructure:"blocksPerCommit"`
3234
FromBlock int `mapstructure:"fromBlock"`
35+
UntilBlock int `mapstructure:"untilBlock"`
3336
}
3437

3538
type ReorgHandlerConfig struct {
@@ -47,21 +50,62 @@ type FailureRecovererConfig struct {
4750
}
4851

4952
type StorageConfig struct {
50-
Staging StorageConnectionConfig `mapstructure:"staging"`
51-
Main StorageConnectionConfig `mapstructure:"main"`
52-
Orchestrator StorageConnectionConfig `mapstructure:"orchestrator"`
53+
Orchestrator StorageOrchestratorConfig `mapstructure:"orchestrator"`
54+
Staging StorageStagingConfig `mapstructure:"staging"`
55+
Main StorageMainConfig `mapstructure:"main"`
5356
}
54-
type StorageType string
5557

56-
const (
57-
StorageTypeMain StorageType = "main"
58-
StorageTypeStaging StorageType = "staging"
59-
StorageTypeOrchestrator StorageType = "orchestrator"
60-
)
58+
type StorageOrchestratorConfig struct {
59+
Type string `mapstructure:"type"`
60+
Clickhouse *ClickhouseConfig `mapstructure:"clickhouse"`
61+
Postgres *PostgresConfig `mapstructure:"postgres"`
62+
Redis *RedisConfig `mapstructure:"redis"`
63+
Badger *BadgerConfig `mapstructure:"badger"`
64+
}
6165

62-
type StorageConnectionConfig struct {
66+
type StorageStagingConfig struct {
67+
Type string `mapstructure:"type"`
6368
Clickhouse *ClickhouseConfig `mapstructure:"clickhouse"`
6469
Postgres *PostgresConfig `mapstructure:"postgres"`
70+
Badger *BadgerConfig `mapstructure:"badger"`
71+
}
72+
73+
type StorageMainConfig struct {
74+
Type string `mapstructure:"type"`
75+
Clickhouse *ClickhouseConfig `mapstructure:"clickhouse"`
76+
Postgres *PostgresConfig `mapstructure:"postgres"`
77+
Kafka *KafkaConfig `mapstructure:"kafka"`
78+
Badger *BadgerConfig `mapstructure:"badger"`
79+
S3 *S3StorageConfig `mapstructure:"s3"`
80+
}
81+
82+
type BadgerConfig struct {
83+
Path string `mapstructure:"path"`
84+
}
85+
86+
type S3Config struct {
87+
Bucket string `mapstructure:"bucket"`
88+
Region string `mapstructure:"region"`
89+
Prefix string `mapstructure:"prefix"`
90+
AccessKeyID string `mapstructure:"accessKeyId"`
91+
SecretAccessKey string `mapstructure:"secretAccessKey"`
92+
Endpoint string `mapstructure:"endpoint"`
93+
}
94+
95+
type S3StorageConfig struct {
96+
S3Config `mapstructure:",squash"`
97+
Format string `mapstructure:"format"`
98+
Parquet *ParquetConfig `mapstructure:"parquet"`
99+
// Buffering configuration
100+
BufferSize int64 `mapstructure:"bufferSizeMB"` // Target buffer size in MB before flush (default 512 MB)
101+
BufferTimeout int `mapstructure:"bufferTimeoutSeconds"` // Max time in seconds before flush (default 300 = 5 min)
102+
MaxBlocksPerFile int `mapstructure:"maxBlocksPerFile"` // Max blocks per parquet file (0 = no limit, only size/timeout triggers)
103+
}
104+
105+
type ParquetConfig struct {
106+
Compression string `mapstructure:"compression"`
107+
RowGroupSize int64 `mapstructure:"rowGroupSize"`
108+
PageSize int64 `mapstructure:"pageSize"`
65109
}
66110

67111
type TableConfig struct {
@@ -86,6 +130,7 @@ type ClickhouseConfig struct {
86130
EnableParallelViewProcessing bool `mapstructure:"enableParallelViewProcessing"`
87131
MaxQueryTime int `mapstructure:"maxQueryTime"`
88132
MaxMemoryUsage int `mapstructure:"maxMemoryUsage"`
133+
EnableCompression bool `mapstructure:"enableCompression"`
89134
}
90135

91136
type PostgresConfig struct {
@@ -101,6 +146,21 @@ type PostgresConfig struct {
101146
ConnectTimeout int `mapstructure:"connectTimeout"`
102147
}
103148

149+
type RedisConfig struct {
150+
Host string `mapstructure:"host"`
151+
Port int `mapstructure:"port"`
152+
Password string `mapstructure:"password"`
153+
DB int `mapstructure:"db"`
154+
EnableTLS bool `mapstructure:"enableTLS"`
155+
}
156+
157+
type KafkaConfig struct {
158+
Brokers string `mapstructure:"brokers"`
159+
Username string `mapstructure:"username"`
160+
Password string `mapstructure:"password"`
161+
EnableTLS bool `mapstructure:"enableTLS"`
162+
}
163+
104164
type RPCBatchRequestConfig struct {
105165
BlocksPerRequest int `mapstructure:"blocksPerRequest"`
106166
BatchDelay int `mapstructure:"batchDelay"`
@@ -177,12 +237,23 @@ type PublisherConfig struct {
177237
Brokers string `mapstructure:"brokers"`
178238
Username string `mapstructure:"username"`
179239
Password string `mapstructure:"password"`
240+
EnableTLS bool `mapstructure:"enableTLS"`
180241
Blocks BlockPublisherConfig `mapstructure:"blocks"`
181242
Transactions TransactionPublisherConfig `mapstructure:"transactions"`
182243
Traces TracePublisherConfig `mapstructure:"traces"`
183244
Events EventPublisherConfig `mapstructure:"events"`
184245
}
185246

247+
type S3SourceConfig struct {
248+
S3Config `mapstructure:",squash"`
249+
CacheDir string `mapstructure:"cacheDir"`
250+
MetadataTTL time.Duration `mapstructure:"metadataTTL"`
251+
FileCacheTTL time.Duration `mapstructure:"fileCacheTTL"`
252+
MaxCacheSize int64 `mapstructure:"maxCacheSize"`
253+
CleanupInterval time.Duration `mapstructure:"cleanupInterval"`
254+
MaxConcurrentDownloads int `mapstructure:"maxConcurrentDownloads"`
255+
}
256+
186257
type WorkModeConfig struct {
187258
CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"`
188259
LiveModeThreshold int64 `mapstructure:"liveModeThreshold"`
@@ -192,6 +263,14 @@ type ValidationConfig struct {
192263
Mode string `mapstructure:"mode"` // "disabled", "minimal", "strict"
193264
}
194265

266+
type MigratorConfig struct {
267+
Destination StorageMainConfig `mapstructure:"destination"`
268+
StartBlock uint `mapstructure:"startBlock"`
269+
EndBlock uint `mapstructure:"endBlock"`
270+
BatchSize uint `mapstructure:"batchSize"`
271+
WorkerCount uint `mapstructure:"workerCount"`
272+
}
273+
195274
type Config struct {
196275
RPC RPCConfig `mapstructure:"rpc"`
197276
Log LogConfig `mapstructure:"log"`
@@ -204,6 +283,7 @@ type Config struct {
204283
Publisher PublisherConfig `mapstructure:"publisher"`
205284
WorkMode WorkModeConfig `mapstructure:"workMode"`
206285
Validation ValidationConfig `mapstructure:"validation"`
286+
Migrator MigratorConfig `mapstructure:"migrator"`
207287
}
208288

209289
var Cfg Config

go.mod

Lines changed: 40 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -4,14 +4,20 @@ go 1.23.0
44

55
require (
66
github.com/ClickHouse/clickhouse-go/v2 v2.36.0
7+
github.com/aws/aws-sdk-go-v2 v1.38.0
8+
github.com/aws/aws-sdk-go-v2/config v1.31.0
9+
github.com/aws/aws-sdk-go-v2/service/s3 v1.87.0
10+
github.com/dgraph-io/badger/v4 v4.8.0
711
github.com/ethereum/go-ethereum v1.15.11
812
github.com/gin-gonic/gin v1.10.0
913
github.com/gorilla/schema v1.4.1
1014
github.com/holiman/uint256 v1.3.2
1115
github.com/lib/pq v1.10.9
16+
github.com/parquet-go/parquet-go v0.25.1
1217
github.com/prometheus/client_golang v1.20.4
18+
github.com/redis/go-redis/v9 v9.12.1
1319
github.com/rs/zerolog v1.33.0
14-
github.com/spf13/cobra v1.8.1
20+
github.com/spf13/cobra v1.9.1
1521
github.com/spf13/viper v1.18.0
1622
github.com/stretchr/testify v1.10.0
1723
github.com/swaggo/files v1.0.1
@@ -25,6 +31,21 @@ require (
2531
github.com/KyleBanks/depth v1.2.1 // indirect
2632
github.com/Microsoft/go-winio v0.6.2 // indirect
2733
github.com/andybalholm/brotli v1.1.1 // indirect
34+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 // indirect
35+
github.com/aws/aws-sdk-go-v2/credentials v1.18.4 // indirect
36+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.3 // indirect
37+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 // indirect
38+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 // indirect
39+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
40+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.3 // indirect
41+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect
42+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.3 // indirect
43+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3 // indirect
44+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.3 // indirect
45+
github.com/aws/aws-sdk-go-v2/service/sso v1.28.0 // indirect
46+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.0 // indirect
47+
github.com/aws/aws-sdk-go-v2/service/sts v1.37.0 // indirect
48+
github.com/aws/smithy-go v1.22.5 // indirect
2849
github.com/beorn7/perks v1.0.1 // indirect
2950
github.com/bits-and-blooms/bitset v1.20.0 // indirect
3051
github.com/bytedance/sonic v1.12.6 // indirect
@@ -39,13 +60,18 @@ require (
3960
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
4061
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
4162
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
63+
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
64+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
65+
github.com/dustin/go-humanize v1.0.1 // indirect
4266
github.com/ethereum/c-kzg-4844/v2 v2.1.0 // indirect
4367
github.com/ethereum/go-verkle v0.2.2 // indirect
4468
github.com/fsnotify/fsnotify v1.7.0 // indirect
4569
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
4670
github.com/gin-contrib/sse v0.1.0 // indirect
4771
github.com/go-faster/city v1.0.1 // indirect
4872
github.com/go-faster/errors v0.7.1 // indirect
73+
github.com/go-logr/logr v1.4.3 // indirect
74+
github.com/go-logr/stdr v1.2.2 // indirect
4975
github.com/go-ole/go-ole v1.3.0 // indirect
5076
github.com/go-openapi/jsonpointer v0.21.0 // indirect
5177
github.com/go-openapi/jsonreference v0.21.0 // indirect
@@ -57,6 +83,7 @@ require (
5783
github.com/goccy/go-json v0.10.4 // indirect
5884
github.com/gofrs/flock v0.8.1 // indirect
5985
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
86+
github.com/google/flatbuffers v25.2.10+incompatible // indirect
6087
github.com/google/uuid v1.6.0 // indirect
6188
github.com/gorilla/websocket v1.4.2 // indirect
6289
github.com/hashicorp/hcl v1.0.0 // indirect
@@ -94,7 +121,7 @@ require (
94121
github.com/sourcegraph/conc v0.3.0 // indirect
95122
github.com/spf13/afero v1.11.0 // indirect
96123
github.com/spf13/cast v1.6.0 // indirect
97-
github.com/spf13/pflag v1.0.5 // indirect
124+
github.com/spf13/pflag v1.0.6 // indirect
98125
github.com/stretchr/objx v0.5.2 // indirect
99126
github.com/subosito/gotenv v1.6.0 // indirect
100127
github.com/supranational/blst v0.3.14 // indirect
@@ -104,18 +131,20 @@ require (
104131
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
105132
github.com/ugorji/go/codec v1.2.12 // indirect
106133
github.com/yusufpapurcu/wmi v1.2.4 // indirect
107-
go.opentelemetry.io/otel v1.36.0 // indirect
108-
go.opentelemetry.io/otel/trace v1.36.0 // indirect
134+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
135+
go.opentelemetry.io/otel v1.37.0 // indirect
136+
go.opentelemetry.io/otel/metric v1.37.0 // indirect
137+
go.opentelemetry.io/otel/trace v1.37.0 // indirect
109138
go.uber.org/multierr v1.11.0 // indirect
110139
golang.org/x/arch v0.12.0 // indirect
111-
golang.org/x/crypto v0.38.0 // indirect
140+
golang.org/x/crypto v0.39.0 // indirect
112141
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
113-
golang.org/x/net v0.40.0 // indirect
114-
golang.org/x/sync v0.14.0 // indirect
115-
golang.org/x/sys v0.33.0 // indirect
116-
golang.org/x/text v0.25.0 // indirect
117-
golang.org/x/tools v0.30.0 // indirect
118-
google.golang.org/protobuf v1.36.1 // indirect
142+
golang.org/x/net v0.41.0 // indirect
143+
golang.org/x/sync v0.15.0 // indirect
144+
golang.org/x/sys v0.34.0 // indirect
145+
golang.org/x/text v0.26.0 // indirect
146+
golang.org/x/tools v0.33.0 // indirect
147+
google.golang.org/protobuf v1.36.6 // indirect
119148
gopkg.in/ini.v1 v1.67.0 // indirect
120149
gopkg.in/yaml.v3 v3.0.1 // indirect
121150
rsc.io/tmplfunc v0.0.3 // indirect

0 commit comments

Comments
 (0)