This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-streamloader.git


The following commit(s) were added to refs/heads/master by this push:
     new c0cd939  [fix]avoid label conflict when setting labels in headers. 
(#25)
c0cd939 is described below

commit c0cd939bae6dcb06f3aa14e0a8e6cff473c15f5a
Author: Petrichor <xiaowe...@selectdb.com>
AuthorDate: Mon Feb 17 14:50:39 2025 +0800

    [fix]avoid label conflict when setting labels in headers. (#25)
---
 build.sh                |  2 ++
 loader/stream_loader.go | 24 +++++++++++++++++++-----
 reader/reader.go        |  4 ++--
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/build.sh b/build.sh
index f1bc7b9..5bfd021 100755
--- a/build.sh
+++ b/build.sh
@@ -19,6 +19,8 @@
 ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
 
 rm -rf version.go
+# Formatting Go code with gofmt
+find . -name '*.go' -exec gofmt -w {} \;
 go generate
 go build
 echo "Build success. Output: ${ROOT}/doris-streamloader"
diff --git a/loader/stream_loader.go b/loader/stream_loader.go
index 936a190..80840c4 100644
--- a/loader/stream_loader.go
+++ b/loader/stream_loader.go
@@ -18,11 +18,13 @@
 package loader
 
 import (
+       "doris-streamloader/report"
        "encoding/json"
        "fmt"
        "io"
        "io/ioutil"
        "net/http"
+       "strconv"
        "strings"
        "sync"
        "sync/atomic"
@@ -31,7 +33,6 @@ import (
 
        "github.com/pierrec/lz4/v4"
        log "github.com/sirupsen/logrus"
-       "doris-streamloader/report"
 )
 
 type StreamLoadOption struct {
@@ -143,7 +144,7 @@ func (s *StreamLoad) createUrl() string {
 }
 
 // stream load create http request with string data
-func (s *StreamLoad) createRequest(url string, reader io.Reader) (req 
*http.Request, err error) {
+func (s *StreamLoad) createRequest(url string, reader io.Reader, workerIndex 
int, taskIndex int) (req *http.Request, err error) {
        req, err = http.NewRequest("PUT", url, reader)
        if err != nil {
                return
@@ -155,6 +156,19 @@ func (s *StreamLoad) createRequest(url string, reader 
io.Reader) (req *http.Requ
        req.Header.Set("Content-Type", "text/plain")
        for k, v := range s.headers {
                req.Header.Set(k, v)
+               // If a label has already been set in the headers, to prevent 
conflicts,
+               //generate a unique label by combining the original label, 
worker index, and task index.
+               if k == "label" {
+                       var builder strings.Builder
+                       builder.WriteString(v)
+                       builder.WriteString("_")
+                       builder.WriteString(strconv.Itoa(workerIndex))
+                       builder.WriteString("_")
+                       builder.WriteString(strconv.Itoa(taskIndex))
+
+                       req.Header.Set("label", builder.String())
+               }
+
        }
 
        if s.Compress {
@@ -228,10 +242,10 @@ func (s *StreamLoad) readData(isEOS *atomic.Bool, 
rawWriter *io.PipeWriter, read
        }
 }
 
-func (s *StreamLoad) send(url string, reader io.Reader) (*http.Response, 
error) {
+func (s *StreamLoad) send(url string, reader io.Reader, workerIndex int, 
taskIndex int) (*http.Response, error) {
        realUrl := url
        for {
-               req, err := s.createRequest(realUrl, reader)
+               req, err := s.createRequest(realUrl, reader, workerIndex, 
taskIndex)
                if err != nil {
                        if req == nil {
                                return nil, err
@@ -347,7 +361,7 @@ func (s *StreamLoad) executeGetAndSend(maxRowsPerTask int, 
maxBytesPerTask int,
                                workerIndex:     workerIndex,
                                taskIndex:       taskIndex,
                        })
-               if resp, err := s.send(url, NopCloser(pr)); err != nil {
+               if resp, err := s.send(url, NopCloser(pr), workerIndex, 
taskIndex); err != nil {
                        s.handleSendError(workerIndex, taskIndex)
                        log.Errorf("Send error, resp: %v error message: %v", 
resp, err)
                        return
diff --git a/reader/reader.go b/reader/reader.go
index 634b9f0..2dcd2a5 100644
--- a/reader/reader.go
+++ b/reader/reader.go
@@ -19,6 +19,8 @@ package file
 
 import (
        "bufio"
+       "doris-streamloader/loader"
+       "doris-streamloader/report"
        "io"
        "os"
        "path/filepath"
@@ -28,8 +30,6 @@ import (
        "time"
 
        log "github.com/sirupsen/logrus"
-       report "doris-streamloader/report"
-       loader "doris-streamloader/loader"
 )
 
 type FileReader struct {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to