Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ func RunCommitter(cmd *cobra.Command, args []string) {
committer.Init()
committer.InitReorg()

go committer.RunReorgValidator()
committer.CommitStreaming()
committer.RunReorgValidator()
// committer.CommitStreaming()
}
67 changes: 59 additions & 8 deletions internal/committer/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func RunReorgValidator() {
continue
}

if endBlock == lastBlockCheck || endBlock-startBlock < 100 {
log.Debug().Msg("Not enough new blocks to check. Sleeping for 1 minute.")
if endBlock-startBlock < 100 {
log.Debug().Int64("last_block_check", lastBlockCheck).Int64("start_block", startBlock).Int64("end_block", endBlock).Msg("Not enough new blocks to check. Sleeping for 1 minute.")
time.Sleep(1 * time.Minute)
continue
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
return nil
}

// finding the reorg start and end block
// 1) Block verification: find reorg range from header continuity (existing behavior)
reorgStartBlock := int64(-1)
reorgEndBlock := int64(-1)
for i := 1; i < len(blockHeaders); i++ {
Expand All @@ -131,18 +131,69 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
}

// set end to the last block if not set
lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64()
if reorgEndBlock == -1 {
reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64()
// No header-based end detected; default to the last header for last-valid-block tracking.
reorgEndBlock = lastHeaderBlock
}

// 2) Transaction verification: check for mismatches between block.transaction_count
// and the number of transactions stored per block in ClickHouse.
txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
if err != nil {
return fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err)
}

// 3) Logs verification: check for mismatches between logsBloom and logs stored in ClickHouse.
logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
if err != nil {
return fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err)
}

// 4) Combine all ranges:
// - If all three ranges (blocks, tx, logs) are empty, then there is no reorg.
// - Otherwise, take min(start) and max(end) across all non-empty ranges as the final reorg range.
finalStart := int64(-1)
finalEnd := int64(-1)

// block headers range
if reorgStartBlock > -1 {
if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil {
return err
finalStart = reorgStartBlock
finalEnd = reorgEndBlock
}

// transactions range
if txStart > -1 {
if finalStart == -1 || txStart < finalStart {
finalStart = txStart
}
if finalEnd == -1 || txEnd > finalEnd {
finalEnd = txEnd
}
}

// logs range
if logsStart > -1 {
if finalStart == -1 || logsStart < finalStart {
finalStart = logsStart
}
if finalEnd == -1 || logsEnd > finalEnd {
finalEnd = logsEnd
}
}

// update last valid block. if there was no reorg, this will update to the last block
libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock)
lastValidBlock := lastHeaderBlock
if finalStart > -1 {
// We found at least one inconsistent range; reorg from min(start) to max(end).
if err := handleReorgForRange(uint64(finalStart), uint64(finalEnd)); err != nil {
return err
}
lastValidBlock = finalEnd
}
err = libs.SetReorgLastValidBlock(libs.ChainIdStr, lastValidBlock)
if err != nil {
return fmt.Errorf("detectAndHandleReorgs: failed to set last valid block: %w", err)
}

return nil
}
Expand Down
169 changes: 167 additions & 2 deletions internal/libs/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ var defaultTraceFields = []string{
"reward_type", "refund_address",
}

// blockTxAggregate is one row of the per-block transaction aggregate query:
// the number of rows in the transactions table grouped by block_number.
type blockTxAggregate struct {
	BlockNumber *big.Int `ch:"block_number"` // block the count belongs to
	TxCount     uint64   `ch:"tx_count"`     // count() of transaction rows for that block
}

// blockLogAggregate is one row of the per-block log aggregate query:
// log count and highest log_index per block, used to verify log contiguity.
type blockLogAggregate struct {
	BlockNumber *big.Int `ch:"block_number"`  // block the aggregates belong to
	LogCount    uint64   `ch:"log_count"`     // count() of log rows for that block
	MaxLogIndex uint64   `ch:"max_log_index"` // max(log_index); contiguous logs satisfy max+1 == count
}

// only use this for backfill or getting old data.
var ClickhouseConnV1 clickhouse.Conn

Expand Down Expand Up @@ -232,14 +243,12 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
if blocksRaw[i].TransactionCount != uint64(len(transactionsRaw[i])) {
log.Info().
Any("transactionCount", blocksRaw[i].TransactionCount).
Any("transactionsRaw", transactionsRaw[i]).
Msg("skipping block because transactionCount does not match")
continue
}
if (blocksRaw[i].LogsBloom != "" && blocksRaw[i].LogsBloom != EMPTY_LOGS_BLOOM) && len(logsRaw[i]) == 0 {
log.Info().
Any("logsBloom", blocksRaw[i].LogsBloom).
Any("logsRaw", logsRaw[i]).
Msg("skipping block because logsBloom is not empty and logsRaw is empty")
continue
}
Expand All @@ -253,6 +262,162 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
return blockData, nil
}

// GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
// where the stored transaction_count in the blocks table does not match the number
// of transactions in the transactions table. It returns the minimum and maximum
// block numbers that have a mismatch, or (-1, -1) if all blocks are consistent.
// GetTransactionMismatchRangeFromClickHouseV2 scans blocks in the inclusive range
// [startBlockNumber, endBlockNumber] and compares each block's stored
// transaction_count against the number of rows in the transactions table.
// It returns the minimum and maximum mismatching block numbers, or (-1, -1)
// when every block in the range is consistent.
func GetTransactionMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
	// An inverted range trivially contains no mismatches.
	if endBlockNumber < startBlockNumber {
		return -1, -1, nil
	}

	blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
	if err != nil {
		return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
	}

	// One aggregate query yields the stored transaction count per block.
	query := fmt.Sprintf(
		"SELECT block_number, count() AS tx_count FROM %s.transactions FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
		config.Cfg.CommitterClickhouseDatabase,
		chainId,
		startBlockNumber,
		endBlockNumber,
	)

	txAggRows, err := execQueryV2[blockTxAggregate](query)
	if err != nil {
		return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load tx aggregates: %w", err)
	}

	// Index the aggregates by block number for O(1) lookup per block.
	txCounts := make(map[uint64]uint64, len(txAggRows))
	for _, row := range txAggRows {
		if row.BlockNumber != nil {
			txCounts[row.BlockNumber.Uint64()] = row.TxCount
		}
	}

	mismatchStart, mismatchEnd := int64(-1), int64(-1)

	for _, blk := range blocksRaw {
		// Skip rows that carry no usable chain/block identity.
		if blk.ChainId == nil || blk.ChainId.Uint64() == 0 || blk.Number == nil {
			continue
		}

		number := blk.Number.Uint64()
		stored, present := txCounts[number]

		// A block is consistent when the header's transaction_count agrees with
		// the table. A block absent from the aggregates counts as zero stored
		// transactions, which only matches a header count of zero.
		var consistent bool
		if blk.TransactionCount == 0 {
			consistent = !present || stored == 0
		} else {
			consistent = present && stored == blk.TransactionCount
		}
		if consistent {
			continue
		}

		n := int64(number)
		if mismatchStart == -1 || n < mismatchStart {
			mismatchStart = n
		}
		if mismatchEnd == -1 || n > mismatchEnd {
			mismatchEnd = n
		}
	}

	return mismatchStart, mismatchEnd, nil
}

// GetLogsMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
// where logs in the logs table are inconsistent with the block's logs_bloom:
// - logsBloom is non-empty but there are no logs for that block
// - logsBloom is empty/zero but logs exist
// - log indexes are not contiguous (count(*) != max(log_index)+1 when logs exist)
// It returns the minimum and maximum block numbers that have a mismatch, or
// (-1, -1) if all blocks are consistent.
// GetLogsMismatchRangeFromClickHouseV2 scans blocks in the inclusive range
// [startBlockNumber, endBlockNumber] and checks the logs table against each
// block's logs_bloom:
//   - a non-empty bloom with zero stored logs is a mismatch
//   - an empty/zero bloom with stored logs is a mismatch
//   - non-contiguous log indexes (count != max(log_index)+1) are a mismatch
// It returns the minimum and maximum mismatching block numbers, or (-1, -1)
// when every block in the range is consistent.
func GetLogsMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
	// An inverted range trivially contains no mismatches.
	if endBlockNumber < startBlockNumber {
		return -1, -1, nil
	}

	blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
	if err != nil {
		return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
	}

	// One aggregate query yields per-block log count and max log_index.
	query := fmt.Sprintf(
		"SELECT block_number, count() AS log_count, max(log_index) AS max_log_index FROM %s.logs FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
		config.Cfg.CommitterClickhouseDatabase,
		chainId,
		startBlockNumber,
		endBlockNumber,
	)

	logAggRows, err := execQueryV2[blockLogAggregate](query)
	if err != nil {
		return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load log aggregates: %w", err)
	}

	// Index the aggregates by block number for O(1) lookup per block.
	aggByBlock := make(map[uint64]blockLogAggregate, len(logAggRows))
	for _, row := range logAggRows {
		if row.BlockNumber != nil {
			aggByBlock[row.BlockNumber.Uint64()] = row
		}
	}

	mismatchStart, mismatchEnd := int64(-1), int64(-1)

	for _, blk := range blocksRaw {
		// Skip rows that carry no usable chain/block identity.
		if blk.ChainId == nil || blk.ChainId.Uint64() == 0 || blk.Number == nil {
			continue
		}

		number := blk.Number.Uint64()
		agg, present := aggByBlock[number]
		bloomExpectsLogs := blk.LogsBloom != "" && blk.LogsBloom != EMPTY_LOGS_BLOOM

		var bad bool
		if bloomExpectsLogs {
			// Logs must exist and log_index must run contiguously 0..count-1.
			bad = !present || agg.LogCount == 0 || agg.MaxLogIndex+1 != agg.LogCount
		} else {
			// An empty/zero bloom means no logs may be stored for this block.
			bad = present && agg.LogCount > 0
		}
		if !bad {
			continue
		}

		n := int64(number)
		if mismatchStart == -1 || n < mismatchStart {
			mismatchStart = n
		}
		if mismatchEnd == -1 || n > mismatchEnd {
			mismatchEnd = n
		}
	}

	return mismatchStart, mismatchEnd, nil
}

func getBlocksFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) {
sb := startBlockNumber
length := endBlockNumber - startBlockNumber + 1
Expand Down
2 changes: 1 addition & 1 deletion internal/libs/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

var RedisClient *redis.Client

const RedisReorgLastValidBlock = "reorg_last_valid"
const RedisReorgLastValidBlock = "reorg_last_valid_debug"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Debug suffix in Redis key appears unintentional for production.

The _debug suffix suggests this is temporary test code. If merged to main, this will:

  1. Isolate this deployment from previously stored reorg_last_valid data
  2. Leave a debug artifact in production

If this is intentional for testing the new reorg consolidation, consider removing the suffix before merge or documenting why isolation is needed.

🤖 Prompt for AI Agents
In internal/libs/redis.go around line 15 the constant RedisReorgLastValidBlock
is set to "reorg_last_valid_debug", which leaves a debug artifact and isolates
this deployment from prior data; change the constant value to the production key
"reorg_last_valid" (or "reorg_last_valid_block" if that matches existing
scheme), update all usages across the codebase to use the new constant
(search-and-replace symbol or key string), and either remove the `_debug` suffix
or add a clear comment and documented migration/backfill plan if isolation is
intentional before merging.


// InitRedis initializes the Redis client
func InitRedis() {
Expand Down
56 changes: 14 additions & 42 deletions internal/storage/kafka_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,24 +127,6 @@ func (p *KafkaPublisher) PublishBlockDataReorg(newBlockData []*common.BlockData,
return nil
}

func (p *KafkaPublisher) PublishReorg(oldData []*common.BlockData, newData []*common.BlockData) error {
chainId := newData[0].Block.ChainId.Uint64()
newHead := uint64(newData[0].Block.Number.Uint64())
// Publish revert the revert to the new head - 1, so that the new updated block data can be re-processed
if err := p.publishBlockRevert(chainId, newHead-1); err != nil {
return fmt.Errorf("failed to revert: %v", err)
}

if err := p.publishBlockData(oldData, true, true); err != nil {
return fmt.Errorf("failed to publish old block data: %v", err)
}

if err := p.publishBlockData(newData, false, true); err != nil {
return fmt.Errorf("failed to publish new block data: %v", err)
}
return nil
}

func (p *KafkaPublisher) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down Expand Up @@ -219,27 +201,6 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
return nil
}

func (p *KafkaPublisher) publishBlockRevert(chainId uint64, blockNumber uint64) error {
publishStart := time.Now()

// Prepare messages for blocks, events, transactions and traces
blockMessages := make([]*kgo.Record, 1)

// Block message
if blockMsg, err := p.createBlockRevertMessage(chainId, blockNumber); err == nil {
blockMessages[0] = blockMsg
} else {
return fmt.Errorf("failed to create block revert message: %v", err)
}

if err := p.publishMessages(context.Background(), blockMessages); err != nil {
return fmt.Errorf("failed to publish block revert messages: %v", err)
}

log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
return nil
}

func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDeleted bool, isReorg bool) error {
if len(blockData) == 0 {
return nil
Expand Down Expand Up @@ -270,9 +231,10 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDelet
return nil
}

if err := p.publishMessages(context.Background(), blockMessages); err != nil {
return fmt.Errorf("failed to publish block messages: %v", err)
}
// test code, uncomment later
// if err := p.publishMessages(context.Background(), blockMessages); err != nil {
// return fmt.Errorf("failed to publish block messages: %v", err)
// }

log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
return nil
Expand Down Expand Up @@ -306,6 +268,16 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDelet
return nil, fmt.Errorf("failed to marshal block data: %v", err)
}

log.Debug().
Uint64("chain_id", data.ChainId).
Uint64("block_number", block.Block.Number.Uint64()).
Int("tx_count", len(block.Transactions)).
Int("log_count", len(block.Logs)).
Int("trace_count", len(block.Traces)).
Bool("is_deleted", isDeleted).
Bool("is_reorg", isReorg).
Msg("KafkaPublisher Message: Block metadata")

return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isDeleted, isReorg, msgJson)
}

Expand Down