adamdebreceni commented on a change in pull request #937:
URL: https://github.com/apache/nifi-minifi-cpp/pull/937#discussion_r526036252



##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -873,6 +664,161 @@ void C2Agent::update_agent() {
   }
 }
 
+utils::TaskRescheduleInfo C2Agent::produce() {
+  // place priority on messages to send to the c2 server
+  if (protocol_.load() != nullptr) {
+    std::vector<C2Payload> payload_batch;
+    payload_batch.reserve(max_c2_responses);
+    auto getRequestPayload = [&payload_batch] (C2Payload&& payload) { 
payload_batch.emplace_back(std::move(payload)); };
+    for (std::size_t attempt_num = 0; attempt_num < max_c2_responses; 
++attempt_num) {
+      if (!requests.consume(getRequestPayload)) {
+        break;
+      }
+    }
+    std::for_each(
+        std::make_move_iterator(payload_batch.begin()),
+        std::make_move_iterator(payload_batch.end()),
+        [&] (C2Payload&& payload) {
+          try {
+            C2Payload && response = 
protocol_.load()->consumePayload(std::move(payload));
+            enqueue_c2_server_response(std::move(response));
+          }
+          catch(const std::exception &e) {
+            logger_->log_error("Exception occurred while consuming payload. 
error: %s", e.what());
+          }
+          catch(...) {
+            logger_->log_error("Unknown exception occurred while consuming 
payload.");
+          }
+        });
+
+    try {
+      performHeartBeat();
+    }
+    catch (const std::exception &e) {
+      logger_->log_error("Exception occurred while performing heartbeat. 
error: %s", e.what());
+    }
+    catch (...) {
+      logger_->log_error("Unknonwn exception occurred while performing 
heartbeat.");
+    }
+  }
+
+  checkTriggers();
+
+  return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+}
+
+utils::TaskRescheduleInfo C2Agent::consume() {
+  const auto consume_success = responses.consume([this] (C2Payload&& payload) {
+    extractPayload(std::move(payload));
+  });
+  if (!consume_success) {
+    extractPayload(C2Payload{ Operation::HEARTBEAT });
+  }
+  return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
+}
+
+utils::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
+  if (!utils::StringUtils::startsWith(uri, "http") || protocol_.load() == 
nullptr) {
+    // try to open the file
+    utils::optional<std::string> content = filesystem_->read(uri);
+    if (content) {
+      return content;
+    }
+  }
+  // couldn't open as file and we have no protocol to request the file from
+  if (protocol_.load() == nullptr) {
+    return {};
+  }
+
+  std::string resolved_url = uri;
+  if (!utils::StringUtils::startsWith(uri, "http")) {
+    std::stringstream adjusted_url;
+    std::string base;
+    if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, base)) {
+      adjusted_url << base;
+      if (!utils::StringUtils::endsWith(base, "/")) {
+        adjusted_url << "/";
+      }
+      adjusted_url << uri;
+      resolved_url = adjusted_url.str();
+    } else if (configuration_->get("c2.rest.url", base)) {
+      std::string host, protocol;
+      int port = -1;
+      utils::parse_url(&base, &host, &port, &protocol);
+      adjusted_url << protocol << host;
+      if (port > 0) {
+        adjusted_url << ":" << port;
+      }
+      adjusted_url << "/c2/api/" << uri;
+      resolved_url = adjusted_url.str();
+    }
+  }
+
+  C2Payload payload(Operation::TRANSFER, false, true);
+  C2Payload &&response = protocol_.load()->consumePayload(resolved_url, 
payload, RECEIVE, false);
+
+  auto raw_data = response.getRawData();
+  return std::string(raw_data.data(), raw_data.size());
+}
+
+bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) {
+  auto url = resp.operation_arguments.find("location");
+
+  std::string file_uri;
+  std::string configuration_str;
+
+  if (url == resp.operation_arguments.end()) {
+    logger_->log_debug("Did not have location within %s", resp.ident);
+    auto update_text = resp.operation_arguments.find("configuration_data");
+    if (update_text == resp.operation_arguments.end()) {
+      logger_->log_debug("Neither the config file location nor the data is 
provided");
+      C2Payload response(Operation::ACKNOWLEDGE, 
state::UpdateState::SET_ERROR, resp.ident, false, true);
+      response.setRawData("Error while applying flow. Neither the config file 
location nor the data is provided.");
+      enqueue_c2_response(std::move(response));
+      return false;
+    }
+    configuration_str = update_text->second.to_string();
+  }
+
+  file_uri = url->second.to_string();
+  utils::optional<std::string> optional_configuration_str = 
fetchFlow(file_uri);
+  if (!optional_configuration_str) {
+    logger_->log_debug("Couldn't load new flow configuration from: \"%s\"", 
file_uri);
+    C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, 
resp.ident, false, true);
+    response.setRawData("Error while applying flow. Couldn't load flow 
configuration.");
+    enqueue_c2_response(std::move(response));
+    return false;
+  }
+  configuration_str = optional_configuration_str.value();
+
+  bool should_persist = [&] {
+    auto persist = resp.operation_arguments.find("persist");
+    if (persist == resp.operation_arguments.end()) {
+      return false;
+    }
+    return utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), 
"true");
+  }();
+
+  int16_t err = {update_sink_->applyUpdate(file_uri, configuration_str, 
should_persist)};

Review comment:
       The braces around the initializer are there to prevent an accidental 
narrowing conversion in case we change the return type of `applyUpdate`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to