Skip to content

Commit 445cc34

Browse files
committed
fix(reader): fix deadlock
1 parent faec238 commit 445cc34

4 files changed

Lines changed: 60 additions & 12 deletions

File tree

go.mod

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,24 @@ go 1.19
55
require (
66
github.com/go-sql-driver/mysql v1.7.0
77
github.com/jackc/pgx/v5 v5.4.1
8-
github.com/spf13/cobra v1.6.1
8+
github.com/spf13/cobra v1.7.0
9+
github.com/vbauerster/mpb v3.4.0+incompatible
10+
golang.org/x/term v0.8.0
911
)
1012

1113
require (
14+
github.com/VividCortex/ewma v1.2.0 // indirect
15+
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
1216
github.com/inconshreveable/mousetrap v1.1.0 // indirect
1317
github.com/jackc/pgpassfile v1.0.0 // indirect
1418
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
19+
github.com/kr/text v0.2.0 // indirect
20+
github.com/mattn/go-isatty v0.0.19 // indirect
21+
github.com/rogpeppe/go-internal v1.11.0 // indirect
22+
github.com/russross/blackfriday/v2 v2.1.0 // indirect
1523
github.com/spf13/pflag v1.0.5 // indirect
1624
golang.org/x/crypto v0.9.0 // indirect
25+
golang.org/x/sys v0.8.0 // indirect
1726
golang.org/x/text v0.9.0 // indirect
27+
gopkg.in/yaml.v3 v3.0.1 // indirect
1828
)

go.sum

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
2+
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
3+
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
14
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
5+
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
26
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
37
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
48
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
59
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
6-
github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
710
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
811
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
912
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
@@ -12,22 +15,38 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/
1215
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
1316
github.com/jackc/pgx/v5 v5.4.1 h1:oKfB/FhuVtit1bBM3zNRRsZ925ZkMN3HXL+LgLUM9lE=
1417
github.com/jackc/pgx/v5 v5.4.1/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY=
18+
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
19+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
20+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
21+
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
22+
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
1523
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1624
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
25+
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
26+
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
27+
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
1728
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
18-
github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA=
19-
github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY=
29+
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
30+
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
2031
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
2132
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
2233
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2334
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
2435
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2536
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
37+
github.com/vbauerster/mpb v3.4.0+incompatible h1:mfiiYw87ARaeRW6x5gWwYRUawxaW1tLAD8IceomUCNw=
38+
github.com/vbauerster/mpb v3.4.0+incompatible/go.mod h1:zAHG26FUhVKETRu+MWqYXcI70POlC6N8up9p1dID7SU=
2639
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
2740
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
41+
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
42+
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
43+
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
44+
golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
45+
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
2846
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
2947
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
3048
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
49+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
3150
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
3251
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
3352
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

reader.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package gloader
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"log"
78
"sync"
@@ -71,7 +72,8 @@ func (r *Reader) Start() error {
7172
fmt.Println("closing buffer")
7273
err := r.buffer.Close()
7374
if err != nil {
74-
log.Println(err)
75+
// TODO: logging system
76+
//log.Println(err)
7577
}
7678
}()
7779

@@ -116,6 +118,19 @@ func (r *Reader) RunWorker(startOffset, endOffset, rowPerBatch uint64) {
116118
retryRead:
117119
batch, err := rConn.Read(r.ctx, r.dataCollection, i, i+rowPerBatch)
118120
if err != nil {
121+
select {
122+
case <-r.ctx.Done():
123+
goto stopWorker
124+
default:
125+
}
126+
if errors.Is(err, driver.ErrConnectionIsClosed) {
127+
conn, cIndex, err = r.connectionP.Connect(r.ctx)
128+
if err != nil {
129+
log.Printf("error on connecting to database err: %s\n", err)
130+
goto retryRead
131+
}
132+
rConn = conn.(driver.ReadableConnection)
133+
}
119134
log.Printf("error on reading data batch from %d,%d err: %s\n", i, i+rowPerBatch, err)
120135
goto retryRead
121136

@@ -126,7 +141,8 @@ func (r *Reader) RunWorker(startOffset, endOffset, rowPerBatch uint64) {
126141

127142
select {
128143
case <-r.ctx.Done():
129-
log.Printf("context canceled: raeder worker %s:%d,%d stoped\n", r.dataCollection, startOffset, endOffset)
144+
// TODO: logging system
145+
//log.Printf("context canceled: raeder worker %s:%d,%d stoped\n", r.dataCollection, startOffset, endOffset)
130146
goto stopWorker
131147
case ch <- batch:
132148
continue
@@ -136,7 +152,8 @@ func (r *Reader) RunWorker(startOffset, endOffset, rowPerBatch uint64) {
136152
stopWorker:
137153
err := r.connectionP.CloseConnection(cIndex)
138154
if err != nil {
139-
log.Println("failed on closing database connection err:", err)
155+
// TODO: logging system
156+
//log.Println("failed on closing database connection err:", err)
140157
}
141158
wg.Done()
142159
close(ch)
@@ -147,7 +164,7 @@ func (r *Reader) RunWorker(startOffset, endOffset, rowPerBatch uint64) {
147164
select {
148165
case batch, ok := <-ch:
149166
if !ok {
150-
log.Printf("batch readCh closed %s:%d,%d\n", r.dataCollection, startOffset, endOffset)
167+
//log.Printf("batch readCh closed %s:%d,%d\n", r.dataCollection, startOffset, endOffset)
151168
wg.Done()
152169
return
153170
}

writer.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,26 +78,28 @@ func (w *Writer) RunWorker() {
7878
batch.Add(dSet)
7979
break
8080
}
81-
log.Println(err)
81+
panic(err)
8282
return
8383
}
8484

85-
// fmt.Println("Length: ", w.dataCollection, batch.GetLength())
85+
// fmt.Println("LengthChanged: ", w.dataCollection, batch.GetLength())
8686
batch.Add(dSet)
8787
}
8888

8989
if batch.GetLength() > 0 {
90+
//fmt.Println("Writing: ", w.dataCollection, batch.GetLength())
9091
if err := wConn.Write(w.ctx, w.dataCollection, batch); err != nil {
9192
log.Printf("error on writing data to %s: %s", w.dataCollection, err)
9293
panic(err)
9394
}
9495
}
9596

9697
if w.buffer.IsClosed() && w.buffer.IsEmpty() {
97-
log.Printf("%s Writer worker is closed", w.dataCollection)
98+
//log.Printf("%s Writer worker is closed", w.dataCollection)
9899
err := w.connectionP.CloseConnection(cIndex)
99100
if err != nil {
100-
log.Println(err)
101+
// TODO: logging system
102+
//log.Println(err)
101103
}
102104
return
103105
}

0 commit comments

Comments
 (0)