This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 538df28  [improvement](routine-load) Support routine load task succeed with empty data consumed (#8256)
538df28 is described below

commit 538df287372f42b69a01bf44e765477e55c5409b
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Thu Mar 3 22:35:50 2022 +0800

    [improvement](routine-load) Support routine load task succeed with empty data consumed (#8256)
---
 be/src/runtime/routine_load/data_consumer_group.cpp | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 9096e15..5f6c789 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -146,19 +146,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
                 return result_st;
             }
 
-            if (left_bytes == ctx->max_batch_size) {
-                // nothing to be consumed, we have to cancel it, because
-                // we do not allow finishing stream load pipe without data
-                kafka_pipe->cancel("no data");
-                return Status::Cancelled("Cancelled");
-            } else {
-                DCHECK(left_bytes < ctx->max_batch_size);
-                DCHECK(left_rows < ctx->max_batch_rows);
-                kafka_pipe->finish();
-                ctx->kafka_info->cmt_offset = std::move(cmt_offset);
-                ctx->receive_bytes = ctx->max_batch_size - left_bytes;
-                return Status::OK();
-            }
+            kafka_pipe->finish();
+            ctx->kafka_info->cmt_offset = std::move(cmt_offset);
+            ctx->receive_bytes = ctx->max_batch_size - left_bytes;
+            return Status::OK();
         }
 
         RdKafka::Message* msg;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org