This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 538df28  [improvement](routine-load) Support routine load task succeed with empty data consumed (#8256)
538df28 is described below

commit 538df287372f42b69a01bf44e765477e55c5409b
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Thu Mar 3 22:35:50 2022 +0800

    [improvement](routine-load) Support routine load task succeed with empty data consumed (#8256)
---
 be/src/runtime/routine_load/data_consumer_group.cpp | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 9096e15..5f6c789 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -146,19 +146,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
                 return result_st;
             }
 
-            if (left_bytes == ctx->max_batch_size) {
-                // nothing to be consumed, we have to cancel it, because
-                // we do not allow finishing stream load pipe without data
-                kafka_pipe->cancel("no data");
-                return Status::Cancelled("Cancelled");
-            } else {
-                DCHECK(left_bytes < ctx->max_batch_size);
-                DCHECK(left_rows < ctx->max_batch_rows);
-                kafka_pipe->finish();
-                ctx->kafka_info->cmt_offset = std::move(cmt_offset);
-                ctx->receive_bytes = ctx->max_batch_size - left_bytes;
-                return Status::OK();
-            }
+            kafka_pipe->finish();
+            ctx->kafka_info->cmt_offset = std::move(cmt_offset);
+            ctx->receive_bytes = ctx->max_batch_size - left_bytes;
+            return Status::OK();
         }
 
         RdKafka::Message* msg;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to