Skip to content

Commit fdf5900

Browse files
authored
Merge pull request #70 from libi/develop
millstone: new breaking change v0.5.1, from develop to master.
2 parents 04bd2b2 + e10900e commit fdf5900

12 files changed

Lines changed: 345 additions & 32 deletions

.github/workflows/test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ name: Testing
22

33
on:
44
push:
5-
branches: [ "master" ]
5+
branches: [ "master", "develop" ]
66
pull_request:
7-
branches: [ "master" ]
7+
branches: [ "master", "develop" ]
88

99
jobs:
1010

dcron.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ import (
1717
const (
1818
defaultReplicas = 50
1919
defaultDuration = 3 * time.Second
20-
)
2120

22-
const (
2321
dcronRunning = 1
2422
dcronStopped = 0
23+
24+
dcronStateSteady = "dcronStateSteady"
25+
dcronStateUpgrade = "dcronStateUpgrade"
2526
)
2627

2728
type RecoverFuncType func(d *Dcron)
@@ -45,6 +46,9 @@ type Dcron struct {
4546
crOptions []cron.Option
4647

4748
RecoverFunc RecoverFuncType
49+
50+
recentJobs IRecentJobPacker
51+
state atomic.Value
4852
}
4953

5054
// NewDcron create a Dcron
@@ -66,7 +70,6 @@ func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ...
6670

6771
dcron.cr = cron.New(dcron.crOptions...)
6872
dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger)
69-
7073
return dcron
7174
}
7275

@@ -137,8 +140,24 @@ func (d *Dcron) Remove(jobName string) {
137140
}
138141
}
139142

140-
func (d *Dcron) allowThisNodeRun(jobName string) bool {
141-
return d.nodePool.CheckJobAvailable(jobName)
143+
func (d *Dcron) allowThisNodeRun(jobName string) (ok bool) {
144+
ok, err := d.nodePool.CheckJobAvailable(jobName)
145+
if err != nil {
146+
d.logger.Errorf("allow this node run error, err=%v", err)
147+
ok = false
148+
d.state.Store(dcronStateUpgrade)
149+
} else {
150+
d.state.Store(dcronStateSteady)
151+
if d.recentJobs != nil {
152+
go d.reRunRecentJobs(d.recentJobs.PopAllJobs())
153+
}
154+
}
155+
if d.recentJobs != nil {
156+
if d.state.Load().(string) == dcronStateUpgrade {
157+
d.recentJobs.AddJob(jobName, time.Now())
158+
}
159+
}
160+
return
142161
}
143162

144163
// Start job
@@ -147,14 +166,13 @@ func (d *Dcron) Start() {
147166
if d.RecoverFunc != nil {
148167
d.RecoverFunc(d)
149168
}
150-
151169
if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) {
152170
if err := d.startNodePool(); err != nil {
153171
atomic.StoreInt32(&d.running, dcronStopped)
154172
return
155173
}
156174
d.cr.Start()
157-
d.logger.Infof("dcron started , nodeID is %s", d.nodePool.GetNodeID())
175+
d.logger.Infof("dcron started, nodeID is %s", d.nodePool.GetNodeID())
158176
} else {
159177
d.logger.Infof("dcron have started")
160178
}
@@ -171,8 +189,7 @@ func (d *Dcron) Run() {
171189
atomic.StoreInt32(&d.running, dcronStopped)
172190
return
173191
}
174-
175-
d.logger.Infof("dcron running nodeID is %s", d.nodePool.GetNodeID())
192+
d.logger.Infof("dcron running, nodeID is %s", d.nodePool.GetNodeID())
176193
d.cr.Run()
177194
} else {
178195
d.logger.Infof("dcron already running")
@@ -199,3 +216,18 @@ func (d *Dcron) Stop() {
199216
}
200217
}
201218
}
219+
220+
func (d *Dcron) reRunRecentJobs(jobNames []string) {
221+
d.logger.Infof("reRunRecentJobs: length=%d", len(jobNames))
222+
for _, jobName := range jobNames {
223+
if job, ok := d.jobs[jobName]; ok {
224+
if ok, _ := d.nodePool.CheckJobAvailable(jobName); ok {
225+
job.Execute()
226+
}
227+
}
228+
}
229+
}
230+
231+
func (d *Dcron) NodeID() string {
232+
return d.nodePool.GetNodeID()
233+
}

dcron_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,50 @@ func Test_SecondJobWithStopAndSwapNode(t *testing.T) {
197197
wg.Wait()
198198
}
199199

200+
func Test_WithClusterStableNodes(t *testing.T) {
201+
wg := &sync.WaitGroup{}
202+
wg.Add(5)
203+
204+
runningTime := 60 * time.Second
205+
startFunc := func(id string, timeWindow time.Duration, t *testing.T) {
206+
redisCli := redis.NewClient(&redis.Options{
207+
Addr: DefaultRedisAddr,
208+
})
209+
drv := driver.NewRedisDriver(redisCli)
210+
dcr := dcron.NewDcronWithOption(t.Name(), drv,
211+
dcron.CronOptionSeconds(),
212+
dcron.WithLogger(&dlog.StdLogger{
213+
Log: log.New(os.Stdout, "["+id+"]", log.LstdFlags),
214+
}),
215+
dcron.WithClusterStable(timeWindow),
216+
dcron.WithNodeUpdateDuration(timeWindow),
217+
)
218+
var err error
219+
err = dcr.AddFunc("job1", "*/3 * * * * *", func() {
220+
t.Log(time.Now())
221+
})
222+
require.Nil(t, err)
223+
err = dcr.AddFunc("job2", "*/8 * * * * *", func() {
224+
t.Logf("job2: %v", time.Now())
225+
})
226+
require.Nil(t, err)
227+
err = dcr.AddFunc("job3", "* * * * * *", func() {
228+
t.Log("job3:", time.Now())
229+
})
230+
require.Nil(t, err)
231+
dcr.Start()
232+
<-time.After(runningTime)
233+
dcr.Stop()
234+
wg.Done()
235+
}
236+
237+
go startFunc("1", time.Second*6, t)
238+
go startFunc("2", time.Second*6, t)
239+
go startFunc("3", time.Second*6, t)
240+
go startFunc("4", time.Second*6, t)
241+
go startFunc("5", time.Second*6, t)
242+
}
243+
200244
func Test_SecondJobLog_Issue68(t *testing.T) {
201245
wg := &sync.WaitGroup{}
202246
wg.Add(5)

driver/etcddriver.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,15 @@ func (e *EtcdDriver) keepAlive(ctx context.Context, nodeID string) (<-chan *clie
134134
func (e *EtcdDriver) revoke(ctx context.Context) {
135135
_, err := e.cli.Lease.Revoke(ctx, e.leaseID)
136136
if err != nil {
137-
e.logger.Printf("lease revoke error: %v", err)
137+
e.logger.Infof("lease revoke error: %v", err)
138138
}
139139
}
140140

141141
func (e *EtcdDriver) heartBeat(ctx context.Context) {
142142
label:
143143
leaseCh, err := e.keepAlive(ctx, e.nodeID)
144144
if err != nil {
145+
e.logger.Errorf("keep alive error, %v", err)
145146
return
146147
}
147148
for {

inodepool.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
11
package dcron
22

3-
import "context"
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
)
8+
9+
var (
10+
ErrNodePoolIsUpgrading = errors.New("nodePool is upgrading")
11+
)
412

513
type INodePool interface {
614
Start(ctx context.Context) error
7-
CheckJobAvailable(jobName string) bool
15+
CheckJobAvailable(jobName string) (bool, error)
816
Stop(ctx context.Context) error
917

1018
GetNodeID() string
19+
GetLastNodesUpdateTime() time.Time
1120
}

inodepool_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,7 @@ func (ts *TestINodePoolSuite) declareRedisZSetDrivers(clients *[]*redis.Client,
8181
func (ts *TestINodePoolSuite) runCheckJobAvailable(numberOfNodes int, ServiceName string, nodePools *[]dcron.INodePool, updateDuration time.Duration) {
8282
for i := 0; i < numberOfNodes; i++ {
8383
err := (*nodePools)[i].Start(context.Background())
84-
if err != nil {
85-
ts.T().Fail()
86-
}
84+
ts.Require().Nil(err)
8785
}
8886
<-time.After(updateDuration * 2)
8987
ring := consistenthash.New(ts.defaultHashReplicas, nil)
@@ -93,13 +91,14 @@ func (ts *TestINodePoolSuite) runCheckJobAvailable(numberOfNodes int, ServiceNam
9391

9492
for i := 0; i < 10000; i++ {
9593
for j := 0; j < numberOfNodes; j++ {
94+
ok, err := (*nodePools)[j].CheckJobAvailable(strconv.Itoa(i))
95+
ts.Require().Nil(err)
9696
ts.Require().Equal(
97-
(*nodePools)[j].CheckJobAvailable(strconv.Itoa(i)),
97+
ok,
9898
(ring.Get(strconv.Itoa(i)) == (*nodePools)[j].GetNodeID()),
9999
)
100100
}
101101
}
102-
103102
}
104103

105104
func (ts *TestINodePoolSuite) TestMultiNodesRedis() {
@@ -143,7 +142,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesRedisZSet() {
143142
var nodePools []dcron.INodePool
144143

145144
numberOfNodes := 5
146-
ServiceName := "TestMultiNodesEtcd"
145+
ServiceName := "TestMultiNodesZSet"
147146
updateDuration := 2 * time.Second
148147

149148
ts.setUpRedis()

irecentjobpacker.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package dcron
2+
3+
import "time"
4+
5+
// IRecentJobPacker
6+
// this is an interface which be used to
7+
// pack the jobs running in the cluster state
8+
// is `unstable`.
9+
// like some nodes broken or new nodes were add.
10+
type IRecentJobPacker interface {
11+
// goroutine safety.
12+
// Add a job to packer
13+
// will save recent jobs (like 2 * heartbeat duration)
14+
AddJob(jobName string, t time.Time) error
15+
16+
// goroutine safety.
17+
// Pop out all jobs in packer
18+
PopAllJobs() (jobNames []string)
19+
}

irecentjobpacker_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package dcron_test
2+
3+
import (
4+
"strconv"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/libi/dcron"
10+
"github.com/stretchr/testify/suite"
11+
)
12+
13+
type IRecentJobPackerTestSuite struct {
14+
suite.Suite
15+
}
16+
17+
func (s *IRecentJobPackerTestSuite) checkPopJobs(l, r int, wg *sync.WaitGroup, tAfter time.Duration, recentPacker dcron.IRecentJobPacker) {
18+
<-time.After(tAfter)
19+
jobNames := recentPacker.PopAllJobs()
20+
expectJobNames := make([]string, 0)
21+
for i := l; i < r; i++ {
22+
expectJobNames = append(expectJobNames, "i_"+strconv.Itoa(i))
23+
}
24+
s.Equal(expectJobNames, jobNames)
25+
wg.Done()
26+
}
27+
28+
func (s *IRecentJobPackerTestSuite) TestNormal() {
29+
// the timeWindow is 6s and we will add a job each 0.5s
30+
// and check the jobs list after 8s.
31+
// expect job name is i_4 ~ i_16
32+
timeLimit := time.Second * 6
33+
wg := &sync.WaitGroup{}
34+
wg.Add(1)
35+
recentPacker := dcron.NewRecentJobPacker(timeLimit)
36+
37+
go s.checkPopJobs(4, 16, wg, 8*time.Second, recentPacker)
38+
39+
for i := 0; i < 16; i++ {
40+
recentPacker.AddJob("i_"+strconv.Itoa(i), time.Now())
41+
<-time.After(500 * time.Millisecond)
42+
}
43+
wg.Wait()
44+
}
45+
46+
func (s *IRecentJobPackerTestSuite) Test_CallMultiTimes() {
47+
timeLimit := time.Second * 6
48+
wg := &sync.WaitGroup{}
49+
wg.Add(2)
50+
recentPacker := dcron.NewRecentJobPacker(timeLimit)
51+
52+
go s.checkPopJobs(4, 16, wg, 8*time.Second, recentPacker)
53+
go s.checkPopJobs(20, 32, wg, 16*time.Second, recentPacker)
54+
55+
for i := 0; i < 32; i++ {
56+
recentPacker.AddJob("i_"+strconv.Itoa(i), time.Now())
57+
<-time.After(500 * time.Millisecond)
58+
}
59+
wg.Wait()
60+
}
61+
62+
func TestIRecentJobPackerTestSuite(t *testing.T) {
63+
s := new(IRecentJobPackerTestSuite)
64+
suite.Run(t, s)
65+
}

job_warpper.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,15 @@ type JobWarpper struct {
3131
func (job JobWarpper) Run() {
3232
//如果该任务分配给了这个节点 则允许执行
3333
if job.Dcron.allowThisNodeRun(job.Name) {
34-
if job.Func != nil {
35-
job.Func()
36-
}
37-
if job.Job != nil {
38-
job.Job.Run()
39-
}
34+
job.Execute()
35+
}
36+
}
37+
38+
func (job JobWarpper) Execute() {
39+
if job.Func != nil {
40+
job.Func()
41+
}
42+
if job.Job != nil {
43+
job.Job.Run()
4044
}
4145
}

0 commit comments

Comments
 (0)