Skip to content

Commit faec238

Browse files
committed
feat(stats): add stats collector
1 parent a8d0c04 commit faec238

5 files changed

Lines changed: 572 additions & 70 deletions

File tree

gloader.go

Lines changed: 85 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"log"
7+
"strings"
88
"sync"
99

1010
"github.com/mohammadv184/gloader/data"
1111
"github.com/mohammadv184/gloader/driver"
12+
"github.com/mohammadv184/gloader/pkg/stats"
1213
)
1314

1415
const (
@@ -23,6 +24,7 @@ var (
2324
ErrEndOffsetLessThanStartOffset = errors.New("end offset less than start offset")
2425
ErrEndOffsetRequired = errors.New("end offset required")
2526
ErrSrcConnectionIsRequired = errors.New("source connection is required")
27+
ErrDestConnectionIsRequired = errors.New("destination connection is required")
2628
)
2729

2830
var ErrCCCauseStopFuncCalled = errors.New("stop func called")
@@ -40,6 +42,7 @@ type GLoader struct {
4042
workers uint
4143
ctx context.Context
4244
ctxCancelFunc context.CancelCauseFunc
45+
stats *stats.Stats
4346
}
4447

4548
func NewGLoader() *GLoader {
@@ -48,10 +51,11 @@ func NewGLoader() *GLoader {
4851
workers: DefaultWorkers,
4952
dataCollectionEndOffset: make(map[string]uint64),
5053
dataCollectionStartOffset: make(map[string]uint64),
54+
stats: NewStats(),
5155
}
5256
}
5357

54-
func (g *GLoader) Source(name, dsn string) error {
58+
func (g *GLoader) Src(name, dsn string) error {
5559
d, err := driver.GetDriver(name)
5660
if err != nil {
5761
return err
@@ -82,6 +86,32 @@ func (g *GLoader) Dest(name, dsn string) error {
8286
return nil
8387
}
8488

89+
func (g *GLoader) GetSrcDetails(ctx context.Context) (driver.DatabaseDetail, error) {
90+
if g.srcConnector == nil {
91+
return driver.DatabaseDetail{}, ErrSrcConnectionIsRequired
92+
}
93+
94+
conn, err := g.srcConnector.Connect(ctx)
95+
if err != nil {
96+
return driver.DatabaseDetail{}, err
97+
}
98+
99+
return conn.GetDetails(ctx)
100+
}
101+
102+
func (g *GLoader) GetDestDetails(ctx context.Context) (driver.DatabaseDetail, error) {
103+
if g.destConnector == nil {
104+
return driver.DatabaseDetail{}, ErrDestConnectionIsRequired
105+
}
106+
107+
conn, err := g.destConnector.Connect(ctx)
108+
if err != nil {
109+
return driver.DatabaseDetail{}, err
110+
}
111+
112+
return conn.GetDetails(ctx)
113+
}
114+
85115
func (g *GLoader) Filter(dataCollection, key string, condition driver.Condition, value string) *GLoader {
86116
if g.srcConnector == nil {
87117
panic(ErrSrcConnectionIsRequired)
@@ -144,6 +174,10 @@ func (g *GLoader) SetWorkers(workers uint) *GLoader {
144174
return g
145175
}
146176

177+
func (g *GLoader) Stats() *stats.Stats {
178+
return g.stats
179+
}
180+
147181
func (g *GLoader) Start() error {
148182
return g.StartWithContext(context.Background())
149183
}
@@ -164,41 +198,60 @@ func (g *GLoader) StartWithContext(ctx context.Context) error {
164198
return err
165199
}
166200

201+
destConn, err := g.destConnector.Connect(c)
202+
if err != nil {
203+
return err
204+
}
205+
defer destConn.Close()
206+
207+
dDetails, err := destConn.GetDetails(c)
208+
if err != nil {
209+
return err
210+
}
211+
212+
var DCs []driver.DataCollectionDetail
213+
DCs = sDetails.DataCollections
214+
if len(g.includedDataCollections) > 0 {
215+
DCs = sDetails.OnlyDataCollections(g.includedDataCollections...)
216+
}
217+
218+
if len(g.excludedDataCollections) > 0 {
219+
DCs = sDetails.AllDataCollectionsExcept(g.excludedDataCollections...)
220+
}
221+
167222
wg := sync.WaitGroup{}
168-
for _, dc := range sDetails.DataCollections {
223+
for i, dc := range DCs {
169224
if dc.DataSetCount == 0 {
170225
continue
171226
}
172227

173-
if len(g.includedDataCollections) > 0 {
174-
var found bool
175-
for _, included := range g.includedDataCollections {
176-
if dc.Name == included {
177-
found = true
178-
break
179-
}
180-
}
181-
if !found {
182-
continue
183-
}
228+
dDC, err := dDetails.GetDataCollection(dc.Name)
229+
if err != nil {
230+
return fmt.Errorf("GLoader: failed to get destination data collection details for %s", dc.Name)
184231
}
185232

186-
if len(g.excludedDataCollections) > 0 {
187-
var found bool
188-
for _, excluded := range g.excludedDataCollections {
189-
if dc.Name == excluded {
190-
found = true
191-
break
192-
}
193-
}
194-
if found {
233+
var srcSchema strings.Builder
234+
srcSchema.WriteString("Migration Source Schema:\n")
235+
var destSchema strings.Builder
236+
destSchema.WriteString("Migration Destination Schema:\n")
237+
for k, v := range dc.GetDataMap().GetTypeMap() {
238+
srcSchema.WriteString(fmt.Sprintf("%d \t %s: \t %s \t IS NULLABLE: %t\n", i, k, v.GetTypeName(), dc.GetDataMap().IsNullable(k)))
239+
240+
dDT := dDC.GetDataMap().Get(k)
241+
if dDT == nil {
195242
continue
196243
}
244+
245+
destSchema.WriteString(fmt.Sprintf("%d \t %s: \t %s \t IS NULLABLE: %t\n", i, k, dDT.GetTypeName(), dDC.GetDataMap().IsNullable(k)))
197246
}
198247

248+
fmt.Println(srcSchema.String())
249+
fmt.Println(destSchema.String())
250+
199251
wg.Add(2)
200-
fmt.Println("Starting to load", dc.Name, "from", 0, "to", dc.DataSetCount)
201-
buffer := data.NewBuffer(c)
252+
253+
buffer := data.NewBuffer(c).
254+
WithObserver(NewBufferObserverAdapter(g.stats, dc.Name))
202255

203256
rConnectionPool := driver.NewConnectionPool(g.srcConnector)
204257
wConnectionPool := driver.NewConnectionPool(g.destConnector)
@@ -214,6 +267,7 @@ func (g *GLoader) StartWithContext(ctx context.Context) error {
214267
} else {
215268
reader.SetEndOffset(uint64(dc.DataSetCount))
216269
}
270+
fmt.Println("Starting to load", dc.Name, "from", reader.startOffset, "to", reader.endOffset)
217271

218272
reader.SetRowsPerBatch(g.rowsPerBatch)
219273
reader.SetWorkers(g.workers)
@@ -225,24 +279,26 @@ func (g *GLoader) StartWithContext(ctx context.Context) error {
225279
go func(reader *Reader, rcPool *driver.ConnectionPool) {
226280
err := reader.Start()
227281
if err != nil {
228-
log.Println(err)
282+
panic(err)
229283
}
230284
wg.Done()
231285
err = rcPool.CloseAll()
232286
if err != nil {
233-
log.Println(err)
287+
// TODO: logging system
288+
//log.Println(err)
234289
}
235290
}(reader, rConnectionPool)
236291

237292
go func(writer *Writer, wcPool *driver.ConnectionPool) {
238293
err := writer.Start()
239294
if err != nil {
240-
log.Println(err)
295+
panic(err)
241296
}
242297
wg.Done()
243298
err = wcPool.CloseAll()
244299
if err != nil {
245-
log.Println(err)
300+
// TODO: logging system
301+
//log.Println(err)
246302
}
247303
}(writer, wConnectionPool)
248304

0 commit comments

Comments
 (0)