Skip to content

Commit fcc6652

Browse files
authored
[Feature] Support for Custom Line Break Characters (#27)
1 parent c0cd939 commit fcc6652

File tree

2 files changed

+43
-6
lines changed

2 files changed

+43
-6
lines changed

main.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"math"
2424
"os"
25+
"regexp"
2526
"strconv"
2627
"strings"
2728
"sync"
@@ -81,8 +82,8 @@ var (
8182
retryInfo map[int]int
8283
showVersion bool
8384
queueSize int
84-
85-
bufferPool = sync.Pool{
85+
lineDelimiter byte = '\n'
86+
bufferPool = sync.Pool{
8687
New: func() interface{} {
8788
return make([]byte, 0, bufferSize)
8889
},
@@ -191,6 +192,24 @@ func initFlags() {
191192
utils.InitLog(logLevel)
192193
}
193194

195+
// hexEscapePattern matches a literal backslash-x followed by exactly two hex
// digits (e.g. `\x01`, `\xfF`). It is compiled once at package scope instead
// of on every call.
var hexEscapePattern = regexp.MustCompile(`\\x([0-9A-Fa-f]{2})`)

// restoreHexEscapes converts textual escape sequences in s1 into the raw bytes
// they denote.
//
// Two forms are recognised:
//   - the exact two-character input `\n` (backslash followed by 'n') is
//     returned as a single newline byte;
//   - every `\xNN` sequence (NN = two hex digits) anywhere in s1 is replaced
//     by the single byte with value 0xNN.
//
// All other text is returned unchanged. The returned error is always nil; it
// is part of the signature so callers can keep their existing error checks.
//
// Each `\xNN` yields exactly ONE byte, including values 0x80-0xFF. Converting
// through rune (string(rune(v))) would UTF-8 encode those values into two
// bytes, which makes a single-byte delimiter such as `\xff` impossible to
// express.
func restoreHexEscapes(s1 string) (string, error) {
	if s1 == `\n` {
		return "\n", nil
	}

	restored := hexEscapePattern.ReplaceAllStringFunc(s1, func(match string) string {
		// match is always `\x` + two hex digits; drop the `\x` prefix.
		value, err := strconv.ParseUint(match[2:], 16, 8)
		if err != nil {
			// Unreachable given the pattern, but keep the text intact if it ever happens.
			return match
		}
		// Build the string from a raw byte, not a rune, so no UTF-8 expansion occurs.
		return string([]byte{byte(value)})
	})
	return restored, nil
}
212+
194213
//go:generate go run gen_version.go
195214
func paramCheck() {
196215
if showVersion {
@@ -253,6 +272,19 @@ func paramCheck() {
253272
if strings.ToLower(kv[0]) == "format" && strings.ToLower(kv[1]) != "csv" {
254273
enableConcurrency = false
255274
}
275+
276+
if strings.ToLower(kv[0]) == "line_delimiter" {
277+
278+
restored, err := restoreHexEscapes(kv[1])
279+
if err != nil || len(restored) != 1 {
280+
log.Errorf("line_delimiter invalid: %s", kv[1])
281+
os.Exit(1)
282+
} else {
283+
lineDelimiter = restored[0]
284+
}
285+
286+
}
287+
256288
if len(kv) > 2 {
257289
headers[kv[0]] = strings.Join(kv[1:], ":")
258290
} else {
@@ -369,7 +401,7 @@ func main() {
369401
streamLoad.Load(workers, maxRowsPerTask, maxBytesPerTask, &retryInfo)
370402
reporter.Report()
371403
defer reporter.CloseWait()
372-
reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount)
404+
reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount, lineDelimiter)
373405
reader.Close()
374406

375407
streamLoad.Wait(loadInfo, retryCount, &retryInfo, startTime)

reader/reader.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ func NewFileReader(filePaths string, batchRows int, batchBytes int, bufferSize i
108108
}
109109

110110
// Read File
111-
func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTask int, retryInfo *map[int]int, loadResp *loader.Resp, retryCount int) {
111+
func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTask int, retryInfo *map[int]int,
112+
loadResp *loader.Resp, retryCount int, lineDelimiter byte) {
112113
index := 0
113114
data := f.pool.Get().([]byte)
114115
count := f.batchRows
@@ -125,16 +126,20 @@ func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTas
125126
for _, file := range f.files {
126127
loadResp.LoadFiles = append(loadResp.LoadFiles, file.Name())
127128
reader := bufio.NewReaderSize(file, f.bufferSize)
129+
128130
for {
129131
if atomic.LoadUint64(&reporter.FinishedWorkers) == atomic.LoadUint64(&reporter.TotalWorkers) {
130132
return
131133
}
132-
line, err := reader.ReadBytes('\n')
133-
if err == io.EOF {
134+
line, err := reader.ReadBytes(lineDelimiter)
135+
if err == io.EOF && len(line) == 0 {
134136
file.Close()
135137
break
136138
} else if err != nil {
137139
log.Errorf("Read file failed, error message: %v, before retrying, we suggest:\n1.Check the input data files and fix if there is any problem.\n2.Do select count(*) to check whether data is partially loaded.\n3.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n4.Otherwise, just retry.\n", err)
140+
if len(line) != 0 {
141+
log.Error("5.When using a specified line delimiter, the file must end with that delimiter.")
142+
}
138143
os.Exit(1)
139144
}
140145

0 commit comments

Comments
 (0)