This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop-cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/develop-cpp by this push:
     new bb533223 feat: revamp TelemetryBidiReactor states and their transition 
graph
bb533223 is described below

commit bb533223b8c5db8990c262bb63f831470fedd480
Author: Li Zhanhui <lizhan...@gmail.com>
AuthorDate: Tue Mar 26 11:59:55 2024 +0800

    feat: revamp TelemetryBidiReactor states and their transition graph
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 cpp/source/client/SessionImpl.cpp                |   6 +-
 cpp/source/client/TelemetryBidiReactor.cpp       | 331 +++++++++++++++--------
 cpp/source/client/include/TelemetryBidiReactor.h |  61 ++---
 3 files changed, 244 insertions(+), 154 deletions(-)

diff --git a/cpp/source/client/SessionImpl.cpp 
b/cpp/source/client/SessionImpl.cpp
index 0ca3fff2..36416829 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -16,12 +16,14 @@
  */
 
 #include "SessionImpl.h"
+
 #include "rocketmq/Logger.h"
 #include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-SessionImpl::SessionImpl(std::weak_ptr<Client> client, 
std::shared_ptr<RpcClient> rpc_client) : client_(client), 
rpc_client_(rpc_client) {
+SessionImpl::SessionImpl(std::weak_ptr<Client> client, 
std::shared_ptr<RpcClient> rpc_client)
+    : client_(client), rpc_client_(rpc_client) {
   telemetry_ = rpc_client->asyncTelemetry(client_);
   syncSettings();
 }
@@ -39,8 +41,8 @@ void SessionImpl::syncSettings() {
 }
 
 SessionImpl::~SessionImpl() {
-  SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
   telemetry_->fireClose();
+  SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp 
b/cpp/source/client/TelemetryBidiReactor.cpp
index d45e5058..61796437 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -35,7 +35,10 @@ ROCKETMQ_NAMESPACE_BEGIN
 TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
                                            rmq::MessagingService::Stub* stub,
                                            std::string peer_address)
-    : client_(client), peer_address_(std::move(peer_address)), 
stream_state_(StreamState::Active) {
+    : client_(client),
+      peer_address_(std::move(peer_address)),
+      read_state_(StreamState::Created),
+      write_state_(StreamState::Created) {
   auto ptr = client_.lock();
   auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
   context_.set_deadline(deadline);
@@ -45,59 +48,67 @@ 
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
     context_.AddMetadata(entry.first, entry.second);
   }
   stub->async()->Telemetry(&context_, this);
+  write_state_ = StreamState::Ready;
   // Increase hold for write stream.
   AddHold();
   StartCall();
 }
 
 TelemetryBidiReactor::~TelemetryBidiReactor() {
-  SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", 
peer_address_,
-              static_cast<std::uint8_t>(stream_state_));
+  SPDLOG_INFO("Telemetry stream for {} destructed. ReadStreamState={}, 
WriteStreamState={}", peer_address_,
+              static_cast<std::uint8_t>(read_state_), 
static_cast<std::uint8_t>(read_state_));
 }
 
 bool TelemetryBidiReactor::await() {
-  absl::MutexLock lk(&server_setting_received_mtx_);
-  if (server_setting_received_) {
+  absl::MutexLock lk(&state_mtx_);
+  if (StreamState::Created != write_state_) {
     return true;
   }
 
-  server_setting_received_cv_.Wait(&server_setting_received_mtx_);
-  return server_setting_received_;
-}
-
-void TelemetryBidiReactor::changeStreamStateThenNotify(StreamState state) {
-  absl::MutexLock lk(&stream_state_mtx_);
-  if (state == stream_state_) {
-    return;
-  }
-  stream_state_ = state;
-  stream_state_cv_.SignalAll();
+  state_cv_.Wait(&state_mtx_);
+  return StreamState::Error != write_state_;
 }
 
 void TelemetryBidiReactor::OnWriteDone(bool ok) {
   SPDLOG_DEBUG("{}#OnWriteDone", peer_address_);
-  {
-    bool expected = true;
-    if (!command_inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) {
-      SPDLOG_WARN("Illegal command-inflight state");
-      return;
-    }
-  }
 
   if (!ok) {
     RemoveHold();
     {
-      absl::MutexLock lk(&writes_mtx_);
+      absl::MutexLock lk(&state_mtx_);
       SPDLOG_WARN("Failed to write telemetry command {} to {}", 
writes_.front().DebugString(), peer_address_);
-    }
-
-    {
-      absl::MutexLock lk(&stream_state_mtx_);
-      if (streamStateGood()) {
-        stream_state_ = StreamState::WriteFailure;
+      write_state_ = StreamState::Error;
+
+      // Sync read state.
+      switch (read_state_) {
+        case StreamState::Created:
+        case StreamState::Ready: {
+          SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                       static_cast<std::uint8_t>(StreamState::Closed));
+          read_state_ = StreamState::Closed;
+          break;
+        }
+        case StreamState::Inflight: {
+          SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                       static_cast<std::uint8_t>(StreamState::Closing));
+          read_state_ = StreamState::Closing;
+          break;
+        }
+        case StreamState::Closing:
+        case StreamState::Error:
+        case StreamState::Closed: {
+          break;
+        }
       }
+
+      state_cv_.SignalAll();
     }
     return;
+  } else {
+    absl::MutexLock lk(&state_mtx_);
+    if (StreamState::Inflight == write_state_) {
+      write_state_ = StreamState::Ready;
+    }
   }
 
   // Check if the read stream has started.
@@ -114,22 +125,49 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
 
 void TelemetryBidiReactor::OnReadDone(bool ok) {
   SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
-  if (!ok) {
-    RemoveHold();
-    {
-      absl::MutexLock lk(&stream_state_mtx_);
-      if (streamStateGood() && stream_state_ != StreamState::Closing) {
-        stream_state_ = StreamState::ReadFailure;
+  {
+    absl::MutexLock lk(&state_mtx_);
+    if (!ok) {
+      // Remove read hold.
+      RemoveHold();
+      {
+        SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                     static_cast<std::uint8_t>(StreamState::Error));
+        read_state_ = StreamState::Error;
         SPDLOG_WARN("Faild to read from telemetry stream from {}", 
peer_address_);
-      }
 
-      bool expected = false;
-      if (command_inflight_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
-        // There is no inflight write request, remove write hold on its behalf.
-        RemoveHold();
+        // Sync write state
+        switch (write_state_) {
+          case StreamState::Created: {
+            // Not reachable
+            break;
+          }
+          case StreamState::Ready: {
+            write_state_ = StreamState::Closed;
+            // There is no inflight write request, remove write hold on its 
behalf.
+            RemoveHold();
+            state_cv_.SignalAll();
+            break;
+          }
+          case StreamState::Inflight: {
+            write_state_ = StreamState::Closing;
+            break;
+          }
+          case StreamState::Closing:
+          case StreamState::Error:
+          case StreamState::Closed: {
+            break;
+          }
+        }
       }
+      return;
+    } else if (StreamState::Closing == read_state_) {
+      SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                   static_cast<std::uint8_t>(StreamState::Closed));
+      read_state_ = StreamState::Closed;
+      state_cv_.SignalAll();
+      return;
     }
-    return;
   }
 
   SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, 
read_.DebugString());
@@ -144,13 +182,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
       auto settings = read_.settings();
       SPDLOG_INFO("Received settings from {}: {}", peer_address_, 
settings.DebugString());
       applySettings(settings);
-      {
-        absl::MutexLock lk(&server_setting_received_mtx_);
-        if (!server_setting_received_) {
-          server_setting_received_ = true;
-          server_setting_received_cv_.SignalAll();
-        }
-      }
       break;
     }
     case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
@@ -206,7 +237,18 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
     }
   }
 
-  StartRead(&read_);
+  {
+    absl::MutexLock lk(&state_mtx_);
+    if (StreamState::Inflight == read_state_) {
+      SPDLOG_DEBUG("Spawn new read op, read-state={}", 
static_cast<std::uint8_t>(read_state_));
+      StartRead(&read_);
+    } else if (read_state_ == StreamState::Closing) {
+      SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                   static_cast<std::uint8_t>(StreamState::Closed));
+      read_state_ = StreamState::Closed;
+      state_cv_.SignalAll();
+    }
+  }
 }
 
 void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
@@ -300,30 +342,35 @@ void TelemetryBidiReactor::applySubscriptionConfig(const 
rmq::Settings& settings
 }
 
 void TelemetryBidiReactor::fireRead() {
-  {
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (!streamStateGood()) {
-      SPDLOG_WARN("Further read from {} is not allowded due to 
stream-state={}", peer_address_,
-                  static_cast<std::uint8_t>(stream_state_));
-      return;
-    }
-  }
-
-  bool expected = false;
-  if (read_stream_started_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
-    // Hold for the read stream.
-    AddHold();
-    StartRead(&read_);
+  absl::MutexLock lk(&state_mtx_);
+  if (StreamState::Created != read_state_) {
+    SPDLOG_DEBUG("Further read from {} is not allowded due to 
stream-state={}", peer_address_,
+                 static_cast<std::uint8_t>(read_state_));
+    return;
   }
+  SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+               static_cast<std::uint8_t>(StreamState::Ready));
+  read_state_ = StreamState::Ready;
+  AddHold();
+  SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+               static_cast<std::uint8_t>(StreamState::Inflight));
+  read_state_ = StreamState::Inflight;
+  StartRead(&read_);
 }
 
 void TelemetryBidiReactor::write(TelemetryCommand command) {
   SPDLOG_DEBUG("{}#write", peer_address_);
   {
-    absl::MutexLock lk(&stream_state_mtx_);
+    absl::MutexLock lk(&state_mtx_);
     // Reject incoming write commands if the stream state is closing or has 
witnessed some error.
-    if (!streamStateGood() || StreamState::Closing == stream_state_) {
-      return;
+    switch (write_state_) {
+      case StreamState::Closing:
+      case StreamState::Error:
+      case StreamState::Closed:
+        return;
+      default:
+        // no-op
+        break;
     }
   }
 
@@ -337,84 +384,123 @@ void TelemetryBidiReactor::write(TelemetryCommand 
command) {
 void TelemetryBidiReactor::tryWriteNext() {
   SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);
   {
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (!streamStateGood()) {
+    absl::MutexLock lk(&state_mtx_);
+    if (StreamState::Error == write_state_ || StreamState::Closed == 
write_state_) {
       SPDLOG_WARN("Further write to {} is not allowded due to 
stream-state={}", peer_address_,
-                  static_cast<std::uint8_t>(stream_state_));
+                  static_cast<std::uint8_t>(write_state_));
       return;
     }
   }
 
-  bool closing = false;
-  {
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (StreamState::Closing == stream_state_) {
-      closing = true;
-    }
-  }
-
   {
     absl::MutexLock lk(&writes_mtx_);
-    if (writes_.empty() && !closing) {
+    if (writes_.empty() && StreamState::Closing != write_state_) {
       SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_);
       return;
     }
 
-    bool expected = false;
-    if (command_inflight_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
-      if (writes_.empty()) {
-        // Tell server there is no more write requests.
-        StartWritesDone();
-      } else {
-        SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
writes_.front().DebugString());
-        StartWrite(&(writes_.front()));
-      }
+    if (StreamState::Ready == write_state_) {
+      write_state_ = StreamState::Inflight;
+    }
+
+    if (writes_.empty()) {
+      // Tell server there is no more write requests.
+      StartWritesDone();
     } else {
-      SPDLOG_DEBUG("Another command is already on the wire. Peer={}", 
peer_address_);
-      return;
+      SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
writes_.front().DebugString());
+      StartWrite(&(writes_.front()));
     }
   }
 }
 
 void TelemetryBidiReactor::fireClose() {
   SPDLOG_INFO("{}#fireClose", peer_address_);
+
   {
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (!streamStateGood()) {
-      SPDLOG_WARN("No futher Read/Write call to {} is allowed due to 
stream-state={}", peer_address_,
-                  static_cast<std::uint8_t>(stream_state_));
-      return;
+    // Acquire state lock
+    absl::MutexLock lk(&state_mtx_);
+
+    // Transition read state
+    switch (read_state_) {
+      case StreamState::Created:
+      case StreamState::Ready: {
+        SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                     static_cast<std::uint8_t>(StreamState::Closed));
+        read_state_ = StreamState::Closed;
+        state_cv_.SignalAll();
+        break;
+      }
+
+      case StreamState::Inflight: {
+        SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                     static_cast<std::uint8_t>(StreamState::Closing));
+        read_state_ = StreamState::Closing;
+        break;
+      }
+      case StreamState::Closing: {
+        break;
+      }
+      case StreamState::Closed:
+      case StreamState::Error: {
+        state_cv_.SignalAll();
+        break;
+      }
+    }
+
+    // Transition write state
+    switch (write_state_) {
+      case StreamState::Created:
+      case StreamState::Ready:
+      case StreamState::Inflight: {
+        SPDLOG_DEBUG("Change write-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                     static_cast<std::uint8_t>(StreamState::Closing));
+        write_state_ = StreamState::Closing;
+        break;
+      }
+      case StreamState::Closing: {
+        break;
+      }
+      case StreamState::Closed:
+      case StreamState::Error: {
+        state_cv_.SignalAll();
+        break;
+      }
     }
   }
 
-  {
-    absl::MutexLock lk(&stream_state_mtx_);
-    stream_state_ = StreamState::Closing;
+  if (StreamState::Closing == write_state_) {
+    tryWriteNext();
   }
-  tryWriteNext();
+
   {
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (stream_state_cv_.WaitWithTimeout(&stream_state_mtx_, 
absl::Seconds(3))) {
-      SPDLOG_WARN("StreamState CondVar timed out before getting signalled");
+    // Acquire state lock
+    absl::MutexLock lk(&state_mtx_);
+    while ((StreamState::Closed != read_state_ && StreamState::Error != 
read_state_) ||
+           (StreamState::Closed != write_state_ && StreamState::Error != 
write_state_)) {
+      if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
+        SPDLOG_WARN("StreamState CondVar timed out before getting signalled: 
read-state={}, write-state={}",
+                    static_cast<uint8_t>(read_state_), 
static_cast<uint8_t>(write_state_));
+      }
     }
   }
 }
 
 void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
   SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
+  assert(StreamState::Closing == write_state_);
+
+  absl::MutexLock lk(&state_mtx_);
   // Remove the hold for the write stream.
   RemoveHold();
 
   if (!ok) {
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (streamStateGood()) {
-      stream_state_ = StreamState::WriteFailure;
-    }
+    write_state_ = StreamState::Error;
     SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_);
-    return;
+  } else {
+    write_state_ = StreamState::Closed;
+    SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
   }
-
-  SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
+  state_cv_.SignalAll();
 }
 
 void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
@@ -441,7 +527,18 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& 
status) {
 
   {
     SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
-    changeStreamStateThenNotify(StreamState::Closed);
+    absl::MutexLock lk(&state_mtx_);
+    if (StreamState::Error != read_state_) {
+      SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                   static_cast<std::uint8_t>(StreamState::Closed));
+      read_state_ = StreamState::Closed;
+    }
+    if (StreamState::Error != write_state_) {
+      SPDLOG_DEBUG("Change write-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                   static_cast<std::uint8_t>(StreamState::Closed));
+      write_state_ = StreamState::Closed;
+    }
+    state_cv_.SignalAll();
   }
 
   auto client = client_.lock();
@@ -456,11 +553,13 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& 
status) {
 
 void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
   SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_);
+
   if (!ok) {
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (streamStateGood()) {
-      stream_state_ = StreamState::ReadInitialMetadataFailure;
-    }
+    absl::MutexLock lk(&state_mtx_);
+    SPDLOG_DEBUG("Change write-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                 static_cast<std::uint8_t>(StreamState::Error));
+    read_state_ = StreamState::Error;
+    state_cv_.SignalAll();
     SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_);
     return;
   }
@@ -468,8 +567,4 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool 
ok) {
   SPDLOG_DEBUG("Received initial metadata from {}", peer_address_);
 }
 
-bool TelemetryBidiReactor::streamStateGood() {
-  return StreamState::Active == stream_state_ || StreamState::Closing == 
stream_state_;
-}
-
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h 
b/cpp/source/client/include/TelemetryBidiReactor.h
index 63c6f5e6..77ee484f 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -34,21 +34,30 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 enum class StreamState : std::uint8_t
 {
-  Active = 0,
-
-  /// Once the stream enters this state, new write requests shall be rejected
-  /// and once currently pending requests are written, write stream should be
-  /// closed as soon as possible.
-  Closing = 1,
-
-  // Once stream state reaches one of the following, Start* should not be
-  // called.
-  Closed = 2,
-  ReadInitialMetadataFailure = 3,
-  ReadFailure = 4,
-  WriteFailure = 5,
+  Created = 0,
+  Ready = 1,
+  Inflight = 2,
+  Closing = 3,
+  Closed = 4,
+  Error = 5,
 };
 
+/// write-stream-state: created --> ready --> inflight --> ready --> ...
+///                                                    --> error
+///                                                    --> closing --> closed
+///                                       --> closing  --> closed
+///                                                    --> error
+///
+///
+/// read-stream-state: created --> ready --> inflight --> inflight
+///                                                   --> closing --> closed
+///                                                   --> error
+///                                      --> closed
+/// requirement:
+///    1, fireClose --> blocking await till bidireactor is closed;
+///    2, when session is closed and client is still active, recreate a new 
session to accept incoming commands from server;
+///    3, after writing the first Settings telemetry command, launch the read 
directional stream
+///
 class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, 
TelemetryCommand>,
                              public 
std::enable_shared_from_this<TelemetryBidiReactor> {
 public:
@@ -131,20 +140,10 @@ private:
    */
   std::string peer_address_;
 
-  /**
-   * @brief Indicate if there is a command being written to network.
-   */
-  std::atomic_bool command_inflight_{false};
-
-  std::atomic_bool read_stream_started_{false};
-
-  StreamState stream_state_ GUARDED_BY(stream_state_mtx_);
-  absl::Mutex stream_state_mtx_;
-  absl::CondVar stream_state_cv_;
-
-  bool server_setting_received_ 
GUARDED_BY(server_setting_received_mtx_){false};
-  absl::Mutex server_setting_received_mtx_;
-  absl::CondVar server_setting_received_cv_;
+  StreamState read_state_ GUARDED_BY(state_mtx_);
+  StreamState write_state_ GUARDED_BY(state_mtx_);
+  absl::Mutex state_mtx_;
+  absl::CondVar state_cv_;
 
   void changeStreamStateThenNotify(StreamState state);
 
@@ -158,19 +157,13 @@ private:
 
   void applySubscriptionConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
 
-  /**
-   * Indicate if the underlying gRPC bidirectional stream is good enough to 
fire
-   * further Start* calls.
-   */
-  bool streamStateGood() ABSL_EXCLUSIVE_LOCKS_REQUIRED(stream_state_mtx_);
-
   /// Start the read stream.
   ///
   /// Once got the OnReadDone and status is OK, call StartRead immediately.
   void fireRead();
 
   /// Attempt to write pending telemetry command to server.
-  void tryWriteNext();
+  void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_);
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file

Reply via email to