xiaokang commented on code in PR #47691: URL: https://github.com/apache/doris/pull/47691#discussion_r1966950088
########## extension/beats/doris/client.go: ########## @@ -184,124 +194,320 @@ func (client *client) Close() error { } func (client *client) String() string { - str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) + str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), client.labelPrefix, client.headers) if _, ok := client.headers["Authorization"]; ok { return strings.Replace(str, "Authorization:"+client.headers["Authorization"], "Authorization:Basic ******", 1) } return str } -func (client *client) Publish(_ context.Context, batch publisher.Batch) error { +func (client *client) url(table string) string { + return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix, client.db, table) +} + +func (client *client) label(table string) string { + return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, table, time.Now().UnixMilli(), uuid.New()) +} + +// Publish sends events to doris. +// batch.Events() are grouped by table first (tableEvents). +// For each tableEvents, call the http stream load api to send the tableEvents to doris. +// If a tableEvents returns an error, add a barrier to the first event of the tableEvents. +// A barrier contains a table, a stream load label, and the length of the tableEvents. +// Add all failed tableEvents to the retryEvents. +// So if the first event in the batch.Events() has a barrier, it means that this is a retry. +// In this case, we will split the batch.Events() to some tableEvents by the barrier events +// and send each tableEvents to doris again reusing the label in the barrier. 
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() length := len(events) client.logger.Debugf("Received events: %d", length) - label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, client.table, time.Now().UnixMilli(), uuid.New()) - rest, err := client.publishEvents(label, events) + tableEventsMap := client.makeTableEventsMap(ctx, events) + rest, err := client.publishEvents(tableEventsMap) if len(rest) == 0 { batch.ACK() - client.logger.Debugf("Success send %d events", length) } else { - client.observer.Failed(length) batch.RetryEvents(rest) - client.logger.Warnf("Retry send %d events", length) + client.logger.Warnf("Retry send %d events", len(rest)) } return err } -func (client *client) publishEvents(lable string, events []publisher.Event) ([]publisher.Event, error) { +const nilTable = "" + +type Events struct { + Label string + Events []publisher.Event + + // used in publishEvents + serialization string + dropped int64 + request *http.Request + response *http.Response + err error +} + +func (client *client) makeTableEventsMap(_ context.Context, events []publisher.Event) map[string]*Events { + tableEventsMap := make(map[string]*Events) + if len(events) == 0 { + return tableEventsMap + } + + barrier, err := getBarrierFromEvent(&events[0]) + if err == nil { // retry + if client.tableSelector.Sel.IsConst() { // table is const Review Comment: The if/else in the retry branch can be combined.
########## extension/beats/doris/client.go: ########## @@ -184,124 +194,320 @@ func (client *client) Close() error { } func (client *client) String() string { - str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) + str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), client.labelPrefix, client.headers) if _, ok := client.headers["Authorization"]; ok { return strings.Replace(str, "Authorization:"+client.headers["Authorization"], "Authorization:Basic ******", 1) } return str } -func (client *client) Publish(_ context.Context, batch publisher.Batch) error { +func (client *client) url(table string) string { + return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix, client.db, table) +} + +func (client *client) label(table string) string { + return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, table, time.Now().UnixMilli(), uuid.New()) +} + +// Publish sends events to doris. +// batch.Events() are grouped by table first (tableEvents). +// For each tableEvents, call the http stream load api to send the tableEvents to doris. +// If a tableEvents returns an error, add a barrier to the first event of the tableEvents. +// A barrier contains a table, a stream load label, and the length of the tableEvents. +// Add all failed tableEvents to the retryEvents. +// So if the first event in the batch.Events() has a barrier, it means that this is a retry. +// In this case, we will split the batch.Events() to some tableEvents by the barrier events +// and send each tableEvents to doris again reusing the label in the barrier. 
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() length := len(events) client.logger.Debugf("Received events: %d", length) - label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, client.table, time.Now().UnixMilli(), uuid.New()) - rest, err := client.publishEvents(label, events) + tableEventsMap := client.makeTableEventsMap(ctx, events) + rest, err := client.publishEvents(tableEventsMap) if len(rest) == 0 { batch.ACK() - client.logger.Debugf("Success send %d events", length) } else { - client.observer.Failed(length) batch.RetryEvents(rest) - client.logger.Warnf("Retry send %d events", length) + client.logger.Warnf("Retry send %d events", len(rest)) } return err } -func (client *client) publishEvents(lable string, events []publisher.Event) ([]publisher.Event, error) { +const nilTable = "" + +type Events struct { + Label string + Events []publisher.Event + + // used in publishEvents + serialization string + dropped int64 + request *http.Request + response *http.Response + err error +} + +func (client *client) makeTableEventsMap(_ context.Context, events []publisher.Event) map[string]*Events { + tableEventsMap := make(map[string]*Events) + if len(events) == 0 { + return tableEventsMap + } + + barrier, err := getBarrierFromEvent(&events[0]) + if err == nil { // retry Review Comment: you should check `if barrier != nil` ########## extension/beats/doris/client.go: ########## @@ -39,14 +41,16 @@ import ( ) type client struct { - url string + urlPrefix string + db string Review Comment: duplicate with database ########## extension/beats/doris/client.go: ########## @@ -184,124 +194,320 @@ func (client *client) Close() error { } func (client *client) String() string { - str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) + str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), client.labelPrefix, client.headers) if _, ok := 
client.headers["Authorization"]; ok { return strings.Replace(str, "Authorization:"+client.headers["Authorization"], "Authorization:Basic ******", 1) } return str } -func (client *client) Publish(_ context.Context, batch publisher.Batch) error { +func (client *client) url(table string) string { + return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix, client.db, table) +} + +func (client *client) label(table string) string { + return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, table, time.Now().UnixMilli(), uuid.New()) +} + +// Publish sends events to doris. +// batch.Events() are grouped by table first (tableEvents). +// For each tableEvents, call the http stream load api to send the tableEvents to doris. +// If a tableEvents returns an error, add a barrier to the first event of the tableEvents. +// A barrier contains a table, a stream load label, and the length of the tableEvents. +// Add all failed tableEvents to the retryEvents. +// So if the first event in the batch.Events() has a barrier, it means that this is a retry. +// In this case, we will split the batch.Events() to some tableEvents by the barrier events +// and send each tableEvents to doris again reusing the label in the barrier. 
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() length := len(events) client.logger.Debugf("Received events: %d", length) - label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, client.table, time.Now().UnixMilli(), uuid.New()) - rest, err := client.publishEvents(label, events) + tableEventsMap := client.makeTableEventsMap(ctx, events) + rest, err := client.publishEvents(tableEventsMap) if len(rest) == 0 { batch.ACK() - client.logger.Debugf("Success send %d events", length) } else { - client.observer.Failed(length) batch.RetryEvents(rest) - client.logger.Warnf("Retry send %d events", length) + client.logger.Warnf("Retry send %d events", len(rest)) } return err } -func (client *client) publishEvents(lable string, events []publisher.Event) ([]publisher.Event, error) { +const nilTable = "" + +type Events struct { + Label string + Events []publisher.Event + + // used in publishEvents + serialization string + dropped int64 + request *http.Request + response *http.Response + err error +} + +func (client *client) makeTableEventsMap(_ context.Context, events []publisher.Event) map[string]*Events { + tableEventsMap := make(map[string]*Events) + if len(events) == 0 { + return tableEventsMap + } + + barrier, err := getBarrierFromEvent(&events[0]) + if err == nil { // retry Review Comment: change the branch order as follows to be more natural. 
``` if barrier == nil { // first time } else { // retry } ``` ########## extension/beats/doris/client.go: ########## @@ -184,124 +194,320 @@ func (client *client) Close() error { } func (client *client) String() string { - str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) + str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), client.labelPrefix, client.headers) if _, ok := client.headers["Authorization"]; ok { return strings.Replace(str, "Authorization:"+client.headers["Authorization"], "Authorization:Basic ******", 1) } return str } -func (client *client) Publish(_ context.Context, batch publisher.Batch) error { +func (client *client) url(table string) string { + return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix, client.db, table) +} + +func (client *client) label(table string) string { + return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, table, time.Now().UnixMilli(), uuid.New()) +} + +// Publish sends events to doris. +// batch.Events() are grouped by table first (tableEvents). +// For each tableEvents, call the http stream load api to send the tableEvents to doris. +// If a tableEvents returns an error, add a barrier to the first event of the tableEvents. +// A barrier contains a table, a stream load label, and the length of the tableEvents. +// Add all failed tableEvents to the retryEvents. +// So if the first event in the batch.Events() has a barrier, it means that this is a retry. +// In this case, we will split the batch.Events() to some tableEvents by the barrier events +// and send each tableEvents to doris again reusing the label in the barrier. 
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() length := len(events) client.logger.Debugf("Received events: %d", length) - label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, client.table, time.Now().UnixMilli(), uuid.New()) - rest, err := client.publishEvents(label, events) + tableEventsMap := client.makeTableEventsMap(ctx, events) + rest, err := client.publishEvents(tableEventsMap) if len(rest) == 0 { batch.ACK() - client.logger.Debugf("Success send %d events", length) } else { - client.observer.Failed(length) batch.RetryEvents(rest) - client.logger.Warnf("Retry send %d events", length) + client.logger.Warnf("Retry send %d events", len(rest)) } return err } -func (client *client) publishEvents(lable string, events []publisher.Event) ([]publisher.Event, error) { +const nilTable = "" + +type Events struct { + Label string + Events []publisher.Event + + // used in publishEvents + serialization string + dropped int64 + request *http.Request + response *http.Response + err error +} + +func (client *client) makeTableEventsMap(_ context.Context, events []publisher.Event) map[string]*Events { + tableEventsMap := make(map[string]*Events) + if len(events) == 0 { + return tableEventsMap + } + + barrier, err := getBarrierFromEvent(&events[0]) + if err == nil { // retry + if client.tableSelector.Sel.IsConst() { // table is const + removeBarrierFromEvent(&events[0]) + tableEventsMap[barrier.Table] = &Events{ + Label: barrier.Label, + Events: events, + } + } else { // split events by barrier + for start := 0; start < len(events); { + barrier, _ := getBarrierFromEvent(&events[start]) + removeBarrierFromEvent(&events[start]) + end := start + barrier.Length + + tableEventsMap[barrier.Table] = &Events{ + Label: barrier.Label, + Events: events[start:end], // should not do any append to the array, because here is a slice of the original array + } + + start = end + } + } + } else { // first time + if 
client.tableSelector.Sel.IsConst() { // table is const + table, _ := client.tableSelector.Sel.Select(&events[0].Content) + label := client.label(table) + tableEventsMap[table] = &Events{ + Label: label, + Events: events, + } + } else { // select table for each event + for _, e := range events { + table, err := client.tableSelector.Sel.Select(&e.Content) + if err != nil { + client.logger.Errorf("Failed to select table: %+v", err) + } + if table == nilTable { + if client.defaultTable == nilTable { + client.logger.Warnf("table format error, the default table is not set, the data will be dropped") + } else { + table = client.defaultTable + client.logger.Warnf("table format error, use the default table: %s", client.defaultTable) + } + } + _, ok := tableEventsMap[table] + if !ok { + tableEventsMap[table] = &Events{ + Label: client.label(table), + Events: []publisher.Event{e}, + } + } else { + tableEventsMap[table].Events = append(tableEventsMap[table].Events, e) + } + } + } + } + + return tableEventsMap +} + +func (client *client) publishEvents(tableEventsMap map[string]*Events) ([]publisher.Event, error) { begin := time.Now() - var logFirstEvent []byte - var stringBuilder strings.Builder + for table, tableEvents := range tableEventsMap { + events := tableEvents.Events + + if table == nilTable { + client.logger.Errorf("Invalid table for %v events", len(events)) + tableEvents.dropped = int64(len(events)) + tableEvents.err = fmt.Errorf("invalid table for %v events", len(events)) + continue + } + + var stringBuilder strings.Builder - dropped := 0 - for i := range events { - event := &events[i] - serializedEvent, err := client.codec.Encode(client.beat.Beat, &event.Content) + for i := range events { + event := &events[i] + serializedEvent, err := client.codec.Encode(client.beat.Beat, &event.Content) - if err != nil { - if event.Guaranteed() { - client.logger.Errorf("Failed to serialize the event: %+v", err) - } else { - client.logger.Warnf("Failed to serialize the event: 
%+v", err) + if err != nil { + if event.Guaranteed() { + client.logger.Errorf("Failed to serialize the event: %+v", err) + } else { + client.logger.Warnf("Failed to serialize the event: %+v", err) + } + client.logger.Debugf("Failed event: %v", event) + + tableEvents.dropped++ + continue } - client.logger.Debugf("Failed event: %v", event) - dropped++ - client.reporter.IncrFailedRows(1) + stringBuilder.Write(serializedEvent) + stringBuilder.WriteString(client.lineDelimiter) + } + + tableEvents.serialization = stringBuilder.String() Review Comment: Are the events no longer cleared at this point to save memory? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org