fgerlits commented on a change in pull request #937:
URL: https://github.com/apache/nifi-minifi-cpp/pull/937#discussion_r525961588
##########
File path: libminifi/include/utils/file/PathUtils.h
##########
@@ -17,12 +17,20 @@
#ifndef LIBMINIFI_INCLUDE_UTILS_FILE_PATHUTILS_H_
#define LIBMINIFI_INCLUDE_UTILS_FILE_PATHUTILS_H_
+#include <climits>
#include <cctype>
#include <cinttypes>
#include <memory>
#include <string>
#include <system_error>
#include <utility>
+#include "utils/OptionalUtils.h"
+
+#ifdef _MSC_VER
Review comment:
Why is this `_MSC_VER` and not `WIN32`?
##########
File path: libminifi/include/utils/file/PathUtils.h
##########
@@ -62,6 +70,21 @@ inline bool isAbsolutePath(const char* const path) noexcept {
#endif
}
+inline utils::optional<std::string> canonicalize(const std::string &path) {
Review comment:
This looks very similar to `getFullPath()`, except it doesn't do
anything in the Windows case. Is that intentional?
##########
File path: libminifi/src/FlowController.cpp
##########
@@ -884,11 +534,14 @@ void FlowController::disableAllControllerServices() {
controller_service_provider_->disableAllControllerServices();
}
-int16_t FlowController::applyUpdate(const std::string &source, const
std::string &configuration) {
+int16_t FlowController::applyUpdate(const std::string &source, const
std::string &configuration, bool persist) {
if (applyConfiguration(source, configuration)) {
- return 1;
- } else {
+ if (persist) {
+ flow_configuration_->persist(configuration);
+ }
return 0;
+ } else {
+ return -1;
Review comment:
Oh wow, so this was completely backwards. Thanks for fixing it!
##########
File path: libminifi/src/FlowController.cpp
##########
@@ -70,60 +71,33 @@ namespace apache {
namespace nifi {
namespace minifi {
-#define DEFAULT_CONFIG_NAME "conf/config.yml"
-
-FlowController::FlowController(std::shared_ptr<core::Repository>
provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
std::shared_ptr<Configure> configure,
- std::unique_ptr<core::FlowConfiguration>
flow_configuration, std::shared_ptr<core::ContentRepository> content_repo,
const std::string name, bool headless_mode)
+FlowController::FlowController(std::shared_ptr<core::Repository>
provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
+ std::shared_ptr<Configure> configure,
std::unique_ptr<core::FlowConfiguration> flow_configuration,
+ std::shared_ptr<core::ContentRepository>
content_repo, const std::string name, bool headless_mode,
+ std::shared_ptr<utils::file::FileSystem>
filesystem)
:
core::controller::ControllerServiceProvider(core::getClassName<FlowController>()),
- root_(nullptr),
+ c2::C2Client(std::move(configure), std::move(provenance_repo),
std::move(flow_file_repo),
+ std::move(content_repo), std::move(flow_configuration),
std::move(filesystem)),
running_(false),
updating_(false),
- c2_enabled_(true),
initialized_(false),
- provenance_repo_(provenance_repo),
- flow_file_repo_(flow_file_repo),
controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
thread_pool_(2, false, nullptr, "Flowcontroller threadpool"),
- flow_configuration_(std::move(flow_configuration)),
- configuration_(std::move(configure)),
- content_repo_(content_repo),
logger_(logging::LoggerFactory<FlowController>::getLogger()) {
- if (provenance_repo == nullptr)
+ if (IsNullOrEmpty(provenance_repo_))
Review comment:
What is the point of this? Can `my_shared_ptr == nullptr` and
`my_shared_ptr.get() == nullptr` be different?
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -873,6 +664,161 @@ void C2Agent::update_agent() {
}
}
+utils::TaskRescheduleInfo C2Agent::produce() {
+ // place priority on messages to send to the c2 server
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consume(getRequestPayload)) {
+ break;
+ }
+ }
+ std::for_each(
+ std::make_move_iterator(payload_batch.begin()),
+ std::make_move_iterator(payload_batch.end()),
+ [&] (C2Payload&& payload) {
+ try {
+ C2Payload && response =
protocol_.load()->consumePayload(std::move(payload));
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknown exception occurred while consuming
payload.");
+ }
+ });
+
+ try {
+ performHeartBeat();
+ }
+ catch (const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
+ }
+ catch (...) {
+ logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
+ }
+ }
+
+ checkTriggers();
+
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+}
+
+utils::TaskRescheduleInfo C2Agent::consume() {
+ const auto consume_success = responses.consume([this] (C2Payload&& payload) {
+ extractPayload(std::move(payload));
+ });
+ if (!consume_success) {
+ extractPayload(C2Payload{ Operation::HEARTBEAT });
+ }
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
+}
+
+utils::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
+ if (!utils::StringUtils::startsWith(uri, "http") || protocol_.load() ==
nullptr) {
+ // try to open the file
+ utils::optional<std::string> content = filesystem_->read(uri);
+ if (content) {
+ return content;
+ }
+ }
+ // couldn't open as file and we have no protocol to request the file from
+ if (protocol_.load() == nullptr) {
+ return {};
+ }
+
+ std::string resolved_url = uri;
+ if (!utils::StringUtils::startsWith(uri, "http")) {
+ std::stringstream adjusted_url;
+ std::string base;
+ if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, base)) {
+ adjusted_url << base;
+ if (!utils::StringUtils::endsWith(base, "/")) {
+ adjusted_url << "/";
+ }
+ adjusted_url << uri;
+ resolved_url = adjusted_url.str();
+ } else if (configuration_->get("c2.rest.url", base)) {
+ std::string host, protocol;
+ int port = -1;
+ utils::parse_url(&base, &host, &port, &protocol);
+ adjusted_url << protocol << host;
+ if (port > 0) {
+ adjusted_url << ":" << port;
+ }
+ adjusted_url << "/c2/api/" << uri;
+ resolved_url = adjusted_url.str();
+ }
+ }
+
+ C2Payload payload(Operation::TRANSFER, false, true);
+ C2Payload &&response = protocol_.load()->consumePayload(resolved_url,
payload, RECEIVE, false);
+
+ auto raw_data = response.getRawData();
+ return std::string(raw_data.data(), raw_data.size());
+}
+
+bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) {
+ auto url = resp.operation_arguments.find("location");
+
+ std::string file_uri;
+ std::string configuration_str;
+
+ if (url == resp.operation_arguments.end()) {
+ logger_->log_debug("Did not have location within %s", resp.ident);
+ auto update_text = resp.operation_arguments.find("configuration_data");
+ if (update_text == resp.operation_arguments.end()) {
+ logger_->log_debug("Neither the config file location nor the data is
provided");
+ C2Payload response(Operation::ACKNOWLEDGE,
state::UpdateState::SET_ERROR, resp.ident, false, true);
+ response.setRawData("Error while applying flow. Neither the config file
location nor the data is provided.");
+ enqueue_c2_response(std::move(response));
+ return false;
+ }
+ configuration_str = update_text->second.to_string();
+ }
+
+ file_uri = url->second.to_string();
Review comment:
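`url` could be `== end` here, if the test in line 770 passes (no `location`)
but the test in line 773 fails (`configuration_data` is present); we would then
dereference the end iterator. I think lines 783-792 should be in an `else` block.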
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -873,6 +664,161 @@ void C2Agent::update_agent() {
}
}
+utils::TaskRescheduleInfo C2Agent::produce() {
+ // place priority on messages to send to the c2 server
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consume(getRequestPayload)) {
+ break;
+ }
+ }
+ std::for_each(
+ std::make_move_iterator(payload_batch.begin()),
+ std::make_move_iterator(payload_batch.end()),
+ [&] (C2Payload&& payload) {
+ try {
+ C2Payload && response =
protocol_.load()->consumePayload(std::move(payload));
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknown exception occurred while consuming
payload.");
+ }
+ });
+
+ try {
+ performHeartBeat();
+ }
+ catch (const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
+ }
+ catch (...) {
+ logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
+ }
+ }
+
+ checkTriggers();
+
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+}
+
+utils::TaskRescheduleInfo C2Agent::consume() {
+ const auto consume_success = responses.consume([this] (C2Payload&& payload) {
+ extractPayload(std::move(payload));
+ });
+ if (!consume_success) {
+ extractPayload(C2Payload{ Operation::HEARTBEAT });
+ }
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
+}
+
+utils::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
+ if (!utils::StringUtils::startsWith(uri, "http") || protocol_.load() ==
nullptr) {
+ // try to open the file
+ utils::optional<std::string> content = filesystem_->read(uri);
+ if (content) {
+ return content;
+ }
+ }
+ // couldn't open as file and we have no protocol to request the file from
+ if (protocol_.load() == nullptr) {
+ return {};
+ }
+
+ std::string resolved_url = uri;
+ if (!utils::StringUtils::startsWith(uri, "http")) {
+ std::stringstream adjusted_url;
+ std::string base;
+ if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, base)) {
+ adjusted_url << base;
+ if (!utils::StringUtils::endsWith(base, "/")) {
+ adjusted_url << "/";
+ }
+ adjusted_url << uri;
+ resolved_url = adjusted_url.str();
+ } else if (configuration_->get("c2.rest.url", base)) {
+ std::string host, protocol;
+ int port = -1;
+ utils::parse_url(&base, &host, &port, &protocol);
+ adjusted_url << protocol << host;
+ if (port > 0) {
+ adjusted_url << ":" << port;
+ }
+ adjusted_url << "/c2/api/" << uri;
+ resolved_url = adjusted_url.str();
+ }
+ }
+
+ C2Payload payload(Operation::TRANSFER, false, true);
+ C2Payload &&response = protocol_.load()->consumePayload(resolved_url,
payload, RECEIVE, false);
+
+ auto raw_data = response.getRawData();
+ return std::string(raw_data.data(), raw_data.size());
+}
+
+bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) {
+ auto url = resp.operation_arguments.find("location");
+
+ std::string file_uri;
+ std::string configuration_str;
+
+ if (url == resp.operation_arguments.end()) {
+ logger_->log_debug("Did not have location within %s", resp.ident);
+ auto update_text = resp.operation_arguments.find("configuration_data");
+ if (update_text == resp.operation_arguments.end()) {
+ logger_->log_debug("Neither the config file location nor the data is
provided");
+ C2Payload response(Operation::ACKNOWLEDGE,
state::UpdateState::SET_ERROR, resp.ident, false, true);
+ response.setRawData("Error while applying flow. Neither the config file
location nor the data is provided.");
+ enqueue_c2_response(std::move(response));
+ return false;
+ }
+ configuration_str = update_text->second.to_string();
+ }
+
+ file_uri = url->second.to_string();
+ utils::optional<std::string> optional_configuration_str =
fetchFlow(file_uri);
+ if (!optional_configuration_str) {
+ logger_->log_debug("Couldn't load new flow configuration from: \"%s\"",
file_uri);
+ C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR,
resp.ident, false, true);
+ response.setRawData("Error while applying flow. Couldn't load flow
configuration.");
+ enqueue_c2_response(std::move(response));
+ return false;
+ }
+ configuration_str = optional_configuration_str.value();
+
+ bool should_persist = [&] {
+ auto persist = resp.operation_arguments.find("persist");
+ if (persist == resp.operation_arguments.end()) {
+ return false;
+ }
+ return utils::StringUtils::equalsIgnoreCase(persist->second.to_string(),
"true");
+ }();
+
+ int16_t err = {update_sink_->applyUpdate(file_uri, configuration_str,
should_persist)};
Review comment:
Why is there a `{ ... }` around this?
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -873,6 +664,161 @@ void C2Agent::update_agent() {
}
}
+utils::TaskRescheduleInfo C2Agent::produce() {
+ // place priority on messages to send to the c2 server
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consume(getRequestPayload)) {
+ break;
+ }
+ }
+ std::for_each(
+ std::make_move_iterator(payload_batch.begin()),
+ std::make_move_iterator(payload_batch.end()),
+ [&] (C2Payload&& payload) {
+ try {
+ C2Payload && response =
protocol_.load()->consumePayload(std::move(payload));
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknown exception occurred while consuming
payload.");
+ }
+ });
+
+ try {
+ performHeartBeat();
+ }
+ catch (const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
+ }
+ catch (...) {
+ logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
+ }
+ }
+
+ checkTriggers();
+
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+}
+
+utils::TaskRescheduleInfo C2Agent::consume() {
+ const auto consume_success = responses.consume([this] (C2Payload&& payload) {
+ extractPayload(std::move(payload));
+ });
+ if (!consume_success) {
+ extractPayload(C2Payload{ Operation::HEARTBEAT });
+ }
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
+}
+
+utils::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
+ if (!utils::StringUtils::startsWith(uri, "http") || protocol_.load() ==
nullptr) {
+ // try to open the file
+ utils::optional<std::string> content = filesystem_->read(uri);
+ if (content) {
+ return content;
+ }
+ }
+ // couldn't open as file and we have no protocol to request the file from
+ if (protocol_.load() == nullptr) {
+ return {};
+ }
+
+ std::string resolved_url = uri;
+ if (!utils::StringUtils::startsWith(uri, "http")) {
+ std::stringstream adjusted_url;
+ std::string base;
+ if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, base)) {
+ adjusted_url << base;
+ if (!utils::StringUtils::endsWith(base, "/")) {
+ adjusted_url << "/";
+ }
+ adjusted_url << uri;
+ resolved_url = adjusted_url.str();
+ } else if (configuration_->get("c2.rest.url", base)) {
+ std::string host, protocol;
+ int port = -1;
+ utils::parse_url(&base, &host, &port, &protocol);
+ adjusted_url << protocol << host;
+ if (port > 0) {
+ adjusted_url << ":" << port;
+ }
+ adjusted_url << "/c2/api/" << uri;
+ resolved_url = adjusted_url.str();
+ }
+ }
+
+ C2Payload payload(Operation::TRANSFER, false, true);
+ C2Payload &&response = protocol_.load()->consumePayload(resolved_url,
payload, RECEIVE, false);
+
+ auto raw_data = response.getRawData();
+ return std::string(raw_data.data(), raw_data.size());
+}
+
+bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) {
+ auto url = resp.operation_arguments.find("location");
+
+ std::string file_uri;
+ std::string configuration_str;
+
+ if (url == resp.operation_arguments.end()) {
+ logger_->log_debug("Did not have location within %s", resp.ident);
+ auto update_text = resp.operation_arguments.find("configuration_data");
+ if (update_text == resp.operation_arguments.end()) {
+ logger_->log_debug("Neither the config file location nor the data is
provided");
+ C2Payload response(Operation::ACKNOWLEDGE,
state::UpdateState::SET_ERROR, resp.ident, false, true);
+ response.setRawData("Error while applying flow. Neither the config file
location nor the data is provided.");
+ enqueue_c2_response(std::move(response));
+ return false;
+ }
+ configuration_str = update_text->second.to_string();
+ }
+
+ file_uri = url->second.to_string();
+ utils::optional<std::string> optional_configuration_str =
fetchFlow(file_uri);
Review comment:
I must admit I'm a bit lost here with all these refactorings. In the old code,
what looks like the equivalent of the return value of `fetchFlow()` (at old
line 672) was a `file_path`, and the configuration was the contents of the
file at this path. In the new code, `fetchFlow()` seems to return the
configuration itself. How does this work?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]