szaszm commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2351674789
##########
libminifi/include/sitetosite/SiteToSiteClient.h:
##########
@@ -19,246 +19,180 @@
#pragma once
#include <algorithm>
-#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <optional>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
-#include "core/Connectable.h"
-#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::sitetosite {
+namespace org::apache::nifi::minifi {
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket {
+namespace test {
+class SiteToSiteClientTestAccessor;
+} // namespace test
+
+namespace sitetosite {
+
+struct DataPacket {
public:
- DataPacket(std::shared_ptr<core::logging::Logger> logger,
std::shared_ptr<Transaction> transaction, std::map<std::string, std::string>
attributes, const std::string &payload)
- : _attributes{std::move(attributes)},
- transaction_{std::move(transaction)},
- payload_{payload},
- logger_reference_{std::move(logger)} {
+ DataPacket(std::shared_ptr<Transaction> transaction, const std::string
&payload)
+ : transaction{std::move(transaction)},
+ payload{payload} {
+ }
+ DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string,
std::string> attributes, const std::string &payload)
+ : attributes{std::move(attributes)},
+ transaction{std::move(transaction)},
+ payload{payload} {
}
- std::map<std::string, std::string> _attributes;
- uint64_t _size{0};
- std::shared_ptr<Transaction> transaction_;
- const std::string & payload_;
- std::shared_ptr<core::logging::Logger> logger_reference_;
+ std::map<std::string, std::string> attributes;
+ std::shared_ptr<Transaction> transaction;
+ const std::string& payload;
};
-class SiteToSiteClient : public core::ConnectableImpl {
+struct SiteToSiteResponse {
+ ResponseCode code = ResponseCode::UNRECOGNIZED_RESPONSE_CODE;
+ std::string message;
+};
+
+class SiteToSiteClient {
public:
- SiteToSiteClient()
- : core::ConnectableImpl("SitetoSiteClient") {
+ explicit SiteToSiteClient(gsl::not_null<std::unique_ptr<SiteToSitePeer>>
peer)
+ : peer_(std::move(peer)) {
}
- ~SiteToSiteClient() override = default;
+ SiteToSiteClient(const SiteToSiteClient&) = delete;
+ SiteToSiteClient(SiteToSiteClient&&) = delete;
+ SiteToSiteClient& operator=(const SiteToSiteClient&) = delete;
+ SiteToSiteClient& operator=(SiteToSiteClient&&) = delete;
- void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextServiceInterface>
&context_service) {
- ssl_context_service_ = context_service;
- }
+ virtual ~SiteToSiteClient() = default;
- /**
- * Creates a transaction using the transaction ID and the direction
- * @param transactionID transaction identifier
- * @param direction direction of transfer
- */
- virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
+ virtual std::optional<std::vector<PeerStatus>> getPeerList() = 0;
+ virtual bool transmitPayload(core::ProcessContext& context, const
std::string &payload, const std::map<std::string, std::string>& attributes) = 0;
- /**
- * Transfers flow files
- * @param direction transfer direction
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transfer(TransferDirection direction, core::ProcessContext&
context, core::ProcessSession& session) {
-#ifndef WIN32
- if (__builtin_expect(direction == SEND, 1)) {
+ bool transfer(TransferDirection direction, core::ProcessContext& context,
core::ProcessSession& session) {
+ if (direction == TransferDirection::SEND) {
return transferFlowFiles(context, session);
} else {
return receiveFlowFiles(context, session);
}
-#else
- if (direction == SEND) {
- return transferFlowFiles(context, session);
- } else {
- return receiveFlowFiles(context, session);
- }
-#endif
}
- /**
- * Transfers flow files to server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transferFlowFiles(core::ProcessContext& context,
core::ProcessSession& session);
-
- /**
- * Receive flow files from server
- * @param context process context
- * @param session process session
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
-
- // Confirm the data that was sent or received by comparing CRC32's of the
data sent and the data received.
- // Receive flow files for the process session
- bool receiveFlowFiles(core::ProcessContext& context, core::ProcessSession&
session);
-
- // Receive the data packet from the transaction
- // Return false when any error occurs
- bool receive(const utils::Identifier &transactionID, DataPacket *packet,
bool &eof);
- /**
- * Transfers raw data and attributes to server
- * @param context process context
- * @param session process session
- * @param payload data to transmit
- * @param attributes
- * @returns true if the process succeeded, failure OR exception thrown
otherwise
- */
- virtual bool transmitPayload(core::ProcessContext& context,
core::ProcessSession& session, const std::string &payload,
- std::map<std::string, std::string> attributes)
= 0;
-
- void setPortId(utils::Identifier &id) {
+ void setPortId(const utils::Identifier& id) {
port_id_ = id;
}
- /**
- * Sets the idle timeout.
- */
void setIdleTimeout(std::chrono::milliseconds timeout) {
idle_timeout_ = timeout;
}
- /**
- * Sets the base peer for this interface.
- */
- virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
- peer_ = std::move(peer);
- }
-
- /**
- * Provides a reference to the port identifier
- * @returns port identifier
- */
- utils::Identifier getPortId() const {
+ [[nodiscard]] utils::Identifier getPortId() const {
return port_id_;
}
- /**
- * Obtains the peer list and places them into the provided vector
- * @param peers peer vector.
- * @return true if successful, false otherwise
- */
- virtual bool getPeerList(std::vector<PeerStatus> &peers) = 0;
-
- /**
- * Establishes the interface.
- * @return true if successful, false otherwise
- */
- virtual bool establish() = 0;
-
- const std::shared_ptr<core::logging::Logger> &getLogger() {
+ [[nodiscard]] const std::shared_ptr<core::logging::Logger> &getLogger() {
return logger_;
}
- void yield() override {
+ void setSSLContextService(const
std::shared_ptr<minifi::controllers::SSLContextServiceInterface>
&context_service) {
+ ssl_context_service_ = context_service;
}
- /**
- * Determines if we are connected and operating
- */
- bool isRunning() const override {
- return running_;
+ void setUseCompression(bool use_compression) {
+ use_compression_ = use_compression;
}
- /**
- * Determines if work is available by this connectable
- * @return boolean if work is available.
- */
- bool isWorkAvailable() override {
- return true;
+ void setBatchSize(uint64_t size) {
+ batch_size_ = size;
}
- virtual bool bootstrap() {
- return true;
+ void setBatchCount(uint64_t count) {
+ batch_count_ = count;
}
- // Return -1 when any error occurs
- virtual int16_t send(const utils::Identifier& transactionID, DataPacket*
packet, const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession*
session);
+ void setBatchDuration(std::chrono::milliseconds duration) {
+ batch_duration_ = duration;
+ }
- protected:
- // Cancel the transaction
- virtual void cancel(const utils::Identifier &transactionID);
- // Complete the transaction
- virtual bool complete(core::ProcessContext& context, const utils::Identifier
&transactionID);
- // Error the transaction
- virtual void error(const utils::Identifier &transactionID);
+ virtual void setTimeout(std::chrono::milliseconds timeout) {
+ timeout_ = timeout;
+ }
- virtual bool confirm(const utils::Identifier &transactionID);
- // deleteTransaction
- virtual void deleteTransaction(const utils::Identifier &transactionID);
+ protected:
+ friend class test::SiteToSiteClientTestAccessor;
+ virtual bool bootstrap() = 0;
+ virtual bool establish() = 0;
+ virtual std::shared_ptr<Transaction> createTransaction(TransferDirection
direction) = 0;
virtual void tearDown() = 0;
- // read Respond
- virtual int readResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode &code, std::string &message);
- // write respond
- virtual int writeResponse(const std::shared_ptr<Transaction> &transaction,
RespondCode code, const std::string& message);
- // getRespondCodeContext
- virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
- for (auto & i : SiteToSiteRequest::respondCodeContext) {
- if (i.code == code) {
- return &i;
- }
- }
- return nullptr;
- }
+ virtual void deleteTransaction(const utils::Identifier &transaction_id);
+ virtual std::optional<SiteToSiteResponse> readResponse(const
std::shared_ptr<Transaction> &transaction);
+ virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction,
const SiteToSiteResponse& response);
- // Peer State
- PeerState peer_state_{PeerState::IDLE};
+ bool initializeSend(const std::shared_ptr<Transaction>& transaction);
+ bool writeAttributesInSendTransaction(const std::shared_ptr<Transaction>&
transaction, const std::map<std::string, std::string>& attributes);
+ void finalizeSendTransaction(const std::shared_ptr<Transaction>&
transaction, uint64_t sent_bytes);
+ bool sendPacket(const DataPacket& packet);
+ bool sendFlowFile(const std::shared_ptr<Transaction>& transaction,
core::FlowFile& flow_file, core::ProcessSession& session);
- // portId
- utils::Identifier port_id_;
+ void cancel(const utils::Identifier &transaction_id);
+ bool complete(core::ProcessContext& context, const utils::Identifier
&transaction_id);
+ void error(const utils::Identifier &transaction_id);
+ bool confirm(const utils::Identifier &transaction_id);
- // idleTimeout
- std::chrono::milliseconds idle_timeout_{15000};
+ void handleTransactionError(const std::shared_ptr<Transaction>& transaction,
core::ProcessContext& context, const std::exception& exception);
- // Peer Connection
- std::unique_ptr<SiteToSitePeer> peer_;
-
- std::atomic<bool> running_{false};
-
- // transaction map
+ PeerState peer_state_{PeerState::IDLE};
+ utils::Identifier port_id_;
+ std::chrono::milliseconds idle_timeout_{15s};
+ gsl::not_null<std::unique_ptr<SiteToSitePeer>> peer_;
std::map<utils::Identifier, std::shared_ptr<Transaction>>
known_transactions_;
+ std::chrono::nanoseconds batch_send_nanos_{5s};
- // BATCH_SEND_NANOS
- std::chrono::nanoseconds _batchSendNanos = std::chrono::seconds(5);
-
- /***
- * versioning
- */
- uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
- int _currentVersionIndex{0};
- uint32_t _currentVersion{_supportedVersion[_currentVersionIndex]};
- uint32_t _supportedCodecVersion[1] = {1};
- int _currentCodecVersionIndex{0};
- uint32_t
_currentCodecVersion{_supportedCodecVersion[_currentCodecVersionIndex]};
+ const std::vector<uint32_t> supported_versions_ = {5, 4, 3, 2, 1};
+ uint32_t current_version_index_{0};
+ uint32_t current_version_{supported_versions_[current_version_index_]};
+ const std::vector<uint32_t> supported_codec_versions_ = {1};
+ uint32_t current_codec_version_index_{0};
+ uint32_t
current_codec_version_{supported_codec_versions_[current_codec_version_index_]};
Review Comment:
Why is it OK to have these variables without synchronization, while the
rest of the members below are atomic?
##########
libminifi/src/RemoteProcessGroupPort.cpp:
##########
@@ -0,0 +1,369 @@
+/**
+ * @file RemoteProcessGroupPort.cpp
+ * RemoteProcessGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RemoteProcessGroupPort.h"
+
+#include <cinttypes>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "Exception.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessContext.h"
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "http/BaseHTTPClient.h"
+#include "rapidjson/document.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/SiteToSiteFactory.h"
+#include "utils/net/DNS.h"
+
+#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW,
which conflicts with rapidjson
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi {
+
+namespace {
+std::string buildFullSiteToSiteUrl(const RPG& nifi) {
+ std::stringstream full_url;
+ full_url << nifi.protocol << nifi.host;
+ // don't append port if it is 0 ( undefined )
+ if (nifi.port > 0) {
+ full_url << ":" << std::to_string(nifi.port);
+ }
+ full_url << "/nifi-api/site-to-site";
+ return full_url.str();
+}
+} // namespace
+
+const char *RemoteProcessGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME =
"RemoteProcessGroupPortSSLContextService";
+
+void RemoteProcessGroupPort::setURL(const std::string& val) {
+ auto urls = utils::string::split(val, ",");
+ for (const auto& url : urls) {
+ http::URL parsed_url{utils::string::trim(url)};
+ if (parsed_url.isValid()) {
+ logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url,
parsed_url.hostPort());
+ nifi_instances_.push_back({parsed_url.host(), parsed_url.port(),
parsed_url.protocol()});
+ } else {
+ logger_->log_error("Could not parse RPG URL '{}'", url);
+ }
+ }
+}
+
+gsl::not_null<std::unique_ptr<sitetosite::SiteToSiteClient>>
RemoteProcessGroupPort::initializeProtocol(sitetosite::SiteToSiteClientConfiguration&
config) const {
+ config.setSecurityContext(ssl_service_);
+ config.setHTTPProxy(proxy_);
+ config.setIdleTimeout(idle_timeout_);
+ config.setUseCompression(use_compression_);
+ config.setBatchCount(batch_count_);
+ config.setBatchSize(batch_size_);
+ config.setBatchDuration(batch_duration_);
+ config.setTimeout(timeout_);
+
+ return sitetosite::createClient(config);
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient>
RemoteProcessGroupPort::getNextProtocol() {
+ std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+ if (!available_protocols_.try_dequeue(next_protocol)) {
+ std::lock_guard<std::mutex> lock(peer_mutex_);
+ if (peer_index_ >= 0) {
+ logger_->log_debug("Creating client from peer {}", peer_index_);
+ auto& peer_status = peers_[peer_index_];
+ sitetosite::SiteToSiteClientConfiguration
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(),
local_network_interface_, client_type_);
+ peer_index_++;
+ if (peer_index_ >= static_cast<int>(peers_.size())) {
+ peer_index_ = 0;
+ }
+ next_protocol = initializeProtocol(config);
+ } else {
+ logger_->log_debug("Refreshing the peer list since there are none
configured.");
+ refreshPeerList();
+ }
+ }
+ logger_->log_debug("Obtained protocol from available_protocols_");
+ return next_protocol;
+}
+
+void RemoteProcessGroupPort::returnProtocol(core::ProcessContext& context,
std::unique_ptr<sitetosite::SiteToSiteClient> return_protocol) {
+ auto count =
std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
+ if (available_protocols_.size_approx() >= count) {
+ logger_->log_debug("not enqueueing protocol {}", getUUIDStr());
+ // let the memory be freed
+ return;
+ }
+ logger_->log_debug("enqueueing protocol {}, have a total of {}",
getUUIDStr(), available_protocols_.size_approx());
+ available_protocols_.enqueue(std::move(return_protocol));
+}
+
+void RemoteProcessGroupPort::initialize() {
+ setSupportedProperties(Properties);
+ setSupportedRelationships(Relationships);
+
+ logger_->log_trace("Finished initialization");
+}
+
+void RemoteProcessGroupPort::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
+ if (auto protocol_uuid = context.getProperty(portUUID)) {
+ protocol_uuid_ = *protocol_uuid;
+ }
+
+ auto context_name = context.getProperty(SSLContext);
+ if (!context_name || IsNullOrEmpty(*context_name)) {
+ context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
+ }
+
+ std::shared_ptr<core::controller::ControllerService> service =
context.getControllerService(*context_name, getUUID());
+ if (nullptr != service) {
+ ssl_service_ =
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(service);
+ } else {
+ std::string secureStr;
+ if (configure_->get(Configure::nifi_remote_input_secure, secureStr) &&
utils::string::toBool(secureStr).value_or(false)) {
+ ssl_service_ =
std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME,
configure_);
+ ssl_service_->onEnable();
+ }
+ }
+
+ idle_timeout_ = context.getProperty(idleTimeout) |
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) |
utils::orThrow("RemoteProcessGroupPort::idleTimeout is a required Property");
+
+ std::lock_guard<std::mutex> lock(peer_mutex_);
+ if (!nifi_instances_.empty()) {
+ refreshPeerList();
+ if (!peers_.empty())
+ peer_index_ = 0;
+ }
+ // populate the site2site protocol for load balancing between them
+ if (!peers_.empty()) {
+ auto count =
std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
+ for (uint32_t i = 0; i < count; i++) {
+ auto peer_status = peers_[peer_index_];
+ sitetosite::SiteToSiteClientConfiguration
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(),
local_network_interface_, client_type_);
+ peer_index_++;
+ if (peer_index_ >= static_cast<int>(peers_.size())) {
+ peer_index_ = 0;
+ }
+ logger_->log_trace("Creating client");
+ auto next_protocol = initializeProtocol(config);
+ logger_->log_trace("Created client, moving into available protocols");
+ returnProtocol(context, std::move(next_protocol));
+ }
+ } else {
+ // we don't have any peers
+ logger_->log_error("No peers selected during scheduling");
+ }
+}
+
+void RemoteProcessGroupPort::notifyStop() {
+ transmitting_ = false;
+ std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+ while (available_protocols_.try_dequeue(next_protocol)) {
+ // clear all protocols now
+ }
+}
+
+void RemoteProcessGroupPort::onTrigger(core::ProcessContext& context,
core::ProcessSession& session) {
+ logger_->log_trace("On trigger {}", getUUIDStr());
+ if (!transmitting_) {
+ return;
+ }
+
+ try {
+ logger_->log_trace("get protocol in on trigger");
+ auto protocol = getNextProtocol();
+
+ if (!protocol) {
+ logger_->log_info("no protocol, yielding");
+ context.yield();
+ return;
+ }
+
+ if (!protocol->transfer(direction_, context, session)) {
+ logger_->log_warn("protocol transmission failed, yielding");
+ context.yield();
+ }
+
+ returnProtocol(context, std::move(protocol));
+ } catch (const std::exception&) {
+ context.yield();
+ session.rollback();
+ }
+}
+
+std::optional<std::string> RemoteProcessGroupPort::getRestApiToken(const RPG&
nifi) const {
+ std::string rest_user_name;
+ configure_->get(Configure::nifi_rest_api_user_name, rest_user_name);
+ if (rest_user_name.empty()) {
+ return std::nullopt;
+ }
+
+ std::string rest_password;
+ configure_->get(Configure::nifi_rest_api_password, rest_password);
+
+ std::stringstream login_url;
+ login_url << nifi.protocol << nifi.host;
+ // don't append port if it is 0 ( undefined )
+ if (nifi.port > 0) {
+ login_url << ":" << std::to_string(nifi.port);
+ }
+ login_url << "/nifi-api/access/token";
+
+ auto client_ptr =
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient",
"HTTPClient");
+ if (nullptr == client_ptr) {
+ logger_->log_error("Could not locate HTTPClient. You do not have cURL
support!");
+ return std::nullopt;
+ }
+ auto client =
std::unique_ptr<http::BaseHTTPClient>(dynamic_cast<http::BaseHTTPClient*>(client_ptr));
+ client->initialize(http::HttpRequestMethod::GET, login_url.str(),
ssl_service_);
+ // use a connection timeout. if this times out we will simply attempt
re-connection
+ // so no need for configuration parameter that isn't already defined in
Processor
+ client->setConnectionTimeout(10s);
+ client->setReadTimeout(idle_timeout_);
+
+ auto token = http::get_token(client.get(), rest_user_name, rest_password);
+ logger_->log_debug("Token from NiFi REST Api endpoint {}, {}",
login_url.str(), token);
Review Comment:
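We should probably not log the token, because it is sensitive information: it is later sent as the `Authorization` header, so anyone who can read the debug log could reuse it.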
##########
libminifi/src/RemoteProcessGroupPort.cpp:
##########
@@ -0,0 +1,369 @@
+/**
+ * @file RemoteProcessGroupPort.cpp
+ * RemoteProcessGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RemoteProcessGroupPort.h"
+
+#include <cinttypes>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "Exception.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessContext.h"
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "http/BaseHTTPClient.h"
+#include "rapidjson/document.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/SiteToSiteFactory.h"
+#include "utils/net/DNS.h"
+
+#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW,
which conflicts with rapidjson
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi {
+
+namespace {
+std::string buildFullSiteToSiteUrl(const RPG& nifi) {
+ std::stringstream full_url;
+ full_url << nifi.protocol << nifi.host;
+ // don't append port if it is 0 ( undefined )
+ if (nifi.port > 0) {
+ full_url << ":" << std::to_string(nifi.port);
+ }
+ full_url << "/nifi-api/site-to-site";
+ return full_url.str();
+}
+} // namespace
+
+const char *RemoteProcessGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME =
"RemoteProcessGroupPortSSLContextService";
+
+void RemoteProcessGroupPort::setURL(const std::string& val) {
+ auto urls = utils::string::split(val, ",");
+ for (const auto& url : urls) {
+ http::URL parsed_url{utils::string::trim(url)};
+ if (parsed_url.isValid()) {
+ logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url,
parsed_url.hostPort());
+ nifi_instances_.push_back({parsed_url.host(), parsed_url.port(),
parsed_url.protocol()});
+ } else {
+ logger_->log_error("Could not parse RPG URL '{}'", url);
+ }
+ }
+}
+
+gsl::not_null<std::unique_ptr<sitetosite::SiteToSiteClient>>
RemoteProcessGroupPort::initializeProtocol(sitetosite::SiteToSiteClientConfiguration&
config) const {
+ config.setSecurityContext(ssl_service_);
+ config.setHTTPProxy(proxy_);
+ config.setIdleTimeout(idle_timeout_);
+ config.setUseCompression(use_compression_);
+ config.setBatchCount(batch_count_);
+ config.setBatchSize(batch_size_);
+ config.setBatchDuration(batch_duration_);
+ config.setTimeout(timeout_);
+
+ return sitetosite::createClient(config);
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient>
RemoteProcessGroupPort::getNextProtocol() {
+ std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+ if (!available_protocols_.try_dequeue(next_protocol)) {
+ std::lock_guard<std::mutex> lock(peer_mutex_);
+ if (peer_index_ >= 0) {
+ logger_->log_debug("Creating client from peer {}", peer_index_);
+ auto& peer_status = peers_[peer_index_];
+ sitetosite::SiteToSiteClientConfiguration
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(),
local_network_interface_, client_type_);
+ peer_index_++;
+ if (peer_index_ >= static_cast<int>(peers_.size())) {
+ peer_index_ = 0;
+ }
+ next_protocol = initializeProtocol(config);
+ } else {
+ logger_->log_debug("Refreshing the peer list since there are none
configured.");
+ refreshPeerList();
+ }
+ }
+ logger_->log_debug("Obtained protocol from available_protocols_");
+ return next_protocol;
+}
+
+void RemoteProcessGroupPort::returnProtocol(core::ProcessContext& context,
std::unique_ptr<sitetosite::SiteToSiteClient> return_protocol) {
+ auto count =
std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
+ if (available_protocols_.size_approx() >= count) {
+ logger_->log_debug("not enqueueing protocol {}", getUUIDStr());
+ // let the memory be freed
+ return;
+ }
+ logger_->log_debug("enqueueing protocol {}, have a total of {}",
getUUIDStr(), available_protocols_.size_approx());
+ available_protocols_.enqueue(std::move(return_protocol));
+}
+
+void RemoteProcessGroupPort::initialize() {
+ setSupportedProperties(Properties);
+ setSupportedRelationships(Relationships);
+
+ logger_->log_trace("Finished initialization");
+}
+
+void RemoteProcessGroupPort::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
+ if (auto protocol_uuid = context.getProperty(portUUID)) {
+ protocol_uuid_ = *protocol_uuid;
+ }
+
+ auto context_name = context.getProperty(SSLContext);
+ if (!context_name || IsNullOrEmpty(*context_name)) {
+ context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
+ }
+
+ std::shared_ptr<core::controller::ControllerService> service =
context.getControllerService(*context_name, getUUID());
+ if (nullptr != service) {
+ ssl_service_ =
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(service);
+ } else {
+ std::string secureStr;
+ if (configure_->get(Configure::nifi_remote_input_secure, secureStr) &&
utils::string::toBool(secureStr).value_or(false)) {
+ ssl_service_ =
std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME,
configure_);
+ ssl_service_->onEnable();
+ }
+ }
+
+ idle_timeout_ = context.getProperty(idleTimeout) |
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) |
utils::orThrow("RemoteProcessGroupPort::idleTimeout is a required Property");
+
+ std::lock_guard<std::mutex> lock(peer_mutex_);
+ if (!nifi_instances_.empty()) {
+ refreshPeerList();
+ if (!peers_.empty())
+ peer_index_ = 0;
+ }
+ // populate the site2site protocol for load balancing between them
+ if (!peers_.empty()) {
+ auto count =
std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
+ for (uint32_t i = 0; i < count; i++) {
+ auto peer_status = peers_[peer_index_];
+ sitetosite::SiteToSiteClientConfiguration
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(),
local_network_interface_, client_type_);
+ peer_index_++;
+ if (peer_index_ >= static_cast<int>(peers_.size())) {
+ peer_index_ = 0;
+ }
+ logger_->log_trace("Creating client");
+ auto next_protocol = initializeProtocol(config);
+ logger_->log_trace("Created client, moving into available protocols");
+ returnProtocol(context, std::move(next_protocol));
+ }
+ } else {
+ // we don't have any peers
+ logger_->log_error("No peers selected during scheduling");
+ }
+}
+
+void RemoteProcessGroupPort::notifyStop() {
+ transmitting_ = false;
+ std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+ while (available_protocols_.try_dequeue(next_protocol)) {
+ // clear all protocols now
+ }
+}
+
+void RemoteProcessGroupPort::onTrigger(core::ProcessContext& context,
core::ProcessSession& session) {
+ logger_->log_trace("On trigger {}", getUUIDStr());
+ if (!transmitting_) {
+ return;
+ }
+
+ try {
+ logger_->log_trace("get protocol in on trigger");
+ auto protocol = getNextProtocol();
+
+ if (!protocol) {
+ logger_->log_info("no protocol, yielding");
+ context.yield();
+ return;
+ }
+
+ if (!protocol->transfer(direction_, context, session)) {
+ logger_->log_warn("protocol transmission failed, yielding");
+ context.yield();
+ }
+
+ returnProtocol(context, std::move(protocol));
+ } catch (const std::exception&) {
+ context.yield();
+ session.rollback();
+ }
+}
+
+std::optional<std::string> RemoteProcessGroupPort::getRestApiToken(const RPG&
nifi) const {
+ std::string rest_user_name;
+ configure_->get(Configure::nifi_rest_api_user_name, rest_user_name);
+ if (rest_user_name.empty()) {
+ return std::nullopt;
+ }
+
+ std::string rest_password;
+ configure_->get(Configure::nifi_rest_api_password, rest_password);
+
+ std::stringstream login_url;
+ login_url << nifi.protocol << nifi.host;
+ // don't append port if it is 0 ( undefined )
+ if (nifi.port > 0) {
+ login_url << ":" << std::to_string(nifi.port);
+ }
+ login_url << "/nifi-api/access/token";
+
+ auto client_ptr =
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient",
"HTTPClient");
+ if (nullptr == client_ptr) {
+ logger_->log_error("Could not locate HTTPClient. You do not have cURL
support!");
+ return std::nullopt;
+ }
+ auto client =
std::unique_ptr<http::BaseHTTPClient>(dynamic_cast<http::BaseHTTPClient*>(client_ptr));
+ client->initialize(http::HttpRequestMethod::GET, login_url.str(),
ssl_service_);
+ // use a connection timeout. if this times out we will simply attempt
re-connection
+ // so no need for configuration parameter that isn't already defined in
Processor
+ client->setConnectionTimeout(10s);
+ client->setReadTimeout(idle_timeout_);
+
+ auto token = http::get_token(client.get(), rest_user_name, rest_password);
+ logger_->log_debug("Token from NiFi REST Api endpoint {}, {}",
login_url.str(), token);
+ return token;
+}
+
+std::optional<std::pair<std::string, uint16_t>>
RemoteProcessGroupPort::parseSiteToSiteDataFromControllerConfig(const RPG&
nifi, const std::string& controller) const {
+ rapidjson::Document doc;
+ rapidjson::ParseResult ok = doc.Parse(controller.c_str());
+
+ if (!ok || !doc.IsObject() || doc.ObjectEmpty()) {
+ return std::nullopt;
+ }
+
+ rapidjson::Value::MemberIterator itr = doc.FindMember("controller");
+
+ if (itr == doc.MemberEnd() || !itr->value.IsObject()) {
+ return std::nullopt;
+ }
+
+ rapidjson::Value controllerValue = itr->value.GetObject();
+ rapidjson::Value::ConstMemberIterator end_itr = controllerValue.MemberEnd();
+ rapidjson::Value::ConstMemberIterator port_itr =
controllerValue.FindMember("remoteSiteListeningPort");
+ rapidjson::Value::ConstMemberIterator secure_itr =
controllerValue.FindMember("siteToSiteSecure");
+ bool site_to_site_secure = secure_itr != end_itr &&
secure_itr->value.IsBool() ? secure_itr->value.GetBool() : false;
+
+ uint16_t site_to_site_port = 0;
+ if (client_type_ == sitetosite::ClientType::RAW && port_itr != end_itr &&
port_itr->value.IsNumber()) {
+ site_to_site_port = port_itr->value.GetInt();
+ } else {
+ site_to_site_port = nifi.port;
+ }
+
+ logger_->log_debug("process group remote site2site port {}, is secure {}",
site_to_site_port, site_to_site_secure);
+ return std::make_pair(nifi.host, site_to_site_port);
+}
+
+std::optional<std::pair<std::string, uint16_t>>
RemoteProcessGroupPort::tryRefreshSiteToSiteInstance(RPG nifi) const { //
NOLINT(performance-unnecessary-value-param)
+#ifdef WIN32
+ if ("localhost" == nifi.host) {
+ nifi.host = org::apache::nifi::minifi::utils::net::getMyHostName();
+ }
+#endif
+ auto token = getRestApiToken(nifi);
+ if (token && token->empty()) {
+ return std::nullopt;
+ }
+
+ auto client_ptr =
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient",
"HTTPClient");
+ if (nullptr == client_ptr) {
+ logger_->log_error("Could not locate HTTPClient. You do not have cURL
support, defaulting to base configuration!");
+ return std::nullopt;
+ }
+
+ auto client =
std::unique_ptr<http::BaseHTTPClient>(dynamic_cast<http::BaseHTTPClient*>(client_ptr));
+ auto full_url = buildFullSiteToSiteUrl(nifi);
+ client->initialize(http::HttpRequestMethod::GET, full_url, ssl_service_);
+ // use a connection timeout. if this times out we will simply attempt
re-connection
+ // so no need for configuration parameter that isn't already defined in
Processor
+ client->setConnectionTimeout(10s);
+ client->setReadTimeout(idle_timeout_);
+ if (!proxy_.host.empty()) {
+ client->setHTTPProxy(proxy_);
+ }
+ if (token) {
+ client->setRequestHeader("Authorization", token);
+ }
+
+ client->setVerbose(false);
+
+ if (!client->submit() || client->getResponseCode() != 200) {
+ logger_->log_error("ProcessGroup::refreshRemoteSiteToSiteInfo --
curl_easy_perform() failed , response code {}\n", client->getResponseCode());
+ return std::nullopt;
+ }
+
+ const std::vector<char> &response_body = client->getResponseBody();
+ if (response_body.empty()) {
+ logger_->log_error("Cannot output body to content for
ProcessGroup::refreshRemoteSiteToSiteInfo: received HTTP code {} from {}",
client->getResponseCode(), full_url);
+ return std::nullopt;
+ }
+
+ std::string controller = std::string(response_body.begin(),
response_body.end());
+ logger_->log_trace("controller config {}", controller);
+ return parseSiteToSiteDataFromControllerConfig(nifi, controller);
+}
+
+std::optional<std::pair<std::string, uint16_t>>
RemoteProcessGroupPort::refreshRemoteSiteToSiteInfo() {
+ if (nifi_instances_.empty()) {
+ return std::nullopt;
+ }
+
+ for (const auto& nifi : nifi_instances_) {
+ auto result = tryRefreshSiteToSiteInstance(nifi);
+ if (result) {
+ return result;
+ }
+ }
+ return std::nullopt;
+}
+
+void RemoteProcessGroupPort::refreshPeerList() {
+ auto connection = refreshRemoteSiteToSiteInfo();
+ if (!connection) {
+ logger_->log_debug("No port configured");
Review Comment:
Shouldn't this be logged as a warning or an error rather than at debug level?
##########
libminifi/src/sitetosite/SiteToSiteClient.cpp:
##########
@@ -20,735 +20,740 @@
#include <map>
#include <string>
#include <memory>
+#include <ranges>
#include "utils/gsl.h"
#include "utils/Enum.h"
namespace org::apache::nifi::minifi::sitetosite {
-int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>&
/*transaction*/, RespondCode &code, std::string &message) {
- uint8_t firstByte = 0;
- {
- const auto ret = peer_->read(firstByte);
- if (ret == 0 || io::isError(ret) || firstByte != CODE_SEQUENCE_VALUE_1)
- return -1;
+std::optional<SiteToSiteResponse> SiteToSiteClient::readResponse(const
std::shared_ptr<Transaction>& /*transaction*/) {
+ uint8_t result_byte = 0;
+ if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret)
|| result_byte != CODE_SEQUENCE_VALUE_1) {
+ logger_->log_error("Site2Site read response failed: invalid code sequence
1 value");
+ return std::nullopt;
}
- uint8_t secondByte = 0;
- {
- const auto ret = peer_->read(secondByte);
- if (ret == 0 || io::isError(ret) || secondByte != CODE_SEQUENCE_VALUE_2)
- return -1;
+ if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret)
|| result_byte != CODE_SEQUENCE_VALUE_2) {
+ logger_->log_error("Site2Site read response failed: invalid code sequence
2 value");
+ return std::nullopt;
}
- uint8_t thirdByte = 0;
- {
- const auto ret = peer_->read(thirdByte);
- if (ret == 0 || io::isError(ret))
- return static_cast<int>(ret);
+ if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret))
{
+ logger_->log_error("Site2Site read response failed: failed to read
response code");
+ return std::nullopt;
}
- code = static_cast<RespondCode>(thirdByte);
- RespondCodeContext *resCode = this->getRespondCodeContext(code);
- if (!resCode) {
- return -1;
+ SiteToSiteResponse response;
+ if (auto code = magic_enum::enum_cast<ResponseCode>(result_byte)) {
+ response.code = *code;
+ } else {
+ logger_->log_error("Site2Site read response failed: invalid response
code");
+ return std::nullopt;
}
- if (resCode->hasDescription) {
- const auto ret = peer_->read(message);
- if (ret == 0 || io::isError(ret))
- return -1;
+
+ const ResponseCodeContext* response_code_context =
getResponseCodeContext(response.code);
+ if (!response_code_context) {
+ logger_->log_error("Site2Site read response failed: invalid response code
context");
+ return std::nullopt;
}
- return gsl::narrow<int>(3 + message.size());
+ if (response_code_context->has_description) {
+ if (const auto ret = peer_->read(response.message); ret == 0 ||
io::isError(ret)) {
+ logger_->log_error("Site2Site read response failed: failed to read
response message");
+ return std::nullopt;
+ }
+ }
+ return response;
}
-void SiteToSiteClient::deleteTransaction(const utils::Identifier&
transactionID) {
+void SiteToSiteClient::handleTransactionError(const
std::shared_ptr<Transaction>& transaction, core::ProcessContext& context, const
std::exception& exception) {
+ if (transaction) {
+ deleteTransaction(transaction->getUUID());
+ }
+ context.yield();
+ tearDown();
+ logger_->log_warn("Caught Exception, type: {}, what: {}",
typeid(exception).name(), exception.what());
+}
+
+void SiteToSiteClient::deleteTransaction(const utils::Identifier&
transaction_id) {
std::shared_ptr<Transaction> transaction;
- auto it = this->known_transactions_.find(transactionID);
+ auto it = known_transactions_.find(transaction_id);
if (it == known_transactions_.end()) {
+ logger_->log_warn("Site2Site transaction id '{}' not found for delete",
transaction_id.to_string());
return;
} else {
transaction = it->second;
}
logger_->log_debug("Site2Site delete transaction {}",
transaction->getUUIDStr());
- known_transactions_.erase(transactionID);
+ known_transactions_.erase(transaction_id);
}
-int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>&
/*transaction*/, RespondCode code, const std::string& message) {
- RespondCodeContext *resCode = this->getRespondCodeContext(code);
- if (!resCode) {
- return -1;
+bool SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>&
/*transaction*/, const SiteToSiteResponse& response) {
+ const ResponseCodeContext* response_code_context =
getResponseCodeContext(response.code);
+ if (!response_code_context) {
+ return false;
}
- {
- const std::array<uint8_t, 3> codeSeq = { CODE_SEQUENCE_VALUE_1,
CODE_SEQUENCE_VALUE_2, static_cast<uint8_t>(code) };
- const auto ret = peer_->write(codeSeq.data(), 3);
- if (ret != 3)
- return -1;
+ const std::array<uint8_t, 3> code_sequence = { CODE_SEQUENCE_VALUE_1,
CODE_SEQUENCE_VALUE_2, magic_enum::enum_underlying(response.code) };
+ const auto ret = peer_->write(code_sequence.data(), 3);
+ if (ret != 3) {
+ return false;
}
- if (resCode->hasDescription) {
- const auto ret = peer_->write(message);
- if (io::isError(ret)) return -1;
- if (ret == 0) return 0;
- return 3 + gsl::narrow<int>(ret);
- } else {
- return 3;
+ if (response_code_context->has_description) {
+ return !(io::isError(peer_->write(response.message)));
}
+ return true;
}
bool SiteToSiteClient::transferFlowFiles(core::ProcessContext& context,
core::ProcessSession& session) {
auto flow = session.get();
-
- std::shared_ptr<Transaction> transaction = nullptr;
-
if (!flow) {
return false;
}
- if (peer_state_ != READY) {
- if (!bootstrap())
+ if (peer_state_ != PeerState::READY) {
+ if (!bootstrap()) {
return false;
+ }
}
- if (peer_state_ != READY) {
+ if (peer_state_ != PeerState::READY) {
context.yield();
tearDown();
throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with
peer");
}
- // Create the transaction
- transaction = createTransaction(SEND);
+ auto transaction = createTransaction(TransferDirection::SEND);
if (transaction == nullptr) {
context.yield();
tearDown();
throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
}
- utils::Identifier transactionID = transaction->getUUID();
-
- bool continueTransaction = true;
+ utils::Identifier transaction_id = transaction->getUUID();
std::chrono::high_resolution_clock::time_point transaction_started_at =
std::chrono::high_resolution_clock::now();
try {
- while (continueTransaction) {
+ while (true) {
auto start_time = std::chrono::steady_clock::now();
- std::string payload;
- DataPacket packet(getLogger(), transaction, flow->getAttributes(),
payload);
- int16_t resp = send(transactionID, &packet, flow, &session);
- if (resp == -1) {
+ if (!sendFlowFile(transaction, *flow, session)) {
throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
}
- logger_->log_debug("Site2Site transaction {} send flow record {}",
transactionID.to_string(), flow->getUUIDStr());
- if (resp == 0) {
- auto end_time = std::chrono::steady_clock::now();
- std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
- std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote
Host=" + peer_->getHostName();
- session.getProvenanceReporter()->send(*flow, transitUri, details,
std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time),
false);
- }
+ logger_->log_debug("Site2Site transaction {} send flow record {}",
transaction_id.to_string(), flow->getUUIDStr());
+ auto end_time = std::chrono::steady_clock::now();
+ std::string transit_uri = peer_->getURL() + "/" + flow->getUUIDStr();
+ std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host="
+ peer_->getHostName();
+ session.getProvenanceReporter()->send(*flow, transit_uri, details,
std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time),
false);
session.remove(flow);
std::chrono::nanoseconds transfer_duration =
std::chrono::high_resolution_clock::now() - transaction_started_at;
- if (transfer_duration > _batchSendNanos)
+ if (transfer_duration > batch_send_nanos_) {
break;
+ }
flow = session.get();
-
if (!flow) {
- continueTransaction = false;
+ break;
}
- } // while true
+ }
- if (!confirm(transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " +
transactionID.to_string());
+ if (!confirm(transaction_id)) {
+ throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " +
transaction_id.to_string());
}
- if (!complete(context, transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " +
transactionID.to_string());
+ if (!complete(context, transaction_id)) {
+ throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " +
transaction_id.to_string());
}
- logger_->log_debug("Site2Site transaction {} successfully sent flow record
{}, content bytes {}", transactionID.to_string(),
transaction->total_transfers_, transaction->_bytes);
- } catch (std::exception &exception) {
- if (transaction)
- deleteTransaction(transactionID);
- context.yield();
- tearDown();
- logger_->log_debug("Caught Exception during
SiteToSiteClient::transferFlowFiles, type: {}, what: {}",
typeid(exception).name(), exception.what());
- throw;
- } catch (...) {
- if (transaction)
- deleteTransaction(transactionID);
- context.yield();
- tearDown();
- logger_->log_debug("Caught Exception during
SiteToSiteClient::transferFlowFiles, type: {}", getCurrentExceptionTypeName());
+ logger_->log_debug("Site2Site transaction {} successfully sent flow record
{}, content bytes {}", transaction_id.to_string(),
transaction->getCurrentTransfers(), transaction->getBytes());
+ } catch (const std::exception& exception) {
+ handleTransactionError(transaction, context, exception);
Review Comment:
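I'd prefer if this cleanup (deleting the transaction, yielding the context and tearing down the connection) were done with RAII or gsl::finally instead of in the catch handler.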
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]