sollhui commented on code in PR #51989:
URL: https://github.com/apache/doris/pull/51989#discussion_r2160625008
##########
be/src/vec/sink/writer/vtablet_writer_v2.cpp:
##########
@@ -705,32 +705,87 @@ Status VTabletWriterV2::close(Status exec_status) {
return status;
}
-Status VTabletWriterV2::_close_wait(bool incremental) {
+std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_incremental_streams() {
+ std::unordered_set<std::shared_ptr<LoadStreamStub>> incremental_streams;
+ auto streams_for_node = _load_stream_map->get_streams_for_node();
+ for (const auto& [dst_id, streams] : streams_for_node) {
+ for (const auto& stream : streams->streams()) {
+ if (stream->is_incremental()) {
+ incremental_streams.insert(stream);
+ }
+ }
+ }
+ return incremental_streams;
+}
+
+std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() {
+ std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams;
+ auto streams_for_node = _load_stream_map->get_streams_for_node();
+ for (const auto& [dst_id, streams] : streams_for_node) {
+ for (const auto& stream : streams->streams()) {
+ if (!stream->is_incremental()) {
+ non_incremental_streams.insert(stream);
+ }
+ }
+ }
+ return non_incremental_streams;
+}
+
+Status VTabletWriterV2::_close_wait(
+ std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams) {
SCOPED_TIMER(_close_load_timer);
- auto st = _load_stream_map->for_each_st(
- [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status {
- if (streams.is_incremental() != incremental) {
- return Status::OK();
- }
- int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
- _timeout_watch.elapsed_time() / 1000 / 1000;
- DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
- if (remain_ms <= 0) {
- LOG(WARNING) << "load timed out before close waiting, load_id="
- << print_id(_load_id);
- return Status::TimedOut("load timed out before close waiting");
- }
- auto st = streams.close_wait(_state, remain_ms);
- if (!st.ok()) {
- LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id
- << ", load_id=" << print_id(_load_id) << ": " << st;
- }
- return st;
- });
- if (!st.ok()) {
- LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id);
+ Status status;
+ auto streams_for_node = _load_stream_map->get_streams_for_node();
+ while (true) {
+ RETURN_IF_ERROR(_check_timeout());
+ RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
Review Comment:
Yes, the previous logic was the same.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]