git-hulk commented on code in PR #3075:
URL: https://github.com/apache/kvrocks/pull/3075#discussion_r2247848253


##########
src/cluster/replication.cc:
##########
@@ -203,6 +206,21 @@ void FeedSlaveThread::loop() {
         Stop();
         return;
       }
+
+      // Check if this change would unblock any WAIT command
+      auto largest_unblockable_seq = srv_->LargestTargetSeqToWakeup(batch.sequence);
+      if (largest_unblockable_seq > last_replconf_getack_seq_) {
+        // Send _getack to the slave to get acknowledgment
+        auto s = util::SockSend(conn_->GetFD(), redis::BulkString("_getack"), conn_->GetBufferEvent());
+        if (!s.IsOK()) {
+          error("Write error while sending _getack to slave: {}", s.Msg());
+          Stop();
+          return;
+        } else {
+          last_replconf_getack_seq_ = largest_unblockable_seq;

Review Comment:
   Should we set `last_replconf_getack_seq_` to `largest_unblockable_seq` when `largest_unblockable_seq` is zero?
   
   By the way, would it make sense to merge the `_getack` command into `batches_bulk`? That would save one send call, like the following:
   
   ```
   bool ReplicationThread::shouldSendGetACK(SequenceNumber seq) {
     auto largest_unblockable_seq = srv_->LargestTargetSeqToWakeup(seq);
     if (largest_unblockable_seq > last_replconf_getack_seq_) {
       return true;
     }
     last_replconf_getack_seq_ = largest_unblockable_seq;
     return false;
   }
   
   ...
   
       if (is_first_repl_batch || batches_bulk.size() >= kMaxDelayBytes || updates_in_batches >= kMaxDelayUpdates ||
           srv_->storage->LatestSeqNumber() - batch.sequence <= kMaxDelayUpdates) {
         if (shouldSendGetACK(batch.sequence)) {
           // Piggyback _getack on the pending bulk so both go out in one send call
           batches_bulk += redis::BulkString("_getack");
         }
         // Send the entire bulk, which contains multiple batches
         auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent());
         if (!s.IsOK()) {
           error("Write error while sending batch to slave: {}. batches: 0x{}", s.Msg(), util::StringToHex(batches_bulk));
           Stop();
           return;
         }
   
   ```
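
To make the piggybacking idea concrete outside the kvrocks code base, here is a minimal, self-contained sketch. Everything in it is hypothetical and only for illustration: `BulkString` is a stand-in for `redis::BulkString` that emits a plain RESP bulk string, and `GetAckTracker` and `MaybeAppendGetAck` are names that do not exist in the PR. The sketch assumes the watermark is advanced as soon as `_getack` is queued, and that a target sequence of zero leaves it untouched, which is one possible answer to the question above rather than a settled decision.

```cpp
#include <cstdint>
#include <string>

// Hypothetical stand-in for redis::BulkString: encodes a RESP bulk string.
std::string BulkString(const std::string &data) {
  return "$" + std::to_string(data.size()) + "\r\n" + data + "\r\n";
}

// Hypothetical helper that remembers the largest target sequence for which
// a _getack request has already been queued.
class GetAckTracker {
 public:
  // Returns true, and advances the watermark, only when target_seq is
  // strictly larger than the last requested one. A target of zero therefore
  // never triggers a request and never lowers the watermark.
  bool ShouldSendGetAck(uint64_t target_seq) {
    if (target_seq <= last_getack_seq_) return false;
    last_getack_seq_ = target_seq;
    return true;
  }

  uint64_t last_getack_seq() const { return last_getack_seq_; }

 private:
  uint64_t last_getack_seq_ = 0;
};

// Appends _getack to the pending bulk when needed, so the batches and the
// acknowledgment request can be written with a single send call.
void MaybeAppendGetAck(GetAckTracker &tracker, uint64_t target_seq, std::string *batches_bulk) {
  if (tracker.ShouldSendGetAck(target_seq)) {
    batches_bulk->append(BulkString("_getack"));
  }
}
```

With this shape, the caller builds `batches_bulk` as before, calls `MaybeAppendGetAck` right before the send, and issues one write. In the real loop a failed send stops the thread, so advancing the watermark before the write would not leave a stale value behind, but that is an assumption of this sketch.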



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
