lordgamez commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2387246992
##########
libminifi/src/sitetosite/SiteToSiteClient.cpp:
##########
@@ -20,735 +20,740 @@
#include <map>
#include <string>
#include <memory>
+#include <ranges>
#include "utils/gsl.h"
#include "utils/Enum.h"
namespace org::apache::nifi::minifi::sitetosite {
-int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>&
/*transaction*/, RespondCode &code, std::string &message) {
- uint8_t firstByte = 0;
- {
- const auto ret = peer_->read(firstByte);
- if (ret == 0 || io::isError(ret) || firstByte != CODE_SEQUENCE_VALUE_1)
- return -1;
+std::optional<SiteToSiteResponse> SiteToSiteClient::readResponse(const
std::shared_ptr<Transaction>& /*transaction*/) {
+ uint8_t result_byte = 0;
+ if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret)
|| result_byte != CODE_SEQUENCE_VALUE_1) {
+ logger_->log_error("Site2Site read response failed: invalid code sequence
1 value");
+ return std::nullopt;
}
- uint8_t secondByte = 0;
- {
- const auto ret = peer_->read(secondByte);
- if (ret == 0 || io::isError(ret) || secondByte != CODE_SEQUENCE_VALUE_2)
- return -1;
+ if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret)
|| result_byte != CODE_SEQUENCE_VALUE_2) {
+ logger_->log_error("Site2Site read response failed: invalid code sequence
2 value");
+ return std::nullopt;
}
- uint8_t thirdByte = 0;
- {
- const auto ret = peer_->read(thirdByte);
- if (ret == 0 || io::isError(ret))
- return static_cast<int>(ret);
+ if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret))
{
+ logger_->log_error("Site2Site read response failed: failed to read
response code");
+ return std::nullopt;
}
- code = static_cast<RespondCode>(thirdByte);
- RespondCodeContext *resCode = this->getRespondCodeContext(code);
- if (!resCode) {
- return -1;
+ SiteToSiteResponse response;
+ if (auto code = magic_enum::enum_cast<ResponseCode>(result_byte)) {
+ response.code = *code;
+ } else {
+ logger_->log_error("Site2Site read response failed: invalid response
code");
+ return std::nullopt;
}
- if (resCode->hasDescription) {
- const auto ret = peer_->read(message);
- if (ret == 0 || io::isError(ret))
- return -1;
+
+ const ResponseCodeContext* response_code_context =
getResponseCodeContext(response.code);
+ if (!response_code_context) {
+ logger_->log_error("Site2Site read response failed: invalid response code
context");
+ return std::nullopt;
}
- return gsl::narrow<int>(3 + message.size());
+ if (response_code_context->has_description) {
+ if (const auto ret = peer_->read(response.message); ret == 0 ||
io::isError(ret)) {
+ logger_->log_error("Site2Site read response failed: failed to read
response message");
+ return std::nullopt;
+ }
+ }
+ return response;
}
-void SiteToSiteClient::deleteTransaction(const utils::Identifier&
transactionID) {
+void SiteToSiteClient::handleTransactionError(const
std::shared_ptr<Transaction>& transaction, core::ProcessContext& context, const
std::exception& exception) {
+ if (transaction) {
+ deleteTransaction(transaction->getUUID());
+ }
+ context.yield();
+ tearDown();
+ logger_->log_warn("Caught Exception, type: {}, what: {}",
typeid(exception).name(), exception.what());
+}
+
+void SiteToSiteClient::deleteTransaction(const utils::Identifier&
transaction_id) {
std::shared_ptr<Transaction> transaction;
- auto it = this->known_transactions_.find(transactionID);
+ auto it = known_transactions_.find(transaction_id);
if (it == known_transactions_.end()) {
+ logger_->log_warn("Site2Site transaction id '{}' not found for delete",
transaction_id.to_string());
return;
} else {
transaction = it->second;
}
logger_->log_debug("Site2Site delete transaction {}",
transaction->getUUIDStr());
- known_transactions_.erase(transactionID);
+ known_transactions_.erase(transaction_id);
}
-int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>&
/*transaction*/, RespondCode code, const std::string& message) {
- RespondCodeContext *resCode = this->getRespondCodeContext(code);
- if (!resCode) {
- return -1;
+bool SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>&
/*transaction*/, const SiteToSiteResponse& response) {
+ const ResponseCodeContext* response_code_context =
getResponseCodeContext(response.code);
+ if (!response_code_context) {
+ return false;
}
- {
- const std::array<uint8_t, 3> codeSeq = { CODE_SEQUENCE_VALUE_1,
CODE_SEQUENCE_VALUE_2, static_cast<uint8_t>(code) };
- const auto ret = peer_->write(codeSeq.data(), 3);
- if (ret != 3)
- return -1;
+ const std::array<uint8_t, 3> code_sequence = { CODE_SEQUENCE_VALUE_1,
CODE_SEQUENCE_VALUE_2, magic_enum::enum_underlying(response.code) };
+ const auto ret = peer_->write(code_sequence.data(), 3);
+ if (ret != 3) {
+ return false;
}
- if (resCode->hasDescription) {
- const auto ret = peer_->write(message);
- if (io::isError(ret)) return -1;
- if (ret == 0) return 0;
- return 3 + gsl::narrow<int>(ret);
- } else {
- return 3;
+ if (response_code_context->has_description) {
+ return !(io::isError(peer_->write(response.message)));
}
+ return true;
}
bool SiteToSiteClient::transferFlowFiles(core::ProcessContext& context,
core::ProcessSession& session) {
auto flow = session.get();
-
- std::shared_ptr<Transaction> transaction = nullptr;
-
if (!flow) {
return false;
}
- if (peer_state_ != READY) {
- if (!bootstrap())
+ if (peer_state_ != PeerState::READY) {
+ if (!bootstrap()) {
return false;
+ }
}
- if (peer_state_ != READY) {
+ if (peer_state_ != PeerState::READY) {
context.yield();
tearDown();
throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with
peer");
}
- // Create the transaction
- transaction = createTransaction(SEND);
+ auto transaction = createTransaction(TransferDirection::SEND);
if (transaction == nullptr) {
context.yield();
tearDown();
throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
}
- utils::Identifier transactionID = transaction->getUUID();
-
- bool continueTransaction = true;
+ utils::Identifier transaction_id = transaction->getUUID();
std::chrono::high_resolution_clock::time_point transaction_started_at =
std::chrono::high_resolution_clock::now();
try {
- while (continueTransaction) {
+ while (true) {
auto start_time = std::chrono::steady_clock::now();
- std::string payload;
- DataPacket packet(getLogger(), transaction, flow->getAttributes(),
payload);
- int16_t resp = send(transactionID, &packet, flow, &session);
- if (resp == -1) {
+ if (!sendFlowFile(transaction, *flow, session)) {
throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
}
- logger_->log_debug("Site2Site transaction {} send flow record {}",
transactionID.to_string(), flow->getUUIDStr());
- if (resp == 0) {
- auto end_time = std::chrono::steady_clock::now();
- std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
- std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote
Host=" + peer_->getHostName();
- session.getProvenanceReporter()->send(*flow, transitUri, details,
std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time),
false);
- }
+ logger_->log_debug("Site2Site transaction {} send flow record {}",
transaction_id.to_string(), flow->getUUIDStr());
+ auto end_time = std::chrono::steady_clock::now();
+ std::string transit_uri = peer_->getURL() + "/" + flow->getUUIDStr();
+ std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host="
+ peer_->getHostName();
+ session.getProvenanceReporter()->send(*flow, transit_uri, details,
std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time),
false);
session.remove(flow);
std::chrono::nanoseconds transfer_duration =
std::chrono::high_resolution_clock::now() - transaction_started_at;
- if (transfer_duration > _batchSendNanos)
+ if (transfer_duration > batch_send_nanos_) {
break;
+ }
flow = session.get();
-
if (!flow) {
- continueTransaction = false;
+ break;
}
- } // while true
+ }
- if (!confirm(transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " +
transactionID.to_string());
+ if (!confirm(transaction_id)) {
+ throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " +
transaction_id.to_string());
}
- if (!complete(context, transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " +
transactionID.to_string());
+ if (!complete(context, transaction_id)) {
+ throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " +
transaction_id.to_string());
}
- logger_->log_debug("Site2Site transaction {} successfully sent flow record
{}, content bytes {}", transactionID.to_string(),
transaction->total_transfers_, transaction->_bytes);
- } catch (std::exception &exception) {
- if (transaction)
- deleteTransaction(transactionID);
- context.yield();
- tearDown();
- logger_->log_debug("Caught Exception during
SiteToSiteClient::transferFlowFiles, type: {}, what: {}",
typeid(exception).name(), exception.what());
- throw;
- } catch (...) {
- if (transaction)
- deleteTransaction(transactionID);
- context.yield();
- tearDown();
- logger_->log_debug("Caught Exception during
SiteToSiteClient::transferFlowFiles, type: {}", getCurrentExceptionTypeName());
+ logger_->log_debug("Site2Site transaction {} successfully sent flow record
{}, content bytes {}", transaction_id.to_string(),
transaction->getCurrentTransfers(), transaction->getBytes());
+ } catch (const std::exception& exception) {
+ handleTransactionError(transaction, context, exception);
Review Comment:
I think it would be good to use `gsl::finally` for the `deleteTransaction`
call, but in the failure case the deletion has to run before `tearDown`, so
`gsl::finally` cannot be used in all cases. Everything else in
`handleTransactionError` should only run in error cases, so I would keep it
this way.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]