This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fe63640cd01 [fix](move-memtable) tolerate non-open streams in close wait (#44680)
fe63640cd01 is described below

commit fe63640cd014fc7381dfcf4318a22a4684c954e3
Author: Kaijie Chen <chenkai...@selectdb.com>
AuthorDate: Sat Dec 7 21:57:50 2024 +0800

    [fix](move-memtable) tolerate non-open streams in close wait (#44680)
    
    Related PR: #44344
    `VTabletWriterV2::_select_streams()` already checks whether there are
    enough downstream BEs to meet the replication requirements, so
    `VTabletWriterV2::close()` should tolerate non-open streams during
    close wait.
    
    The debug point `VTabletWriterV2._open_streams.skip_two_backends` is
    added alongside the existing
    `VTabletWriterV2._open_streams.skip_one_backend` to verify this behavior.
---
 be/src/vec/sink/load_stream_stub.cpp                     | 16 ++++++----------
 be/src/vec/sink/writer/vtablet_writer_v2.cpp             | 12 +++++++++---
 .../test_multi_replica_fault_injection.groovy            | 13 ++++++++-----
 3 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 59719765ab3..d5b04c636d4 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -219,11 +219,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, 
int64_t index_id, int64
         add_failed_tablet(tablet_id, _status);
         return _status;
     }
-    DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
-        if (segment_id != 0) {
-            return Status::OK();
-        }
-    });
+    DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); 
});
     PStreamHeader header;
     header.set_src_id(_src_id);
     *header.mutable_load_id() = _load_id;
@@ -246,11 +242,7 @@ Status LoadStreamStub::add_segment(int64_t partition_id, 
int64_t index_id, int64
         add_failed_tablet(tablet_id, _status);
         return _status;
     }
-    DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
-        if (segment_id != 0) {
-            return Status::OK();
-        }
-    });
+    DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); 
});
     PStreamHeader header;
     header.set_src_id(_src_id);
     *header.mutable_load_id() = _load_id;
@@ -340,6 +332,10 @@ Status LoadStreamStub::wait_for_schema(int64_t 
partition_id, int64_t index_id, i
 
 Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
     DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
+    if (!_is_open.load()) {
+        // we don't need to close wait on non-open streams
+        return Status::OK();
+    }
     if (!_is_closing.load()) {
         return _status;
     }
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 3dc58be3bcd..cd196a8f2b3 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -269,14 +269,20 @@ Status VTabletWriterV2::open(RuntimeState* state, 
RuntimeProfile* profile) {
 }
 
 Status VTabletWriterV2::_open_streams() {
-    bool fault_injection_skip_be = true;
+    int fault_injection_skip_be = 0;
     bool any_backend = false;
     bool any_success = false;
     for (auto& [dst_id, _] : _tablets_for_node) {
         auto streams = _load_stream_map->get_or_create(dst_id);
         DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
-            if (fault_injection_skip_be) {
-                fault_injection_skip_be = false;
+            if (fault_injection_skip_be < 1) {
+                fault_injection_skip_be++;
+                continue;
+            }
+        });
+        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
+            if (fault_injection_skip_be < 2) {
+                fault_injection_skip_be++;
                 continue;
             }
         });
diff --git 
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
index 2f6afd5ca69..d09983d52d0 100644
--- 
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -75,14 +75,15 @@ suite("test_multi_replica_fault_injection", 
"nonConcurrent") {
             file "baseall.txt"
         }
 
-        def load_with_injection = { injection, error_msg->
+        def load_with_injection = { injection, error_msg, success=false->
             try {
                 sql "truncate table test"
                 GetDebugPoint().enableDebugPointForAllBEs(injection)
                 sql "insert into test select * from baseall where k1 <= 3"
+                assertTrue(success, String.format("Expected Exception '%s', 
actual success", error_msg))
             } catch(Exception e) {
                 logger.info(e.getMessage())
-                assertTrue(e.getMessage().contains(error_msg))
+                assertTrue(e.getMessage().contains(error_msg), e.toString())
             } finally {
                 GetDebugPoint().disableDebugPointForAllBEs(injection)
             }
@@ -90,15 +91,17 @@ suite("test_multi_replica_fault_injection", 
"nonConcurrent") {
 
         // StreamSinkFileWriter appendv write segment failed one replica
         // success
-        
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 "sucess")
+        
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 "sucess", true)
         // StreamSinkFileWriter appendv write segment failed two replica
         
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 "add segment failed")
         // StreamSinkFileWriter appendv write segment failed all replica
         
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
 "failed to send segment data to any replicas")
         // test segment num check when LoadStreamStub missed tail segments
-        load_with_injection("LoadStreamStub.only_send_segment_0", "segment num 
mismatch")
+        load_with_injection("LoadStreamStub.skip_send_segment", "segment num 
mismatch")
         // test one backend open failure
-        load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", 
"success")
+        load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", 
"success", true)
+        // test two backend open failure
+        load_with_injection("VTabletWriterV2._open_streams.skip_two_backends", 
"not enough streams 1/3")
         sql """ set enable_memtable_on_sink_node=false """
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to