xiaokang commented on code in PR #47691:
URL: https://github.com/apache/doris/pull/47691#discussion_r1957455480
##########
extension/beats/doris/config.go:
##########
@@ -29,16 +29,17 @@ import (
)
type config struct {
- Hosts []string `config:"fenodes"`
- HttpHosts []string `config:"http_hosts"`
- User string `config:"user" validate:"required"`
- Password string `config:"password"`
- Database string `config:"database" validate:"required"`
- Table string `config:"table" validate:"required"`
- LabelPrefix string `config:"label_prefix"`
- LineDelimiter string `config:"line_delimiter"`
- LogRequest bool `config:"log_request"`
- LogProgressInterval int `config:"log_progress_interval"`
+ Hosts []string `config:"fenodes"`
+ HttpHosts []string `config:"http_hosts"`
+ User string `config:"user" validate:"required"`
+ Password string `config:"password"`
+ Database string `config:"database"
validate:"required"`
+ Table string `config:"table"`
+ Tables []map[string]any `config:"tables"`
Review Comment:
In the Elasticsearch output plugin, the `tables` config is used to specify multiple
tables to be selected by filters. In our case, we can simply use two configs: `table`
and `default_table`.
##########
extension/beats/doris/client.go:
##########
@@ -39,14 +41,14 @@ import (
)
type client struct {
- url string
+ dbURL string
Review Comment:
Is `dbURL` the same as `database`? If yes, just remove it.
##########
extension/beats/doris/client.go:
##########
@@ -180,120 +184,302 @@ func (client *client) Close() error {
}
func (client *client) String() string {
- return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix,
client.headers)
+ return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"),
client.labelPrefix, client.headers)
+}
+
+func (client *client) url(table string) string {
+ return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table)
+}
+
+func (client *client) label(table string) string {
+ return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, table, time.Now().UnixMilli(), uuid.New())
}
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents
to doris.
+// If a tableEvents returns an error, add a barrier to the last event of the
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the last event in the batch.Events() has a barrier, it means that
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch)
error {
events := batch.Events()
length := len(events)
client.logger.Debugf("Received events: %d", length)
- label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, client.table, time.Now().UnixMilli(), uuid.New())
- rest, err := client.publishEvents(label, events)
+ tableEventsMap := client.makeTableEventsMap(ctx, events)
+ rest, err := client.publishEvents(tableEventsMap)
if len(rest) == 0 {
batch.ACK()
- client.logger.Debugf("Success send %d events", length)
} else {
- client.observer.Failed(length)
batch.RetryEvents(rest)
- client.logger.Warnf("Retry send %d events", length)
+ client.logger.Warnf("Retry send %d events", len(rest))
}
return err
}
-func (client *client) publishEvents(lable string, events []publisher.Event)
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+ Label string
+ Events []publisher.Event
+
+ // used in publishEvents
+ serialization string
+ dropped int64
+ request *http.Request
+ response *http.Response
+ err error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events
[]publisher.Event) map[string]*Events {
+ tableEventsMap := make(map[string]*Events)
+ if len(events) == 0 {
+ return tableEventsMap
+ }
+
+ barrier, err := getBarrierFromEvent(&events[len(events)-1])
+ if err == nil { // retry
+ if client.tableSelector.Sel.IsConst() { // table is const
+ removeBarrierFromEvent(&events[len(events)-1])
+ tableEventsMap[barrier.Table] = &Events{
+ Label: barrier.Label,
+ Events: events,
+ }
+ } else { // split events by barrier
+ for end := len(events); end > 0; {
+ barrier, _ :=
getBarrierFromEvent(&events[end-1])
+ removeBarrierFromEvent(&events[end-1])
+ start := end - barrier.Length
+
+ tableEventsMap[barrier.Table] = &Events{
+ Label: barrier.Label,
+ Events: events[start:end], // should
not do any append to the array, because here is a slice of the original array
+ }
+
+ end = start
+ }
+ }
+ } else { // first time
+ if client.tableSelector.Sel.IsConst() { // table is const
+ table, _ :=
client.tableSelector.Sel.Select(&events[0].Content)
+ label := client.label(table)
+ tableEventsMap[table] = &Events{
+ Label: label,
+ Events: events,
+ }
+ } else { // select table for each event
+ for _, e := range events {
+ table, err :=
client.tableSelector.Sel.Select(&e.Content)
+ if err != nil {
+ client.logger.Errorf("Failed to select
table: %+v", err)
+ table = nilTable
+ }
+ _, ok := tableEventsMap[table]
+ if !ok {
+ tableEventsMap[table] = &Events{
+ Label: client.label(table),
+ Events: []publisher.Event{e},
+ }
+ } else {
+ tableEventsMap[table].Events =
append(tableEventsMap[table].Events, e)
+ }
+ }
+ }
+ }
+
+ return tableEventsMap
+}
+
+func (client *client) publishEvents(tableEventsMap map[string]*Events)
([]publisher.Event, error) {
begin := time.Now()
- var logFirstEvent []byte
- var stringBuilder strings.Builder
+ for table, tableEvents := range tableEventsMap {
+ events := tableEvents.Events
+
+ if table == nilTable {
+ client.logger.Errorf("Invalid table for %v events",
len(events))
+ tableEvents.dropped = int64(len(events))
+ tableEvents.err = fmt.Errorf("invalid table for %v
events", len(events))
+ continue
+ }
- dropped := 0
- for i := range events {
- event := &events[i]
- serializedEvent, err := client.codec.Encode(client.beat.Beat,
&event.Content)
+ var stringBuilder strings.Builder
- if err != nil {
- if event.Guaranteed() {
- client.logger.Errorf("Failed to serialize the
event: %+v", err)
- } else {
- client.logger.Warnf("Failed to serialize the
event: %+v", err)
+ for i := range events {
+ event := &events[i]
+ serializedEvent, err :=
client.codec.Encode(client.beat.Beat, &event.Content)
+
+ if err != nil {
+ if event.Guaranteed() {
+ client.logger.Errorf("Failed to
serialize the event: %+v", err)
+ } else {
+ client.logger.Warnf("Failed to
serialize the event: %+v", err)
+ }
+ client.logger.Debugf("Failed event: %v", event)
+
+ tableEvents.dropped++
+ continue
}
- client.logger.Debugf("Failed event: %v", event)
- dropped++
- client.reporter.IncrFailedRows(1)
+ stringBuilder.Write(serializedEvent)
+ stringBuilder.WriteString(client.lineDelimiter)
+ }
+
+ tableEvents.serialization = stringBuilder.String()
+
+ var requestErr error
+ tableEvents.request, requestErr =
http.NewRequest(http.MethodPut, client.url(table),
strings.NewReader(tableEvents.serialization))
+ if requestErr != nil {
+ client.logger.Errorf("Failed to create request: %v",
requestErr)
continue
}
- if logFirstEvent == nil {
- logFirstEvent = serializedEvent
+ var groupCommit bool = false
+ for k, v := range client.headers {
+ tableEvents.request.Header.Set(k, v)
+ if k == "group_commit" && v != "off_mode" {
+ groupCommit = true
+ }
+ }
+ if !groupCommit {
+ tableEvents.request.Header.Set("label",
tableEvents.Label)
}
- stringBuilder.Write(serializedEvent)
- stringBuilder.WriteString(client.lineDelimiter)
- }
- request, requestErr := http.NewRequest(http.MethodPut, client.url,
strings.NewReader(stringBuilder.String()))
- if requestErr != nil {
- client.logger.Errorf("Failed to create request: %s", requestErr)
- return events, requestErr
}
- var groupCommit bool = false
- for k, v := range client.headers {
- request.Header.Set(k, v)
- if k == "group_commit" && v != "off_mode" {
- groupCommit = true
+ wg := sync.WaitGroup{}
+ for _, tableEvents := range tableEventsMap {
+ request := tableEvents.request
+ if request != nil {
+ wg.Add(1)
+ go func(e *Events) {
+ e.response, e.err =
client.httpClient.Do(request)
+ wg.Done()
+ }(tableEvents)
Review Comment:
What's the argument `tableEvents` used for?
##########
extension/beats/doris/client.go:
##########
@@ -180,120 +184,302 @@ func (client *client) Close() error {
}
func (client *client) String() string {
- return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix,
client.headers)
+ return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"),
client.labelPrefix, client.headers)
+}
+
+func (client *client) url(table string) string {
+ return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table)
+}
+
+func (client *client) label(table string) string {
+ return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, table, time.Now().UnixMilli(), uuid.New())
}
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents
to doris.
+// If a tableEvents returns an error, add a barrier to the last event of the
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the last event in the batch.Events() has a barrier, it means that
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch)
error {
events := batch.Events()
length := len(events)
client.logger.Debugf("Received events: %d", length)
- label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, client.table, time.Now().UnixMilli(), uuid.New())
- rest, err := client.publishEvents(label, events)
+ tableEventsMap := client.makeTableEventsMap(ctx, events)
+ rest, err := client.publishEvents(tableEventsMap)
if len(rest) == 0 {
batch.ACK()
- client.logger.Debugf("Success send %d events", length)
Review Comment:
Why delete this log?
##########
extension/beats/doris/client.go:
##########
@@ -180,120 +184,302 @@ func (client *client) Close() error {
}
func (client *client) String() string {
- return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix,
client.headers)
+ return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"),
client.labelPrefix, client.headers)
+}
+
+func (client *client) url(table string) string {
+ return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table)
+}
+
+func (client *client) label(table string) string {
+ return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, table, time.Now().UnixMilli(), uuid.New())
}
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents
to doris.
+// If a tableEvents returns an error, add a barrier to the last event of the
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the last event in the batch.Events() has a barrier, it means that
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch)
error {
events := batch.Events()
length := len(events)
client.logger.Debugf("Received events: %d", length)
- label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, client.table, time.Now().UnixMilli(), uuid.New())
- rest, err := client.publishEvents(label, events)
+ tableEventsMap := client.makeTableEventsMap(ctx, events)
+ rest, err := client.publishEvents(tableEventsMap)
if len(rest) == 0 {
batch.ACK()
- client.logger.Debugf("Success send %d events", length)
} else {
- client.observer.Failed(length)
batch.RetryEvents(rest)
- client.logger.Warnf("Retry send %d events", length)
+ client.logger.Warnf("Retry send %d events", len(rest))
}
return err
}
-func (client *client) publishEvents(lable string, events []publisher.Event)
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+ Label string
+ Events []publisher.Event
+
+ // used in publishEvents
+ serialization string
+ dropped int64
+ request *http.Request
+ response *http.Response
+ err error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events
[]publisher.Event) map[string]*Events {
+ tableEventsMap := make(map[string]*Events)
+ if len(events) == 0 {
+ return tableEventsMap
+ }
+
+ barrier, err := getBarrierFromEvent(&events[len(events)-1])
+ if err == nil { // retry
+ if client.tableSelector.Sel.IsConst() { // table is const
+ removeBarrierFromEvent(&events[len(events)-1])
+ tableEventsMap[barrier.Table] = &Events{
+ Label: barrier.Label,
+ Events: events,
+ }
+ } else { // split events by barrier
+ for end := len(events); end > 0; {
+ barrier, _ :=
getBarrierFromEvent(&events[end-1])
+ removeBarrierFromEvent(&events[end-1])
+ start := end - barrier.Length
+
+ tableEventsMap[barrier.Table] = &Events{
+ Label: barrier.Label,
+ Events: events[start:end], // should
not do any append to the array, because here is a slice of the original array
+ }
+
+ end = start
+ }
+ }
+ } else { // first time
+ if client.tableSelector.Sel.IsConst() { // table is const
+ table, _ :=
client.tableSelector.Sel.Select(&events[0].Content)
+ label := client.label(table)
+ tableEventsMap[table] = &Events{
+ Label: label,
+ Events: events,
+ }
+ } else { // select table for each event
+ for _, e := range events {
+ table, err :=
client.tableSelector.Sel.Select(&e.Content)
+ if err != nil {
+ client.logger.Errorf("Failed to select
table: %+v", err)
+ table = nilTable
+ }
+ _, ok := tableEventsMap[table]
+ if !ok {
+ tableEventsMap[table] = &Events{
+ Label: client.label(table),
+ Events: []publisher.Event{e},
+ }
+ } else {
+ tableEventsMap[table].Events =
append(tableEventsMap[table].Events, e)
+ }
+ }
+ }
+ }
+
+ return tableEventsMap
+}
+
+func (client *client) publishEvents(tableEventsMap map[string]*Events)
([]publisher.Event, error) {
begin := time.Now()
- var logFirstEvent []byte
- var stringBuilder strings.Builder
+ for table, tableEvents := range tableEventsMap {
+ events := tableEvents.Events
+
+ if table == nilTable {
+ client.logger.Errorf("Invalid table for %v events",
len(events))
+ tableEvents.dropped = int64(len(events))
+ tableEvents.err = fmt.Errorf("invalid table for %v
events", len(events))
+ continue
+ }
- dropped := 0
- for i := range events {
- event := &events[i]
- serializedEvent, err := client.codec.Encode(client.beat.Beat,
&event.Content)
+ var stringBuilder strings.Builder
- if err != nil {
- if event.Guaranteed() {
- client.logger.Errorf("Failed to serialize the
event: %+v", err)
- } else {
- client.logger.Warnf("Failed to serialize the
event: %+v", err)
+ for i := range events {
+ event := &events[i]
+ serializedEvent, err :=
client.codec.Encode(client.beat.Beat, &event.Content)
+
+ if err != nil {
+ if event.Guaranteed() {
+ client.logger.Errorf("Failed to
serialize the event: %+v", err)
+ } else {
+ client.logger.Warnf("Failed to
serialize the event: %+v", err)
+ }
+ client.logger.Debugf("Failed event: %v", event)
+
+ tableEvents.dropped++
+ continue
}
- client.logger.Debugf("Failed event: %v", event)
- dropped++
- client.reporter.IncrFailedRows(1)
+ stringBuilder.Write(serializedEvent)
+ stringBuilder.WriteString(client.lineDelimiter)
+ }
+
+ tableEvents.serialization = stringBuilder.String()
+
+ var requestErr error
+ tableEvents.request, requestErr =
http.NewRequest(http.MethodPut, client.url(table),
strings.NewReader(tableEvents.serialization))
+ if requestErr != nil {
+ client.logger.Errorf("Failed to create request: %v",
requestErr)
continue
}
- if logFirstEvent == nil {
- logFirstEvent = serializedEvent
+ var groupCommit bool = false
+ for k, v := range client.headers {
+ tableEvents.request.Header.Set(k, v)
+ if k == "group_commit" && v != "off_mode" {
+ groupCommit = true
+ }
+ }
+ if !groupCommit {
+ tableEvents.request.Header.Set("label",
tableEvents.Label)
}
- stringBuilder.Write(serializedEvent)
- stringBuilder.WriteString(client.lineDelimiter)
- }
- request, requestErr := http.NewRequest(http.MethodPut, client.url,
strings.NewReader(stringBuilder.String()))
- if requestErr != nil {
- client.logger.Errorf("Failed to create request: %s", requestErr)
- return events, requestErr
}
- var groupCommit bool = false
- for k, v := range client.headers {
- request.Header.Set(k, v)
- if k == "group_commit" && v != "off_mode" {
- groupCommit = true
+ wg := sync.WaitGroup{}
+ for _, tableEvents := range tableEventsMap {
+ request := tableEvents.request
+ if request != nil {
+ wg.Add(1)
+ go func(e *Events) {
+ e.response, e.err =
client.httpClient.Do(request)
+ wg.Done()
+ }(tableEvents)
}
}
- if !groupCommit {
- request.Header.Set("label", lable)
- }
+ wg.Wait()
- response, responseErr := client.httpClient.Do(request)
- if responseErr != nil {
- client.logger.Errorf("Failed to stream-load request: %v",
responseErr)
- return events, responseErr
- }
+ for table, tableEvents := range tableEventsMap {
+ if table == nilTable {
+ continue
+ }
- defer response.Body.Close()
+ response := tableEvents.response
- responseBytes, responseErr := httputil.DumpResponse(response, true)
- if responseErr != nil {
- client.logger.Errorf("Failed to dump doris stream load
response: %v, error: %v", response, responseErr)
- return events, responseErr
- }
+ if tableEvents.err != nil {
+ client.logger.Errorf("Failed to stream-load request:
%v", tableEvents.err)
+ continue
+ }
- if client.logRequest {
- client.logger.Infof("doris stream load response response:\n%s",
string(responseBytes))
- }
+ defer response.Body.Close()
- body, bodyErr := ioutil.ReadAll(response.Body)
- if bodyErr != nil {
- client.logger.Errorf("Failed to read doris stream load response
body, error: %v, response:\n%v", bodyErr, string(responseBytes))
- return events, bodyErr
- }
+ var responseBytes []byte
+ responseBytes, tableEvents.err =
httputil.DumpResponse(response, true)
+ if tableEvents.err != nil {
+ client.logger.Errorf("Failed to dump doris stream load
response: %v, error: %v", response, tableEvents.err)
+ continue
+ }
+
+ if client.logRequest {
+ client.logger.Infof("doris stream load response
response:\n%s", string(responseBytes))
+ }
+
+ var body []byte
+ body, tableEvents.err = ioutil.ReadAll(response.Body)
+ if tableEvents.err != nil {
+ client.logger.Errorf("Failed to read doris stream load
response body, error: %v, response:\n%v", tableEvents.err,
string(responseBytes))
+ continue
+ }
+
+ var status ResponseStatus
+ tableEvents.err = json.Unmarshal(body, &status)
+ if tableEvents.err != nil {
+ client.logger.Errorf("Failed to parse doris stream load
response to JSON, error: %v, response:\n%v", tableEvents.err,
string(responseBytes))
+ continue
+ }
- var status ResponseStatus
- parseErr := json.Unmarshal(body, &status)
- if parseErr != nil {
- client.logger.Errorf("Failed to parse doris stream load
response to JSON, error: %v, response:\n%v", parseErr, string(responseBytes))
- return events, parseErr
+ if status.Status != "Success" && status.Status != "Publish
Timeout" && status.Status != "Label Already Exists" {
+ client.logger.Errorf("doris stream load status: '%v' is
not 'Success', full response: %v", status.Status, string(responseBytes))
+ tableEvents.err = errors.New("doris stream load status:
" + status.Status)
+ continue
+ }
+
+ if status.Status == "Label Already Exists" {
+ client.logger.Warnf("doris stream load status: '%v', %v
events skipped", status.Status,
int64(len(tableEvents.Events))-tableEvents.dropped)
+ }
}
- if status.Status != "Success" {
- client.logger.Errorf("doris stream load status: '%v' is not
'Success', full response: %v", status.Status, string(responseBytes))
- return events, &status
+ var errs error
+ var retryEvents []publisher.Event
+ var retryRows int64 = 0
+ var droppedRows int64 = 0
+ var successRows int64 = 0
+ var successBytes int64 = 0
+
+ for table, tableEvents := range tableEventsMap {
+ if table == nilTable {
+ errs = errors.Join(errs, tableEvents.err)
+ droppedRows += tableEvents.dropped
+ continue
+ }
+
+ if tableEvents.err != nil {
+ errs = errors.Join(errs, tableEvents.err)
+ retryRows += int64(len(tableEvents.Events))
+ addBarrier(table, tableEvents)
+ retryEvents = append(retryEvents, tableEvents.Events...)
+ continue
+ }
+
+ droppedRows += tableEvents.dropped
+ successRows += int64(len(tableEvents.Events)) -
tableEvents.dropped
+ successBytes += int64(len(tableEvents.serialization))
}
- client.logger.Debugf("Stream-Load publish events: %d events have been
published to doris in %v.",
- len(events)-dropped,
- time.Now().Sub(begin))
+ client.logger.Debugf("Stream-Load publish events: %d events have been
published to doris in %v.", successRows, time.Since(begin))
+
+ client.observer.Dropped(int(droppedRows))
+ client.observer.Acked(int(successRows))
+ client.observer.Failed(int(retryRows))
+
+ client.reporter.IncrTotalBytes(successBytes)
+ client.reporter.IncrTotalRows(successRows)
+
+ return retryEvents, errs
+}
+
+const barrierKey = "__#BARRIER#__"
- client.observer.Dropped(dropped)
- client.observer.Acked(len(events) - dropped)
+type barrierT struct {
+ Table string `json:"table"`
+ Label string `json:"label"`
+ Length int `json:"length"`
+}
- client.reporter.IncrTotalBytes(int64(stringBuilder.Len()))
- client.reporter.IncrTotalRows(int64(len(events) - dropped))
+func addBarrier(table string, events *Events) {
+ events.Events[len(events.Events)-1].Content.Fields[barrierKey] =
&barrierT{
Review Comment:
I think it's more intuitive to add the barrier field to the first event.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]