xiaokang commented on code in PR #47691:
URL: https://github.com/apache/doris/pull/47691#discussion_r1966950088


##########
extension/beats/doris/client.go:
##########
@@ -184,124 +194,320 @@ func (client *client) Close() error {
 }
 
 func (client *client) String() string {
-       str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, 
client.headers)
+       str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), 
client.labelPrefix, client.headers)
        if _, ok := client.headers["Authorization"]; ok {
                return strings.Replace(str, 
"Authorization:"+client.headers["Authorization"], "Authorization:Basic ******", 
1)
        }
        return str
 }
 
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+func (client *client) url(table string) string {
+       return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix, 
client.db, table)
+}
+
+func (client *client) label(table string) string {
+       return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, table, time.Now().UnixMilli(), uuid.New())
+}
+
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents 
to doris.
+// If a tableEvents returns an error, add a barrier to the first event of the 
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the 
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the first event in the batch.Events() has a barrier, it means that 
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the 
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) 
error {
        events := batch.Events()
        length := len(events)
        client.logger.Debugf("Received events: %d", length)
 
-       label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, client.table, time.Now().UnixMilli(), uuid.New())
-       rest, err := client.publishEvents(label, events)
+       tableEventsMap := client.makeTableEventsMap(ctx, events)
+       rest, err := client.publishEvents(tableEventsMap)
 
        if len(rest) == 0 {
                batch.ACK()
-               client.logger.Debugf("Success send %d events", length)
        } else {
-               client.observer.Failed(length)
                batch.RetryEvents(rest)
-               client.logger.Warnf("Retry send %d events", length)
+               client.logger.Warnf("Retry send %d events", len(rest))
        }
        return err
 }
 
-func (client *client) publishEvents(lable string, events []publisher.Event) 
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+       Label  string
+       Events []publisher.Event
+
+       // used in publishEvents
+       serialization string
+       dropped       int64
+       request       *http.Request
+       response      *http.Response
+       err           error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events 
[]publisher.Event) map[string]*Events {
+       tableEventsMap := make(map[string]*Events)
+       if len(events) == 0 {
+               return tableEventsMap
+       }
+
+       barrier, err := getBarrierFromEvent(&events[0])
+       if err == nil { // retry
+               if client.tableSelector.Sel.IsConst() { // table is const

Review Comment:
   The if/else branches inside the retry path can be combined.



##########
extension/beats/doris/client.go:
##########
@@ -184,124 +194,320 @@ func (client *client) Close() error {
 }
 
 func (client *client) String() string {
-       str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, 
client.headers)
+       str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), 
client.labelPrefix, client.headers)
        if _, ok := client.headers["Authorization"]; ok {
                return strings.Replace(str, 
"Authorization:"+client.headers["Authorization"], "Authorization:Basic ******", 
1)
        }
        return str
 }
 
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+func (client *client) url(table string) string {
+       return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix, 
client.db, table)
+}
+
+func (client *client) label(table string) string {
+       return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, table, time.Now().UnixMilli(), uuid.New())
+}
+
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents 
to doris.
+// If a tableEvents returns an error, add a barrier to the first event of the 
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the 
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the first event in the batch.Events() has a barrier, it means that 
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the 
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) 
error {
        events := batch.Events()
        length := len(events)
        client.logger.Debugf("Received events: %d", length)
 
-       label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, client.table, time.Now().UnixMilli(), uuid.New())
-       rest, err := client.publishEvents(label, events)
+       tableEventsMap := client.makeTableEventsMap(ctx, events)
+       rest, err := client.publishEvents(tableEventsMap)
 
        if len(rest) == 0 {
                batch.ACK()
-               client.logger.Debugf("Success send %d events", length)
        } else {
-               client.observer.Failed(length)
                batch.RetryEvents(rest)
-               client.logger.Warnf("Retry send %d events", length)
+               client.logger.Warnf("Retry send %d events", len(rest))
        }
        return err
 }
 
-func (client *client) publishEvents(lable string, events []publisher.Event) 
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+       Label  string
+       Events []publisher.Event
+
+       // used in publishEvents
+       serialization string
+       dropped       int64
+       request       *http.Request
+       response      *http.Response
+       err           error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events 
[]publisher.Event) map[string]*Events {
+       tableEventsMap := make(map[string]*Events)
+       if len(events) == 0 {
+               return tableEventsMap
+       }
+
+       barrier, err := getBarrierFromEvent(&events[0])
+       if err == nil { // retry

Review Comment:
   You should check `if barrier != nil` here instead of `if err == nil`.



##########
extension/beats/doris/client.go:
##########
@@ -39,14 +41,16 @@ import (
 )
 
 type client struct {
-       url        string
+       urlPrefix  string
+       db         string

Review Comment:
   This duplicates the existing `database` field.



##########
extension/beats/doris/client.go:
##########
@@ -184,124 +194,320 @@ func (client *client) Close() error {
 }
 
 func (client *client) String() string {
-       str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, 
client.headers)
+       str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), 
client.labelPrefix, client.headers)
        if _, ok := client.headers["Authorization"]; ok {
                return strings.Replace(str, 
"Authorization:"+client.headers["Authorization"], "Authorization:Basic ******", 
1)
        }
        return str
 }
 
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+func (client *client) url(table string) string {
+       return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix, 
client.db, table)
+}
+
+func (client *client) label(table string) string {
+       return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, table, time.Now().UnixMilli(), uuid.New())
+}
+
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents 
to doris.
+// If a tableEvents returns an error, add a barrier to the first event of the 
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the 
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the first event in the batch.Events() has a barrier, it means that 
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the 
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) 
error {
        events := batch.Events()
        length := len(events)
        client.logger.Debugf("Received events: %d", length)
 
-       label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, client.table, time.Now().UnixMilli(), uuid.New())
-       rest, err := client.publishEvents(label, events)
+       tableEventsMap := client.makeTableEventsMap(ctx, events)
+       rest, err := client.publishEvents(tableEventsMap)
 
        if len(rest) == 0 {
                batch.ACK()
-               client.logger.Debugf("Success send %d events", length)
        } else {
-               client.observer.Failed(length)
                batch.RetryEvents(rest)
-               client.logger.Warnf("Retry send %d events", length)
+               client.logger.Warnf("Retry send %d events", len(rest))
        }
        return err
 }
 
-func (client *client) publishEvents(lable string, events []publisher.Event) 
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+       Label  string
+       Events []publisher.Event
+
+       // used in publishEvents
+       serialization string
+       dropped       int64
+       request       *http.Request
+       response      *http.Response
+       err           error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events 
[]publisher.Event) map[string]*Events {
+       tableEventsMap := make(map[string]*Events)
+       if len(events) == 0 {
+               return tableEventsMap
+       }
+
+       barrier, err := getBarrierFromEvent(&events[0])
+       if err == nil { // retry

Review Comment:
   Change the branch order as follows so that it reads more naturally:
   
   ```
   if barrier == nil { // first time
   
   } else { // retry
   
   }
   ```



##########
extension/beats/doris/client.go:
##########
@@ -184,124 +194,320 @@ func (client *client) Close() error {
 }
 
 func (client *client) String() string {
-       str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, 
client.headers)
+       str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), 
client.labelPrefix, client.headers)
        if _, ok := client.headers["Authorization"]; ok {
                return strings.Replace(str, 
"Authorization:"+client.headers["Authorization"], "Authorization:Basic ******", 
1)
        }
        return str
 }
 
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+func (client *client) url(table string) string {
+       return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix, 
client.db, table)
+}
+
+func (client *client) label(table string) string {
+       return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, table, time.Now().UnixMilli(), uuid.New())
+}
+
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents 
to doris.
+// If a tableEvents returns an error, add a barrier to the first event of the 
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the 
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the first event in the batch.Events() has a barrier, it means that 
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the 
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) 
error {
        events := batch.Events()
        length := len(events)
        client.logger.Debugf("Received events: %d", length)
 
-       label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, client.table, time.Now().UnixMilli(), uuid.New())
-       rest, err := client.publishEvents(label, events)
+       tableEventsMap := client.makeTableEventsMap(ctx, events)
+       rest, err := client.publishEvents(tableEventsMap)
 
        if len(rest) == 0 {
                batch.ACK()
-               client.logger.Debugf("Success send %d events", length)
        } else {
-               client.observer.Failed(length)
                batch.RetryEvents(rest)
-               client.logger.Warnf("Retry send %d events", length)
+               client.logger.Warnf("Retry send %d events", len(rest))
        }
        return err
 }
 
-func (client *client) publishEvents(lable string, events []publisher.Event) 
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+       Label  string
+       Events []publisher.Event
+
+       // used in publishEvents
+       serialization string
+       dropped       int64
+       request       *http.Request
+       response      *http.Response
+       err           error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events 
[]publisher.Event) map[string]*Events {
+       tableEventsMap := make(map[string]*Events)
+       if len(events) == 0 {
+               return tableEventsMap
+       }
+
+       barrier, err := getBarrierFromEvent(&events[0])
+       if err == nil { // retry
+               if client.tableSelector.Sel.IsConst() { // table is const
+                       removeBarrierFromEvent(&events[0])
+                       tableEventsMap[barrier.Table] = &Events{
+                               Label:  barrier.Label,
+                               Events: events,
+                       }
+               } else { // split events by barrier
+                       for start := 0; start < len(events); {
+                               barrier, _ := 
getBarrierFromEvent(&events[start])
+                               removeBarrierFromEvent(&events[start])
+                               end := start + barrier.Length
+
+                               tableEventsMap[barrier.Table] = &Events{
+                                       Label:  barrier.Label,
+                                       Events: events[start:end], // should 
not do any append to the array, because here is a slice of the original array
+                               }
+
+                               start = end
+                       }
+               }
+       } else { // first time
+               if client.tableSelector.Sel.IsConst() { // table is const
+                       table, _ := 
client.tableSelector.Sel.Select(&events[0].Content)
+                       label := client.label(table)
+                       tableEventsMap[table] = &Events{
+                               Label:  label,
+                               Events: events,
+                       }
+               } else { // select table for each event
+                       for _, e := range events {
+                               table, err := 
client.tableSelector.Sel.Select(&e.Content)
+                               if err != nil {
+                                       client.logger.Errorf("Failed to select 
table: %+v", err)
+                               }
+                               if table == nilTable {
+                                       if client.defaultTable == nilTable {
+                                               client.logger.Warnf("table 
format error, the default table is not set, the data will be dropped")
+                                       } else {
+                                               table = client.defaultTable
+                                               client.logger.Warnf("table 
format error, use the default table: %s", client.defaultTable)
+                                       }
+                               }
+                               _, ok := tableEventsMap[table]
+                               if !ok {
+                                       tableEventsMap[table] = &Events{
+                                               Label:  client.label(table),
+                                               Events: []publisher.Event{e},
+                                       }
+                               } else {
+                                       tableEventsMap[table].Events = 
append(tableEventsMap[table].Events, e)
+                               }
+                       }
+               }
+       }
+
+       return tableEventsMap
+}
+
+func (client *client) publishEvents(tableEventsMap map[string]*Events) 
([]publisher.Event, error) {
        begin := time.Now()
 
-       var logFirstEvent []byte
-       var stringBuilder strings.Builder
+       for table, tableEvents := range tableEventsMap {
+               events := tableEvents.Events
+
+               if table == nilTable {
+                       client.logger.Errorf("Invalid table for %v events", 
len(events))
+                       tableEvents.dropped = int64(len(events))
+                       tableEvents.err = fmt.Errorf("invalid table for %v 
events", len(events))
+                       continue
+               }
+
+               var stringBuilder strings.Builder
 
-       dropped := 0
-       for i := range events {
-               event := &events[i]
-               serializedEvent, err := client.codec.Encode(client.beat.Beat, 
&event.Content)
+               for i := range events {
+                       event := &events[i]
+                       serializedEvent, err := 
client.codec.Encode(client.beat.Beat, &event.Content)
 
-               if err != nil {
-                       if event.Guaranteed() {
-                               client.logger.Errorf("Failed to serialize the 
event: %+v", err)
-                       } else {
-                               client.logger.Warnf("Failed to serialize the 
event: %+v", err)
+                       if err != nil {
+                               if event.Guaranteed() {
+                                       client.logger.Errorf("Failed to 
serialize the event: %+v", err)
+                               } else {
+                                       client.logger.Warnf("Failed to 
serialize the event: %+v", err)
+                               }
+                               client.logger.Debugf("Failed event: %v", event)
+
+                               tableEvents.dropped++
+                               continue
                        }
-                       client.logger.Debugf("Failed event: %v", event)
 
-                       dropped++
-                       client.reporter.IncrFailedRows(1)
+                       stringBuilder.Write(serializedEvent)
+                       stringBuilder.WriteString(client.lineDelimiter)
+               }
+
+               tableEvents.serialization = stringBuilder.String()

Review Comment:
   Are we no longer clearing the events to save memory?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to