joker-star-l commented on code in PR #47691:
URL: https://github.com/apache/doris/pull/47691#discussion_r1960068923
##########
extension/beats/doris/client.go:
##########
@@ -180,120 +184,302 @@ func (client *client) Close() error {
}
func (client *client) String() string {
- return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix,
client.headers)
+ return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"),
client.labelPrefix, client.headers)
+}
+
+func (client *client) url(table string) string {
+ return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table)
+}
+
+func (client *client) label(table string) string {
+ return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, table, time.Now().UnixMilli(), uuid.New())
}
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents
to doris.
+// If a tableEvents returns an error, add a barrier to the last event of the
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the last event in the batch.Events() has a barrier, it means that
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch)
error {
events := batch.Events()
length := len(events)
client.logger.Debugf("Received events: %d", length)
- label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, client.table, time.Now().UnixMilli(), uuid.New())
- rest, err := client.publishEvents(label, events)
+ tableEventsMap := client.makeTableEventsMap(ctx, events)
+ rest, err := client.publishEvents(tableEventsMap)
if len(rest) == 0 {
batch.ACK()
- client.logger.Debugf("Success send %d events", length)
Review Comment:
This log duplicates the one at
https://github.com/apache/doris/pull/47691/files#diff-86c3e882948c75fd362158ddaec93628d46975812e8db6f8b19bbe76afd5917aR443
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]