lordgamez commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2387240332


##########
libminifi/src/RemoteProcessGroupPort.cpp:
##########
@@ -0,0 +1,369 @@
+/**
+ * @file RemoteProcessGroupPort.cpp
+ * RemoteProcessGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RemoteProcessGroupPort.h"
+
+#include <cinttypes>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "Exception.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessContext.h"
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "http/BaseHTTPClient.h"
+#include "rapidjson/document.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/SiteToSiteFactory.h"
+#include "utils/net/DNS.h"
+
+#undef GetObject  // windows.h #defines GetObject = GetObjectA or GetObjectW, 
which conflicts with rapidjson
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi {
+
+namespace {
+std::string buildFullSiteToSiteUrl(const RPG& nifi) {
+  std::stringstream full_url;
+  full_url << nifi.protocol << nifi.host;
+  // don't append port if it is 0 ( undefined )
+  if (nifi.port > 0) {
+    full_url << ":" << std::to_string(nifi.port);
+  }
+  full_url << "/nifi-api/site-to-site";
+  return full_url.str();
+}
+}  // namespace
+
+const char *RemoteProcessGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME = 
"RemoteProcessGroupPortSSLContextService";
+
+void RemoteProcessGroupPort::setURL(const std::string& val) {
+  auto urls = utils::string::split(val, ",");
+  for (const auto& url : urls) {
+    http::URL parsed_url{utils::string::trim(url)};
+    if (parsed_url.isValid()) {
+      logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url, 
parsed_url.hostPort());
+      nifi_instances_.push_back({parsed_url.host(), parsed_url.port(), 
parsed_url.protocol()});
+    } else {
+      logger_->log_error("Could not parse RPG URL '{}'", url);
+    }
+  }
+}
+
+gsl::not_null<std::unique_ptr<sitetosite::SiteToSiteClient>> 
RemoteProcessGroupPort::initializeProtocol(sitetosite::SiteToSiteClientConfiguration&
 config) const {
+  config.setSecurityContext(ssl_service_);
+  config.setHTTPProxy(proxy_);
+  config.setIdleTimeout(idle_timeout_);
+  config.setUseCompression(use_compression_);
+  config.setBatchCount(batch_count_);
+  config.setBatchSize(batch_size_);
+  config.setBatchDuration(batch_duration_);
+  config.setTimeout(timeout_);
+
+  return sitetosite::createClient(config);
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient> 
RemoteProcessGroupPort::getNextProtocol() {
+  std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+  if (!available_protocols_.try_dequeue(next_protocol)) {
+    std::lock_guard<std::mutex> lock(peer_mutex_);
+    if (peer_index_ >= 0) {
+      logger_->log_debug("Creating client from peer {}", peer_index_);
+      auto& peer_status = peers_[peer_index_];
+      sitetosite::SiteToSiteClientConfiguration 
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(), 
local_network_interface_, client_type_);
+      peer_index_++;
+      if (peer_index_ >= static_cast<int>(peers_.size())) {
+        peer_index_ = 0;
+      }
+      next_protocol = initializeProtocol(config);
+    } else {
+      logger_->log_debug("Refreshing the peer list since there are none 
configured.");
+      refreshPeerList();
+    }
+  }
+  logger_->log_debug("Obtained protocol from available_protocols_");
+  return next_protocol;
+}
+
+void RemoteProcessGroupPort::returnProtocol(core::ProcessContext& context, 
std::unique_ptr<sitetosite::SiteToSiteClient> return_protocol) {
+  auto count = 
std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
+  if (available_protocols_.size_approx() >= count) {
+    logger_->log_debug("not enqueueing protocol {}", getUUIDStr());
+    // let the memory be freed
+    return;
+  }
+  logger_->log_debug("enqueueing protocol {}, have a total of {}", 
getUUIDStr(), available_protocols_.size_approx());
+  available_protocols_.enqueue(std::move(return_protocol));
+}
+
+void RemoteProcessGroupPort::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+
+  logger_->log_trace("Finished initialization");
+}
+
+void RemoteProcessGroupPort::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  if (auto protocol_uuid = context.getProperty(portUUID)) {
+    protocol_uuid_ = *protocol_uuid;
+  }
+
+  auto context_name = context.getProperty(SSLContext);
+  if (!context_name || IsNullOrEmpty(*context_name)) {
+    context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
+  }
+
+  std::shared_ptr<core::controller::ControllerService> service = 
context.getControllerService(*context_name, getUUID());
+  if (nullptr != service) {
+    ssl_service_ = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(service);
+  } else {
+    std::string secureStr;
+    if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && 
utils::string::toBool(secureStr).value_or(false)) {
+      ssl_service_ = 
std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME,
 configure_);
+      ssl_service_->onEnable();
+    }
+  }
+
+  idle_timeout_ = context.getProperty(idleTimeout) | 
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | 
utils::orThrow("RemoteProcessGroupPort::idleTimeout is a required Property");
+
+  std::lock_guard<std::mutex> lock(peer_mutex_);
+  if (!nifi_instances_.empty()) {
+    refreshPeerList();
+    if (!peers_.empty())
+      peer_index_ = 0;
+  }
+  // populate the site2site protocol for load balancing between them
+  if (!peers_.empty()) {
+    auto count = 
std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
+    for (uint32_t i = 0; i < count; i++) {
+      auto peer_status = peers_[peer_index_];
+      sitetosite::SiteToSiteClientConfiguration 
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(), 
local_network_interface_, client_type_);
+      peer_index_++;
+      if (peer_index_ >= static_cast<int>(peers_.size())) {
+        peer_index_ = 0;
+      }
+      logger_->log_trace("Creating client");
+      auto next_protocol = initializeProtocol(config);
+      logger_->log_trace("Created client, moving into available protocols");
+      returnProtocol(context, std::move(next_protocol));
+    }
+  } else {
+    // we don't have any peers
+    logger_->log_error("No peers selected during scheduling");
+  }
+}
+
+void RemoteProcessGroupPort::notifyStop() {
+  transmitting_ = false;
+  std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+  while (available_protocols_.try_dequeue(next_protocol)) {
+    // clear all protocols now
+  }
+}
+
+void RemoteProcessGroupPort::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  logger_->log_trace("On trigger {}", getUUIDStr());
+  if (!transmitting_) {
+    return;
+  }
+
+  try {
+    logger_->log_trace("get protocol in on trigger");
+    auto protocol = getNextProtocol();
+
+    if (!protocol) {
+      logger_->log_info("no protocol, yielding");
+      context.yield();
+      return;
+    }
+
+    if (!protocol->transfer(direction_, context, session)) {
+      logger_->log_warn("protocol transmission failed, yielding");
+      context.yield();
+    }
+
+    returnProtocol(context, std::move(protocol));
+  } catch (const std::exception&) {
+    context.yield();
+    session.rollback();
+  }
+}
+
+std::optional<std::string> RemoteProcessGroupPort::getRestApiToken(const RPG& 
nifi) const {
+  std::string rest_user_name;
+  configure_->get(Configure::nifi_rest_api_user_name, rest_user_name);
+  if (rest_user_name.empty()) {
+    return std::nullopt;
+  }
+
+  std::string rest_password;
+  configure_->get(Configure::nifi_rest_api_password, rest_password);
+
+  std::stringstream login_url;
+  login_url << nifi.protocol << nifi.host;
+  // don't append port if it is 0 ( undefined )
+  if (nifi.port > 0) {
+    login_url << ":" << std::to_string(nifi.port);
+  }
+  login_url << "/nifi-api/access/token";
+
+  auto client_ptr = 
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", 
"HTTPClient");
+  if (nullptr == client_ptr) {
+    logger_->log_error("Could not locate HTTPClient. You do not have cURL 
support!");
+    return std::nullopt;
+  }
+  auto client = 
std::unique_ptr<http::BaseHTTPClient>(dynamic_cast<http::BaseHTTPClient*>(client_ptr));
+  client->initialize(http::HttpRequestMethod::GET, login_url.str(), 
ssl_service_);
+  // use a connection timeout. if this times out we will simply attempt 
re-connection
+  // so no need for configuration parameter that isn't already defined in 
Processor
+  client->setConnectionTimeout(10s);
+  client->setReadTimeout(idle_timeout_);
+
+  auto token = http::get_token(client.get(), rest_user_name, rest_password);
+  logger_->log_debug("Token from NiFi REST Api endpoint {},  {}", 
login_url.str(), token);
+  return token;
+}
+
+std::optional<std::pair<std::string, uint16_t>> 
RemoteProcessGroupPort::parseSiteToSiteDataFromControllerConfig(const RPG& 
nifi, const std::string& controller) const {
+  rapidjson::Document doc;
+  rapidjson::ParseResult ok = doc.Parse(controller.c_str());
+
+  if (!ok || !doc.IsObject() || doc.ObjectEmpty()) {
+    return std::nullopt;
+  }
+
+  rapidjson::Value::MemberIterator itr = doc.FindMember("controller");
+
+  if (itr == doc.MemberEnd() || !itr->value.IsObject()) {
+    return std::nullopt;
+  }
+
+  rapidjson::Value controllerValue = itr->value.GetObject();
+  rapidjson::Value::ConstMemberIterator end_itr = controllerValue.MemberEnd();
+  rapidjson::Value::ConstMemberIterator port_itr = 
controllerValue.FindMember("remoteSiteListeningPort");
+  rapidjson::Value::ConstMemberIterator secure_itr = 
controllerValue.FindMember("siteToSiteSecure");
+  bool site_to_site_secure = secure_itr != end_itr && 
secure_itr->value.IsBool() ? secure_itr->value.GetBool() : false;
+
+  uint16_t site_to_site_port = 0;
+  if (client_type_ == sitetosite::ClientType::RAW && port_itr != end_itr && 
port_itr->value.IsNumber()) {
+    site_to_site_port = port_itr->value.GetInt();
+  } else {
+    site_to_site_port = nifi.port;
+  }
+
+  logger_->log_debug("process group remote site2site port {}, is secure {}", 
site_to_site_port, site_to_site_secure);
+  return std::make_pair(nifi.host, site_to_site_port);
+}
+
+std::optional<std::pair<std::string, uint16_t>> 
RemoteProcessGroupPort::tryRefreshSiteToSiteInstance(RPG nifi) const {  // 
NOLINT(performance-unnecessary-value-param)
+#ifdef WIN32
+  if ("localhost" == nifi.host) {
+    nifi.host = org::apache::nifi::minifi::utils::net::getMyHostName();
+  }
+#endif
+  auto token = getRestApiToken(nifi);
+  if (token && token->empty()) {
+    return std::nullopt;
+  }
+
+  auto client_ptr = 
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", 
"HTTPClient");
+  if (nullptr == client_ptr) {
+    logger_->log_error("Could not locate HTTPClient. You do not have cURL 
support, defaulting to base configuration!");
+    return std::nullopt;
+  }
+
+  auto client = 
std::unique_ptr<http::BaseHTTPClient>(dynamic_cast<http::BaseHTTPClient*>(client_ptr));
+  auto full_url  = buildFullSiteToSiteUrl(nifi);
+  client->initialize(http::HttpRequestMethod::GET, full_url, ssl_service_);
+  // use a connection timeout. if this times out we will simply attempt 
re-connection
+  // so no need for configuration parameter that isn't already defined in 
Processor
+  client->setConnectionTimeout(10s);
+  client->setReadTimeout(idle_timeout_);
+  if (!proxy_.host.empty()) {
+    client->setHTTPProxy(proxy_);
+  }
+  if (token) {
+    client->setRequestHeader("Authorization", token);
+  }
+
+  client->setVerbose(false);
+
+  if (!client->submit() || client->getResponseCode() != 200) {
+    logger_->log_error("ProcessGroup::refreshRemoteSiteToSiteInfo -- 
curl_easy_perform() failed , response code {}\n", client->getResponseCode());
+    return std::nullopt;
+  }
+
+  const std::vector<char> &response_body = client->getResponseBody();
+  if (response_body.empty()) {
+    logger_->log_error("Cannot output body to content for 
ProcessGroup::refreshRemoteSiteToSiteInfo: received HTTP code {} from {}", 
client->getResponseCode(), full_url);
+    return std::nullopt;
+  }
+
+  std::string controller = std::string(response_body.begin(), 
response_body.end());
+  logger_->log_trace("controller config {}", controller);
+  return parseSiteToSiteDataFromControllerConfig(nifi, controller);
+}
+
+std::optional<std::pair<std::string, uint16_t>> 
RemoteProcessGroupPort::refreshRemoteSiteToSiteInfo() {
+  if (nifi_instances_.empty()) {
+    return std::nullopt;
+  }
+
+  for (const auto& nifi : nifi_instances_) {
+    auto result = tryRefreshSiteToSiteInstance(nifi);
+    if (result) {
+      return result;
+    }
+  }
+  return std::nullopt;
+}
+
+void RemoteProcessGroupPort::refreshPeerList() {
+  auto connection = refreshRemoteSiteToSiteInfo();
+  if (!connection) {
+    logger_->log_debug("No port configured");

Review Comment:
   You're right, it should be a warning at least, updated in 
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/6a61e1c192b1dfcb6c5f0977432f73ee12557367



##########
libminifi/src/RemoteProcessGroupPort.cpp:
##########
@@ -0,0 +1,369 @@
+/**
+ * @file RemoteProcessGroupPort.cpp
+ * RemoteProcessGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RemoteProcessGroupPort.h"
+
+#include <cinttypes>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "Exception.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessContext.h"
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "http/BaseHTTPClient.h"
+#include "rapidjson/document.h"
+#include "sitetosite/Peer.h"
+#include "sitetosite/SiteToSiteFactory.h"
+#include "utils/net/DNS.h"
+
+#undef GetObject  // windows.h #defines GetObject = GetObjectA or GetObjectW, 
which conflicts with rapidjson
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi {
+
+namespace {
+std::string buildFullSiteToSiteUrl(const RPG& nifi) {
+  std::stringstream full_url;
+  full_url << nifi.protocol << nifi.host;
+  // don't append port if it is 0 ( undefined )
+  if (nifi.port > 0) {
+    full_url << ":" << std::to_string(nifi.port);
+  }
+  full_url << "/nifi-api/site-to-site";
+  return full_url.str();
+}
+}  // namespace
+
+const char *RemoteProcessGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME = 
"RemoteProcessGroupPortSSLContextService";
+
+void RemoteProcessGroupPort::setURL(const std::string& val) {
+  auto urls = utils::string::split(val, ",");
+  for (const auto& url : urls) {
+    http::URL parsed_url{utils::string::trim(url)};
+    if (parsed_url.isValid()) {
+      logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url, 
parsed_url.hostPort());
+      nifi_instances_.push_back({parsed_url.host(), parsed_url.port(), 
parsed_url.protocol()});
+    } else {
+      logger_->log_error("Could not parse RPG URL '{}'", url);
+    }
+  }
+}
+
+gsl::not_null<std::unique_ptr<sitetosite::SiteToSiteClient>> 
RemoteProcessGroupPort::initializeProtocol(sitetosite::SiteToSiteClientConfiguration&
 config) const {
+  config.setSecurityContext(ssl_service_);
+  config.setHTTPProxy(proxy_);
+  config.setIdleTimeout(idle_timeout_);
+  config.setUseCompression(use_compression_);
+  config.setBatchCount(batch_count_);
+  config.setBatchSize(batch_size_);
+  config.setBatchDuration(batch_duration_);
+  config.setTimeout(timeout_);
+
+  return sitetosite::createClient(config);
+}
+
+std::unique_ptr<sitetosite::SiteToSiteClient> 
RemoteProcessGroupPort::getNextProtocol() {
+  std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+  if (!available_protocols_.try_dequeue(next_protocol)) {
+    std::lock_guard<std::mutex> lock(peer_mutex_);
+    if (peer_index_ >= 0) {
+      logger_->log_debug("Creating client from peer {}", peer_index_);
+      auto& peer_status = peers_[peer_index_];
+      sitetosite::SiteToSiteClientConfiguration 
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(), 
local_network_interface_, client_type_);
+      peer_index_++;
+      if (peer_index_ >= static_cast<int>(peers_.size())) {
+        peer_index_ = 0;
+      }
+      next_protocol = initializeProtocol(config);
+    } else {
+      logger_->log_debug("Refreshing the peer list since there are none 
configured.");
+      refreshPeerList();
+    }
+  }
+  logger_->log_debug("Obtained protocol from available_protocols_");
+  return next_protocol;
+}
+
+void RemoteProcessGroupPort::returnProtocol(core::ProcessContext& context, 
std::unique_ptr<sitetosite::SiteToSiteClient> return_protocol) {
+  auto count = 
std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
+  if (available_protocols_.size_approx() >= count) {
+    logger_->log_debug("not enqueueing protocol {}", getUUIDStr());
+    // let the memory be freed
+    return;
+  }
+  logger_->log_debug("enqueueing protocol {}, have a total of {}", 
getUUIDStr(), available_protocols_.size_approx());
+  available_protocols_.enqueue(std::move(return_protocol));
+}
+
+void RemoteProcessGroupPort::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+
+  logger_->log_trace("Finished initialization");
+}
+
+void RemoteProcessGroupPort::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  if (auto protocol_uuid = context.getProperty(portUUID)) {
+    protocol_uuid_ = *protocol_uuid;
+  }
+
+  auto context_name = context.getProperty(SSLContext);
+  if (!context_name || IsNullOrEmpty(*context_name)) {
+    context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
+  }
+
+  std::shared_ptr<core::controller::ControllerService> service = 
context.getControllerService(*context_name, getUUID());
+  if (nullptr != service) {
+    ssl_service_ = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(service);
+  } else {
+    std::string secureStr;
+    if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && 
utils::string::toBool(secureStr).value_or(false)) {
+      ssl_service_ = 
std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME,
 configure_);
+      ssl_service_->onEnable();
+    }
+  }
+
+  idle_timeout_ = context.getProperty(idleTimeout) | 
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | 
utils::orThrow("RemoteProcessGroupPort::idleTimeout is a required Property");
+
+  std::lock_guard<std::mutex> lock(peer_mutex_);
+  if (!nifi_instances_.empty()) {
+    refreshPeerList();
+    if (!peers_.empty())
+      peer_index_ = 0;
+  }
+  // populate the site2site protocol for load balancing between them
+  if (!peers_.empty()) {
+    auto count = 
std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
+    for (uint32_t i = 0; i < count; i++) {
+      auto peer_status = peers_[peer_index_];
+      sitetosite::SiteToSiteClientConfiguration 
config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(), 
local_network_interface_, client_type_);
+      peer_index_++;
+      if (peer_index_ >= static_cast<int>(peers_.size())) {
+        peer_index_ = 0;
+      }
+      logger_->log_trace("Creating client");
+      auto next_protocol = initializeProtocol(config);
+      logger_->log_trace("Created client, moving into available protocols");
+      returnProtocol(context, std::move(next_protocol));
+    }
+  } else {
+    // we don't have any peers
+    logger_->log_error("No peers selected during scheduling");
+  }
+}
+
+void RemoteProcessGroupPort::notifyStop() {
+  transmitting_ = false;
+  std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
+  while (available_protocols_.try_dequeue(next_protocol)) {
+    // clear all protocols now
+  }
+}
+
+void RemoteProcessGroupPort::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  logger_->log_trace("On trigger {}", getUUIDStr());
+  if (!transmitting_) {
+    return;
+  }
+
+  try {
+    logger_->log_trace("get protocol in on trigger");
+    auto protocol = getNextProtocol();
+
+    if (!protocol) {
+      logger_->log_info("no protocol, yielding");
+      context.yield();
+      return;
+    }
+
+    if (!protocol->transfer(direction_, context, session)) {
+      logger_->log_warn("protocol transmission failed, yielding");
+      context.yield();
+    }
+
+    returnProtocol(context, std::move(protocol));
+  } catch (const std::exception&) {
+    context.yield();
+    session.rollback();
+  }
+}
+
+std::optional<std::string> RemoteProcessGroupPort::getRestApiToken(const RPG& 
nifi) const {
+  std::string rest_user_name;
+  configure_->get(Configure::nifi_rest_api_user_name, rest_user_name);
+  if (rest_user_name.empty()) {
+    return std::nullopt;
+  }
+
+  std::string rest_password;
+  configure_->get(Configure::nifi_rest_api_password, rest_password);
+
+  std::stringstream login_url;
+  login_url << nifi.protocol << nifi.host;
+  // don't append port if it is 0 ( undefined )
+  if (nifi.port > 0) {
+    login_url << ":" << std::to_string(nifi.port);
+  }
+  login_url << "/nifi-api/access/token";
+
+  auto client_ptr = 
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", 
"HTTPClient");
+  if (nullptr == client_ptr) {
+    logger_->log_error("Could not locate HTTPClient. You do not have cURL 
support!");
+    return std::nullopt;
+  }
+  auto client = 
std::unique_ptr<http::BaseHTTPClient>(dynamic_cast<http::BaseHTTPClient*>(client_ptr));
+  client->initialize(http::HttpRequestMethod::GET, login_url.str(), 
ssl_service_);
+  // use a connection timeout. if this times out we will simply attempt 
re-connection
+  // so no need for configuration parameter that isn't already defined in 
Processor
+  client->setConnectionTimeout(10s);
+  client->setReadTimeout(idle_timeout_);
+
+  auto token = http::get_token(client.get(), rest_user_name, rest_password);
+  logger_->log_debug("Token from NiFi REST Api endpoint {},  {}", 
login_url.str(), token);

Review Comment:
   Good point, updated in 
https://github.com/apache/nifi-minifi-cpp/pull/1966/commits/6a61e1c192b1dfcb6c5f0977432f73ee12557367



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to