Skip to content

Commit 0168ca2

Browse files
authored
committer start block (#312)
* committer start block * minor change
1 parent e8fa8b5 commit 0168ca2

File tree

2 files changed

+18
-0
lines changed

2 files changed

+18
-0
lines changed

configs/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ type Config struct {
6262
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
6363
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
6464
CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"`
65+
// CommitterStartBlock, when set (>0), forces the committer to start publishing
66+
// from this block number regardless of what ClickHouse says is already committed.
67+
// This can cause duplicate publishing if ClickHouse already contains higher blocks.
68+
CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"`
6569
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
6670
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
6771
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`

internal/committer/committer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,20 @@ func getLastTrackedBlockNumberAndBlockRangesFromS3() (int64, []types.BlockRange,
104104
}
105105
log.Debug().Int64("max_block_number", maxBlockNumber).Msg("Retrieved max block number from ClickHouse.(-1 means nothing committed yet, start from 0)")
106106

107+
// Optional override: force the committer to start from a specific block number.
108+
// We implement this by pretending ClickHouse max is (startBlock - 1), so both S3
109+
// range scanning and live RPC polling begin at startBlock.
110+
if config.Cfg.CommitterStartBlock > 0 {
111+
overrideMax := int64(config.Cfg.CommitterStartBlock) - 1
112+
if maxBlockNumber < overrideMax {
113+
maxBlockNumber = overrideMax
114+
log.Info().
115+
Int64("clickhouse_max_block", maxBlockNumber).
116+
Uint64("override_start_block", config.Cfg.CommitterStartBlock).
117+
Msg("CommitterStartBlock override enabled; starting earlier than ClickHouse cursor")
118+
}
119+
}
120+
107121
blockRanges, err := libs.GetBlockRangesFromS3(maxBlockNumber)
108122
if err != nil {
109123
log.Error().Err(err).Msg("Failed to get block ranges from S3")

0 commit comments

Comments
 (0)