Skip to content

Commit c0cd939

Browse files
author
Petrichor
authored
[fix]avoid label conflict when setting labels in headers. (#25)
1 parent 65184e7 commit c0cd939

File tree

3 files changed

+23
-7
lines changed

3 files changed

+23
-7
lines changed

build.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
2020

2121
rm -rf version.go
22+
# Formatting Go code with gofmt
23+
find . -name '*.go' -exec gofmt -w {} \;
2224
go generate
2325
go build
2426
echo "Build success. Output: ${ROOT}/doris-streamloader"

loader/stream_loader.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
package loader
1919

2020
import (
21+
"doris-streamloader/report"
2122
"encoding/json"
2223
"fmt"
2324
"io"
2425
"io/ioutil"
2526
"net/http"
27+
"strconv"
2628
"strings"
2729
"sync"
2830
"sync/atomic"
@@ -31,7 +33,6 @@ import (
3133

3234
"github.com/pierrec/lz4/v4"
3335
log "github.com/sirupsen/logrus"
34-
"doris-streamloader/report"
3536
)
3637

3738
type StreamLoadOption struct {
@@ -143,7 +144,7 @@ func (s *StreamLoad) createUrl() string {
143144
}
144145

145146
// stream load create http request with string data
146-
func (s *StreamLoad) createRequest(url string, reader io.Reader) (req *http.Request, err error) {
147+
func (s *StreamLoad) createRequest(url string, reader io.Reader, workerIndex int, taskIndex int) (req *http.Request, err error) {
147148
req, err = http.NewRequest("PUT", url, reader)
148149
if err != nil {
149150
return
@@ -155,6 +156,19 @@ func (s *StreamLoad) createRequest(url string, reader io.Reader) (req *http.Requ
155156
req.Header.Set("Content-Type", "text/plain")
156157
for k, v := range s.headers {
157158
req.Header.Set(k, v)
159+
// If a label has already been set in the headers, to prevent conflicts,
160+
//generate a unique label by combining the original label, worker index, and task index.
161+
if k == "label" {
162+
var builder strings.Builder
163+
builder.WriteString(v)
164+
builder.WriteString("_")
165+
builder.WriteString(strconv.Itoa(workerIndex))
166+
builder.WriteString("_")
167+
builder.WriteString(strconv.Itoa(taskIndex))
168+
169+
req.Header.Set("label", builder.String())
170+
}
171+
158172
}
159173

160174
if s.Compress {
@@ -228,10 +242,10 @@ func (s *StreamLoad) readData(isEOS *atomic.Bool, rawWriter *io.PipeWriter, read
228242
}
229243
}
230244

231-
func (s *StreamLoad) send(url string, reader io.Reader) (*http.Response, error) {
245+
func (s *StreamLoad) send(url string, reader io.Reader, workerIndex int, taskIndex int) (*http.Response, error) {
232246
realUrl := url
233247
for {
234-
req, err := s.createRequest(realUrl, reader)
248+
req, err := s.createRequest(realUrl, reader, workerIndex, taskIndex)
235249
if err != nil {
236250
if req == nil {
237251
return nil, err
@@ -347,7 +361,7 @@ func (s *StreamLoad) executeGetAndSend(maxRowsPerTask int, maxBytesPerTask int,
347361
workerIndex: workerIndex,
348362
taskIndex: taskIndex,
349363
})
350-
if resp, err := s.send(url, NopCloser(pr)); err != nil {
364+
if resp, err := s.send(url, NopCloser(pr), workerIndex, taskIndex); err != nil {
351365
s.handleSendError(workerIndex, taskIndex)
352366
log.Errorf("Send error, resp: %v error message: %v", resp, err)
353367
return

reader/reader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package file
1919

2020
import (
2121
"bufio"
22+
"doris-streamloader/loader"
23+
"doris-streamloader/report"
2224
"io"
2325
"os"
2426
"path/filepath"
@@ -28,8 +30,6 @@ import (
2830
"time"
2931

3032
log "github.com/sirupsen/logrus"
31-
report "doris-streamloader/report"
32-
loader "doris-streamloader/loader"
3333
)
3434

3535
type FileReader struct {

0 commit comments

Comments
 (0)