This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-streamloader.git
The following commit(s) were added to refs/heads/master by this push: new c0cd939 [fix]avoid label conflict when setting labels in headers. (#25) c0cd939 is described below commit c0cd939bae6dcb06f3aa14e0a8e6cff473c15f5a Author: Petrichor <xiaowe...@selectdb.com> AuthorDate: Mon Feb 17 14:50:39 2025 +0800 [fix]avoid label conflict when setting labels in headers. (#25) --- build.sh | 2 ++ loader/stream_loader.go | 24 +++++++++++++++++++----- reader/reader.go | 4 ++-- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/build.sh b/build.sh index f1bc7b9..5bfd021 100755 --- a/build.sh +++ b/build.sh @@ -19,6 +19,8 @@ ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" rm -rf version.go +# Formatting Go code with gofmt +find . -name '*.go' -exec gofmt -w {} \; go generate go build echo "Build success. Output: ${ROOT}/doris-streamloader" diff --git a/loader/stream_loader.go b/loader/stream_loader.go index 936a190..80840c4 100644 --- a/loader/stream_loader.go +++ b/loader/stream_loader.go @@ -18,11 +18,13 @@ package loader import ( + "doris-streamloader/report" "encoding/json" "fmt" "io" "io/ioutil" "net/http" + "strconv" "strings" "sync" "sync/atomic" @@ -31,7 +33,6 @@ import ( "github.com/pierrec/lz4/v4" log "github.com/sirupsen/logrus" - "doris-streamloader/report" ) type StreamLoadOption struct { @@ -143,7 +144,7 @@ func (s *StreamLoad) createUrl() string { } // stream load create http request with string data -func (s *StreamLoad) createRequest(url string, reader io.Reader) (req *http.Request, err error) { +func (s *StreamLoad) createRequest(url string, reader io.Reader, workerIndex int, taskIndex int) (req *http.Request, err error) { req, err = http.NewRequest("PUT", url, reader) if err != nil { return @@ -155,6 +156,19 @@ func (s *StreamLoad) createRequest(url string, reader io.Reader) (req *http.Requ req.Header.Set("Content-Type", "text/plain") for k, v := range s.headers { req.Header.Set(k, v) + // If a label has already been set in the headers, to prevent conflicts, + //generate a unique label by combining the original label, worker index, and task index. + if k == "label" { + var builder strings.Builder + builder.WriteString(v) + builder.WriteString("_") + builder.WriteString(strconv.Itoa(workerIndex)) + builder.WriteString("_") + builder.WriteString(strconv.Itoa(taskIndex)) + + req.Header.Set("label", builder.String()) + } + } if s.Compress { @@ -228,10 +242,10 @@ func (s *StreamLoad) readData(isEOS *atomic.Bool, rawWriter *io.PipeWriter, read } } -func (s *StreamLoad) send(url string, reader io.Reader) (*http.Response, error) { +func (s *StreamLoad) send(url string, reader io.Reader, workerIndex int, taskIndex int) (*http.Response, error) { realUrl := url for { - req, err := s.createRequest(realUrl, reader) + req, err := s.createRequest(realUrl, reader, workerIndex, taskIndex) if err != nil { if req == nil { return nil, err @@ -347,7 +361,7 @@ func (s *StreamLoad) executeGetAndSend(maxRowsPerTask int, maxBytesPerTask int, workerIndex: workerIndex, taskIndex: taskIndex, }) - if resp, err := s.send(url, NopCloser(pr)); err != nil { + if resp, err := s.send(url, NopCloser(pr), workerIndex, taskIndex); err != nil { s.handleSendError(workerIndex, taskIndex) log.Errorf("Send error, resp: %v error message: %v", resp, err) return diff --git a/reader/reader.go b/reader/reader.go index 634b9f0..2dcd2a5 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -19,6 +19,8 @@ package file import ( "bufio" + "doris-streamloader/loader" + "doris-streamloader/report" "io" "os" "path/filepath" @@ -28,8 +30,6 @@ import ( "time" log "github.com/sirupsen/logrus" - report "doris-streamloader/report" - loader "doris-streamloader/loader" ) type FileReader struct { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org