lordgamez commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2381549857


##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,180 @@
 #pragma once
 
 #include <algorithm>
-#include <array>
 #include <map>
 #include <memory>
 #include <string>
 #include <utility>
 #include <vector>
+#include <optional>
 
 #include "Peer.h"
 #include "SiteToSite.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
 
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
 
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+}  // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
  public:
-  DataPacket(std::shared_ptr<core::logging::Logger> logger, 
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> 
attributes, const std::string &payload)
-      : _attributes{std::move(attributes)},
-        transaction_{std::move(transaction)},
-        payload_{payload},
-        logger_reference_{std::move(logger)} {
+  DataPacket(std::shared_ptr<Transaction> transaction, const std::string 
&payload)
+      : transaction{std::move(transaction)},
+        payload{payload} {
+  }
+  DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string, 
std::string> attributes, const std::string &payload)
+      : attributes{std::move(attributes)},
+        transaction{std::move(transaction)},
+        payload{payload} {
   }
-  std::map<std::string, std::string> _attributes;
-  uint64_t _size{0};
-  std::shared_ptr<Transaction> transaction_;
-  const std::string & payload_;
-  std::shared_ptr<core::logging::Logger> logger_reference_;
+  std::map<std::string, std::string> attributes;
+  std::shared_ptr<Transaction> transaction;
+  const std::string& payload;
 };
 
-class SiteToSiteClient : public core::ConnectableImpl {
+struct SiteToSiteResponse {
+  ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+  std::string message;
+};
+
+class SiteToSiteClient {
  public:
-  SiteToSiteClient()
-      : core::ConnectableImpl("SitetoSiteClient") {
+  explicit SiteToSiteClient(gsl::not_null<std::unique_ptr<SiteToSitePeer>> 
peer)
+      : peer_(std::move(peer)) {
   }
 
-  ~SiteToSiteClient() override = default;
+  SiteToSiteClient(const SiteToSiteClient&) = delete;
+  SiteToSiteClient(SiteToSiteClient&&) = delete;
+  SiteToSiteClient& operator=(const SiteToSiteClient&) = delete;
+  SiteToSiteClient& operator=(SiteToSiteClient&&) = delete;
 
-  void setSSLContextService(const 
std::shared_ptr<minifi::controllers::SSLContextServiceInterface> 
&context_service) {
-    ssl_context_service_ = context_service;
-  }
+  virtual ~SiteToSiteClient() = default;
 
-  /**
-   * Creates a transaction using the transaction ID and the direction
-   * @param transactionID transaction identifier
-   * @param direction direction of transfer
-   */
-  virtual std::shared_ptr<Transaction> createTransaction(TransferDirection 
direction) = 0;
+  virtual std::optional<std::vector<PeerStatus>> getPeerList() = 0;
+  virtual bool transmitPayload(core::ProcessContext& context, const 
std::string &payload, const std::map<std::string, std::string>& attributes) = 0;
 
-  /**
-   * Transfers flow files
-   * @param direction transfer direction
-   * @param context process context
-   * @param session process session
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-  virtual bool transfer(TransferDirection direction, core::ProcessContext& 
context, core::ProcessSession& session) {
-#ifndef WIN32
-    if (__builtin_expect(direction == SEND, 1)) {
+  bool transfer(TransferDirection direction, core::ProcessContext& context, 
core::ProcessSession& session) {
+    if (direction == TransferDirection::SEND) {
       return transferFlowFiles(context, session);
     } else {
       return receiveFlowFiles(context, session);
     }
-#else
-    if (direction == SEND) {
-      return transferFlowFiles(context, session);
-    } else {
-      return receiveFlowFiles(context, session);
-    }
-#endif
   }
 
-  /**
-   * Transfers flow files to server
-   * @param context process context
-   * @param session process session
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-  virtual bool transferFlowFiles(core::ProcessContext& context, 
core::ProcessSession& session);
-
-  /**
-   * Receive flow files from server
-   * @param context process context
-   * @param session process session
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-
-  // Confirm the data that was sent or received by comparing CRC32's of the 
data sent and the data received.
-  // Receive flow files for the process session
-  bool receiveFlowFiles(core::ProcessContext& context, core::ProcessSession& 
session);
-
-  // Receive the data packet from the transaction
-  // Return false when any error occurs
-  bool receive(const utils::Identifier &transactionID, DataPacket *packet, 
bool &eof);
-  /**
-   * Transfers raw data and attributes  to server
-   * @param context process context
-   * @param session process session
-   * @param payload data to transmit
-   * @param attributes
-   * @returns true if the process succeeded, failure OR exception thrown 
otherwise
-   */
-  virtual bool transmitPayload(core::ProcessContext& context, 
core::ProcessSession& session, const std::string &payload,
-                               std::map<std::string, std::string> attributes) 
= 0;
-
-  void setPortId(utils::Identifier &id) {
+  void setPortId(const utils::Identifier& id) {
     port_id_ = id;
   }
 
-  /**
-   * Sets the idle timeout.
-   */
   void setIdleTimeout(std::chrono::milliseconds timeout) {
      idle_timeout_ = timeout;
   }
 
-  /**
-   * Sets the base peer for this interface.
-   */
-  virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
-    peer_ = std::move(peer);
-  }
-
-  /**
-   * Provides a reference to the port identifier
-   * @returns port identifier
-   */
-  utils::Identifier getPortId() const {
+  [[nodiscard]] utils::Identifier getPortId() const {
     return port_id_;
   }
 
-  /**
-   * Obtains the peer list and places them into the provided vector
-   * @param peers peer vector.
-   * @return true if successful, false otherwise
-   */
-  virtual bool getPeerList(std::vector<PeerStatus> &peers) = 0;
-
-  /**
-   * Establishes the interface.
-   * @return true if successful, false otherwise
-   */
-  virtual bool establish() = 0;
-
-  const std::shared_ptr<core::logging::Logger> &getLogger() {
+  [[nodiscard]] const std::shared_ptr<core::logging::Logger> &getLogger() {
     return logger_;
   }
 
-  void yield() override {
+  void setSSLContextService(const 
std::shared_ptr<minifi::controllers::SSLContextServiceInterface> 
&context_service) {
+    ssl_context_service_ = context_service;
   }
 
-  /**
-   * Determines if we are connected and operating
-   */
-  bool isRunning() const override {
-    return running_;
+  void setUseCompression(bool use_compression) {
+    use_compression_ = use_compression;
   }
 
-  /**
-   * Determines if work is available by this connectable
-   * @return boolean if work is available.
-   */
-  bool isWorkAvailable() override {
-    return true;
+  void setBatchSize(uint64_t size) {
+    batch_size_ = size;
   }
 
-  virtual bool bootstrap() {
-    return true;
+  void setBatchCount(uint64_t count) {
+    batch_count_ = count;
   }
 
-  // Return -1 when any error occurs
-  virtual int16_t send(const utils::Identifier& transactionID, DataPacket* 
packet, const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession* 
session);
+  void setBatchDuration(std::chrono::milliseconds duration) {
+    batch_duration_ = duration;
+  }
 
- protected:
-  // Cancel the transaction
-  virtual void cancel(const utils::Identifier &transactionID);
-  // Complete the transaction
-  virtual bool complete(core::ProcessContext& context, const utils::Identifier 
&transactionID);
-  // Error the transaction
-  virtual void error(const utils::Identifier &transactionID);
+  virtual void setTimeout(std::chrono::milliseconds timeout) {
+    timeout_ = timeout;
+  }
 
-  virtual bool confirm(const utils::Identifier &transactionID);
-  // deleteTransaction
-  virtual void deleteTransaction(const utils::Identifier &transactionID);
+ protected:
+  friend class test::SiteToSiteClientTestAccessor;
 
+  virtual bool bootstrap() = 0;
+  virtual bool establish() = 0;
+  virtual std::shared_ptr<Transaction> createTransaction(TransferDirection 
direction) = 0;
   virtual void tearDown() = 0;
 
-  // read Respond
-  virtual int readResponse(const std::shared_ptr<Transaction> &transaction, 
RespondCode &code, std::string &message);
-  // write respond
-  virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, 
RespondCode code, const std::string& message);
-  // getRespondCodeContext
-  virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
-    for (auto & i : SiteToSiteRequest::respondCodeContext) {
-      if (i.code == code) {
-        return &i;
-      }
-    }
-    return nullptr;
-  }
+  virtual void deleteTransaction(const utils::Identifier &transaction_id);
+  virtual std::optional<SiteToSiteResponse> readResponse(const 
std::shared_ptr<Transaction> &transaction);
+  virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction, 
const SiteToSiteResponse& response);
 
-  // Peer State
-  PeerState peer_state_{PeerState::IDLE};
+  bool initializeSend(const std::shared_ptr<Transaction>& transaction);
+  bool writeAttributesInSendTransaction(const std::shared_ptr<Transaction>& 
transaction, const std::map<std::string, std::string>& attributes);
+  void finalizeSendTransaction(const std::shared_ptr<Transaction>& 
transaction, uint64_t sent_bytes);
+  bool sendPacket(const DataPacket& packet);
+  bool sendFlowFile(const std::shared_ptr<Transaction>& transaction, 
core::FlowFile& flow_file, core::ProcessSession& session);
 
-  // portId
-  utils::Identifier port_id_;
+  void cancel(const utils::Identifier &transaction_id);
+  bool complete(core::ProcessContext& context, const utils::Identifier 
&transaction_id);
+  void error(const utils::Identifier &transaction_id);
+  bool confirm(const utils::Identifier &transaction_id);
 
-  // idleTimeout
-  std::chrono::milliseconds idle_timeout_{15000};
+  void handleTransactionError(const std::shared_ptr<Transaction>& transaction, 
core::ProcessContext& context, const std::exception& exception);
 
-  // Peer Connection
-  std::unique_ptr<SiteToSitePeer> peer_;
-
-  std::atomic<bool> running_{false};
-
-  // transaction map
+  PeerState peer_state_{PeerState::IDLE};
+  utils::Identifier port_id_;
+  std::chrono::milliseconds idle_timeout_{15s};
+  gsl::not_null<std::unique_ptr<SiteToSitePeer>> peer_;
   std::map<utils::Identifier, std::shared_ptr<Transaction>> 
known_transactions_;
+  std::chrono::nanoseconds batch_send_nanos_{5s};
 
-  // BATCH_SEND_NANOS
-  std::chrono::nanoseconds _batchSendNanos = std::chrono::seconds(5);
-
-  /***
-   * versioning
-   */
-  uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
-  int _currentVersionIndex{0};
-  uint32_t _currentVersion{_supportedVersion[_currentVersionIndex]};
-  uint32_t _supportedCodecVersion[1] = {1};
-  int _currentCodecVersionIndex{0};
-  uint32_t 
_currentCodecVersion{_supportedCodecVersion[_currentCodecVersionIndex]};
+  const std::vector<uint32_t> supported_versions_ = {5, 4, 3, 2, 1};
+  uint32_t current_version_index_{0};
+  uint32_t current_version_{supported_versions_[current_version_index_]};
+  const std::vector<uint32_t> supported_codec_versions_ = {1};
+  uint32_t current_codec_version_index_{0};
+  uint32_t 
current_codec_version_{supported_codec_versions_[current_codec_version_index_]};

Review Comment:
   I'm not sure what the original intent was. I think these do not need 
synchronization because these variables are only used internally by the class 
during client initialization, in the version negotiation step, which is done 
only once and never in parallel. Meanwhile, the others below can be changed 
from the outside through the setter functions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to