Skip to content

Commit fa3f0e2

Browse files
authored
fix(syncing): skip draining when exec client unavailable (#3060)
* fix(syncing): skip draining when exec client unavailable * add tests * use synctest whenever useful * cl * fixes * cleanup
1 parent dd87ec9 commit fa3f0e2

File tree

8 files changed

+659
-429
lines changed

8 files changed

+659
-429
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
## [Unreleased]
1111

12+
### Changes
13+
14+
- Skip draining when exec client unavailable. [#3060](https://github.com/evstack/ev-node/pull/3060)
15+
1216
## v1.0.0-rc.3
1317

1418
### Added

block/components_test.go

Lines changed: 100 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
crand "crypto/rand"
66
"errors"
77
"testing"
8+
"testing/synctest"
89
"time"
910

1011
"github.com/ipfs/go-datastore"
@@ -189,101 +190,103 @@ func TestNewAggregatorComponents_Creation(t *testing.T) {
189190
func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) {
190191
// This test verifies that when the executor's execution client calls fail,
191192
// the error is properly propagated through the error channel and stops the node
192-
193-
ds := sync.MutexWrap(datastore.NewMapDatastore())
194-
memStore := store.New(ds)
195-
196-
cfg := config.DefaultConfig()
197-
cfg.Node.BlockTime.Duration = 50 * time.Millisecond // Fast for testing
198-
199-
// Create test signer
200-
priv, _, err := crypto.GenerateEd25519Key(crand.Reader)
201-
require.NoError(t, err)
202-
testSigner, err := noop.NewNoopSigner(priv)
203-
require.NoError(t, err)
204-
addr, err := testSigner.GetAddress()
205-
require.NoError(t, err)
206-
207-
gen := genesis.Genesis{
208-
ChainID: "test-chain",
209-
InitialHeight: 1,
210-
StartTime: time.Now().Add(-time.Second), // Start in past to trigger immediate execution
211-
ProposerAddress: addr,
212-
}
213-
214-
// Create mock executor that will fail on ExecuteTxs
215-
mockExec := testmocks.NewMockExecutor(t)
216-
mockSeq := testmocks.NewMockSequencer(t)
217-
daClient := testmocks.NewMockClient(t)
218-
daClient.On("GetHeaderNamespace").Return(datypes.NamespaceFromString("ns").Bytes()).Maybe()
219-
daClient.On("GetDataNamespace").Return(datypes.NamespaceFromString("data-ns").Bytes()).Maybe()
220-
daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
221-
daClient.On("HasForcedInclusionNamespace").Return(false).Maybe()
222-
223-
// Mock InitChain to succeed initially
224-
mockExec.On("InitChain", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
225-
Return([]byte("state-root"), nil).Once()
226-
227-
// Mock SetDAHeight to be called during initialization
228-
mockSeq.On("SetDAHeight", uint64(0)).Return().Once()
229-
230-
// Mock GetNextBatch to return empty batch
231-
mockSeq.On("GetNextBatch", mock.Anything, mock.Anything).
232-
Return(&coresequencer.GetNextBatchResponse{
233-
Batch: &coresequencer.Batch{Transactions: nil},
234-
Timestamp: time.Now(),
235-
}, nil).Maybe()
236-
237-
// Mock GetTxs for reaper (return empty to avoid interfering with test)
238-
mockExec.On("GetTxs", mock.Anything).
239-
Return([][]byte{}, nil).Maybe()
240-
241-
// Mock ExecuteTxs to fail with a critical error
242-
criticalError := errors.New("execution client RPC connection failed")
243-
mockExec.On("ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
244-
Return(nil, criticalError).Maybe()
245-
246-
// Create aggregator node
247-
components, err := NewAggregatorComponents(
248-
cfg,
249-
gen,
250-
memStore,
251-
mockExec,
252-
mockSeq,
253-
daClient,
254-
testSigner,
255-
nil, // header broadcaster
256-
nil, // data broadcaster
257-
zerolog.Nop(),
258-
NopMetrics(),
259-
DefaultBlockOptions(),
260-
nil,
261-
)
262-
require.NoError(t, err)
263-
264-
// Start should return with error when execution client fails
265-
// Timeout accounts for retry delays: 3 retries × 10s timeout = ~30s plus buffer
266-
ctx, cancel := context.WithTimeout(context.Background(), 35*time.Second)
267-
defer cancel()
268-
269-
// Run Start in a goroutine to handle the blocking call
270-
startErrCh := make(chan error, 1)
271-
go func() {
272-
startErrCh <- components.Start(ctx)
273-
}()
274-
275-
// Wait for either the error or timeout
276-
select {
277-
case err = <-startErrCh:
278-
// We expect an error containing the critical execution client failure
279-
require.Error(t, err)
280-
assert.Contains(t, err.Error(), "critical execution client failure")
281-
assert.Contains(t, err.Error(), "execution client RPC connection failed")
282-
case <-ctx.Done():
283-
t.Fatal("timeout waiting for critical error to propagate")
284-
}
285-
286-
// Clean up
287-
stopErr := components.Stop()
288-
assert.NoError(t, stopErr)
193+
synctest.Test(t, func(t *testing.T) {
194+
ds := sync.MutexWrap(datastore.NewMapDatastore())
195+
memStore := store.New(ds)
196+
197+
cfg := config.DefaultConfig()
198+
cfg.Node.BlockTime.Duration = 50 * time.Millisecond // Fast for testing
199+
200+
// Create test signer
201+
priv, _, err := crypto.GenerateEd25519Key(crand.Reader)
202+
require.NoError(t, err)
203+
testSigner, err := noop.NewNoopSigner(priv)
204+
require.NoError(t, err)
205+
addr, err := testSigner.GetAddress()
206+
require.NoError(t, err)
207+
208+
gen := genesis.Genesis{
209+
ChainID: "test-chain",
210+
InitialHeight: 1,
211+
StartTime: time.Now().Add(-time.Second), // Start in past to trigger immediate execution
212+
ProposerAddress: addr,
213+
}
214+
215+
// Create mock executor that will fail on ExecuteTxs
216+
mockExec := testmocks.NewMockExecutor(t)
217+
mockSeq := testmocks.NewMockSequencer(t)
218+
daClient := testmocks.NewMockClient(t)
219+
daClient.On("GetHeaderNamespace").Return(datypes.NamespaceFromString("ns").Bytes()).Maybe()
220+
daClient.On("GetDataNamespace").Return(datypes.NamespaceFromString("data-ns").Bytes()).Maybe()
221+
daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
222+
daClient.On("HasForcedInclusionNamespace").Return(false).Maybe()
223+
224+
// Mock InitChain to succeed initially
225+
mockExec.On("InitChain", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
226+
Return([]byte("state-root"), nil).Once()
227+
228+
// Mock SetDAHeight to be called during initialization
229+
mockSeq.On("SetDAHeight", uint64(0)).Return().Once()
230+
231+
// Mock GetNextBatch to return empty batch
232+
mockSeq.On("GetNextBatch", mock.Anything, mock.Anything).
233+
Return(&coresequencer.GetNextBatchResponse{
234+
Batch: &coresequencer.Batch{Transactions: nil},
235+
Timestamp: time.Now(),
236+
}, nil).Maybe()
237+
238+
// Mock GetTxs for reaper (return empty to avoid interfering with test)
239+
mockExec.On("GetTxs", mock.Anything).
240+
Return([][]byte{}, nil).Maybe()
241+
242+
// Mock ExecuteTxs to fail with a critical error
243+
criticalError := errors.New("execution client RPC connection failed")
244+
mockExec.On("ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
245+
Return(nil, criticalError).Maybe()
246+
247+
// Create aggregator node
248+
components, err := NewAggregatorComponents(
249+
cfg,
250+
gen,
251+
memStore,
252+
mockExec,
253+
mockSeq,
254+
daClient,
255+
testSigner,
256+
nil, // header broadcaster
257+
nil, // data broadcaster
258+
zerolog.Nop(),
259+
NopMetrics(),
260+
DefaultBlockOptions(),
261+
nil,
262+
)
263+
require.NoError(t, err)
264+
265+
// Start should return with error when execution client fails.
266+
// With synctest the fake clock advances the retry delays instantly.
267+
ctx, cancel := context.WithTimeout(t.Context(), 35*time.Second)
268+
defer cancel()
269+
270+
// Run Start in a goroutine to handle the blocking call
271+
startErrCh := make(chan error, 1)
272+
go func() {
273+
startErrCh <- components.Start(ctx)
274+
}()
275+
276+
// Wait for either the error or timeout
277+
synctest.Wait()
278+
select {
279+
case err = <-startErrCh:
280+
// We expect an error containing the critical execution client failure
281+
require.Error(t, err)
282+
assert.Contains(t, err.Error(), "critical execution client failure")
283+
assert.Contains(t, err.Error(), "execution client RPC connection failed")
284+
case <-ctx.Done():
285+
t.Fatal("timeout waiting for critical error to propagate")
286+
}
287+
288+
// Clean up
289+
stopErr := components.Stop()
290+
assert.NoError(t, stopErr)
291+
})
289292
}

block/internal/executing/executor_logic_test.go

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
crand "crypto/rand"
66
"errors"
77
"testing"
8+
"testing/synctest"
89
"time"
910

1011
"github.com/ipfs/go-datastore"
@@ -281,49 +282,50 @@ func TestExecutor_executeTxsWithRetry(t *testing.T) {
281282
for _, tt := range tests {
282283
t.Run(tt.name, func(t *testing.T) {
283284
t.Parallel()
285+
synctest.Test(t, func(t *testing.T) {
286+
ctx := context.Background()
287+
execCtx := ctx
288+
289+
// For context cancellation test, create a cancellable context
290+
if tt.name == "context cancelled during retry" {
291+
var cancel context.CancelFunc
292+
execCtx, cancel = context.WithCancel(ctx)
293+
// Cancel context after first failure to simulate cancellation during retry
294+
go func() {
295+
time.Sleep(100 * time.Millisecond)
296+
cancel()
297+
}()
298+
}
299+
300+
mockExec := testmocks.NewMockExecutor(t)
301+
tt.setupMock(mockExec)
284302

285-
ctx := context.Background()
286-
execCtx := ctx
287-
288-
// For context cancellation test, create a cancellable context
289-
if tt.name == "context cancelled during retry" {
290-
var cancel context.CancelFunc
291-
execCtx, cancel = context.WithCancel(ctx)
292-
// Cancel context after first failure to simulate cancellation during retry
293-
go func() {
294-
time.Sleep(100 * time.Millisecond)
295-
cancel()
296-
}()
297-
}
298-
299-
mockExec := testmocks.NewMockExecutor(t)
300-
tt.setupMock(mockExec)
301-
302-
e := &Executor{
303-
exec: mockExec,
304-
ctx: execCtx,
305-
logger: zerolog.Nop(),
306-
}
307-
308-
rawTxs := [][]byte{[]byte("tx1"), []byte("tx2")}
309-
header := types.Header{
310-
BaseHeader: types.BaseHeader{Height: 100, Time: uint64(time.Now().UnixNano())},
311-
}
312-
currentState := types.State{AppHash: []byte("current-hash")}
313-
314-
result, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState)
315-
316-
if tt.expectSuccess {
317-
require.NoError(t, err)
318-
assert.Equal(t, tt.expectHash, result)
319-
} else {
320-
require.Error(t, err)
321-
if tt.expectError != "" {
322-
assert.Contains(t, err.Error(), tt.expectError)
303+
e := &Executor{
304+
exec: mockExec,
305+
ctx: execCtx,
306+
logger: zerolog.Nop(),
307+
}
308+
309+
rawTxs := [][]byte{[]byte("tx1"), []byte("tx2")}
310+
header := types.Header{
311+
BaseHeader: types.BaseHeader{Height: 100, Time: uint64(time.Now().UnixNano())},
312+
}
313+
currentState := types.State{AppHash: []byte("current-hash")}
314+
315+
result, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState)
316+
317+
if tt.expectSuccess {
318+
require.NoError(t, err)
319+
assert.Equal(t, tt.expectHash, result)
320+
} else {
321+
require.Error(t, err)
322+
if tt.expectError != "" {
323+
assert.Contains(t, err.Error(), tt.expectError)
324+
}
323325
}
324-
}
325326

326-
mockExec.AssertExpectations(t)
327+
mockExec.AssertExpectations(t)
328+
})
327329
})
328330
}
329331
}

block/internal/submitting/submitter_test.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"sync/atomic"
99
"testing"
10+
"testing/synctest"
1011
"time"
1112

1213
"github.com/ipfs/go-datastore"
@@ -77,29 +78,30 @@ func TestSubmitter_setFinalWithRetry(t *testing.T) {
7778
for _, tt := range tests {
7879
t.Run(tt.name, func(t *testing.T) {
7980
t.Parallel()
81+
synctest.Test(t, func(t *testing.T) {
82+
ctx := context.Background()
83+
exec := testmocks.NewMockExecutor(t)
84+
tt.setupMock(exec)
85+
86+
s := &Submitter{
87+
exec: exec,
88+
ctx: ctx,
89+
logger: zerolog.Nop(),
90+
}
8091

81-
ctx := context.Background()
82-
exec := testmocks.NewMockExecutor(t)
83-
tt.setupMock(exec)
84-
85-
s := &Submitter{
86-
exec: exec,
87-
ctx: ctx,
88-
logger: zerolog.Nop(),
89-
}
90-
91-
err := s.setFinalWithRetry(100)
92+
err := s.setFinalWithRetry(100)
9293

93-
if tt.expectSuccess {
94-
require.NoError(t, err)
95-
} else {
96-
require.Error(t, err)
97-
if tt.expectError != "" {
98-
assert.Contains(t, err.Error(), tt.expectError)
94+
if tt.expectSuccess {
95+
require.NoError(t, err)
96+
} else {
97+
require.Error(t, err)
98+
if tt.expectError != "" {
99+
assert.Contains(t, err.Error(), tt.expectError)
100+
}
99101
}
100-
}
101102

102-
exec.AssertExpectations(t)
103+
exec.AssertExpectations(t)
104+
})
103105
})
104106
}
105107
}

0 commit comments

Comments
 (0)