adamdebreceni commented on a change in pull request #937:
URL: https://github.com/apache/nifi-minifi-cpp/pull/937#discussion_r526035745
##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -873,6 +664,161 @@ void C2Agent::update_agent() {
}
}
+utils::TaskRescheduleInfo C2Agent::produce() {
+ // place priority on messages to send to the c2 server
+ if (protocol_.load() != nullptr) {
+ std::vector<C2Payload> payload_batch;
+ payload_batch.reserve(max_c2_responses);
+ auto getRequestPayload = [&payload_batch] (C2Payload&& payload) {
payload_batch.emplace_back(std::move(payload)); };
+ for (std::size_t attempt_num = 0; attempt_num < max_c2_responses;
++attempt_num) {
+ if (!requests.consume(getRequestPayload)) {
+ break;
+ }
+ }
+ std::for_each(
+ std::make_move_iterator(payload_batch.begin()),
+ std::make_move_iterator(payload_batch.end()),
+ [&] (C2Payload&& payload) {
+ try {
+ C2Payload && response =
protocol_.load()->consumePayload(std::move(payload));
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload.
error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknown exception occurred while consuming
payload.");
+ }
+ });
+
+ try {
+ performHeartBeat();
+ }
+ catch (const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat.
error: %s", e.what());
+ }
+ catch (...) {
+ logger_->log_error("Unknonwn exception occurred while performing
heartbeat.");
+ }
+ }
+
+ checkTriggers();
+
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+}
+
+utils::TaskRescheduleInfo C2Agent::consume() {
+ const auto consume_success = responses.consume([this] (C2Payload&& payload) {
+ extractPayload(std::move(payload));
+ });
+ if (!consume_success) {
+ extractPayload(C2Payload{ Operation::HEARTBEAT });
+ }
+ return
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
+}
+
+utils::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
+ if (!utils::StringUtils::startsWith(uri, "http") || protocol_.load() ==
nullptr) {
+ // try to open the file
+ utils::optional<std::string> content = filesystem_->read(uri);
+ if (content) {
+ return content;
+ }
+ }
+ // couldn't open as file and we have no protocol to request the file from
+ if (protocol_.load() == nullptr) {
+ return {};
+ }
+
+ std::string resolved_url = uri;
+ if (!utils::StringUtils::startsWith(uri, "http")) {
+ std::stringstream adjusted_url;
+ std::string base;
+ if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, base)) {
+ adjusted_url << base;
+ if (!utils::StringUtils::endsWith(base, "/")) {
+ adjusted_url << "/";
+ }
+ adjusted_url << uri;
+ resolved_url = adjusted_url.str();
+ } else if (configuration_->get("c2.rest.url", base)) {
+ std::string host, protocol;
+ int port = -1;
+ utils::parse_url(&base, &host, &port, &protocol);
+ adjusted_url << protocol << host;
+ if (port > 0) {
+ adjusted_url << ":" << port;
+ }
+ adjusted_url << "/c2/api/" << uri;
+ resolved_url = adjusted_url.str();
+ }
+ }
+
+ C2Payload payload(Operation::TRANSFER, false, true);
+ C2Payload &&response = protocol_.load()->consumePayload(resolved_url,
payload, RECEIVE, false);
+
+ auto raw_data = response.getRawData();
+ return std::string(raw_data.data(), raw_data.size());
+}
+
+bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) {
+ auto url = resp.operation_arguments.find("location");
+
+ std::string file_uri;
+ std::string configuration_str;
+
+ if (url == resp.operation_arguments.end()) {
+ logger_->log_debug("Did not have location within %s", resp.ident);
+ auto update_text = resp.operation_arguments.find("configuration_data");
+ if (update_text == resp.operation_arguments.end()) {
+ logger_->log_debug("Neither the config file location nor the data is
provided");
+ C2Payload response(Operation::ACKNOWLEDGE,
state::UpdateState::SET_ERROR, resp.ident, false, true);
+ response.setRawData("Error while applying flow. Neither the config file
location nor the data is provided.");
+ enqueue_c2_response(std::move(response));
+ return false;
+ }
+ configuration_str = update_text->second.to_string();
+ }
+
+ file_uri = url->second.to_string();
+ utils::optional<std::string> optional_configuration_str =
fetchFlow(file_uri);
Review comment:
yes, previously "fetching" worked by creating a temporary file, writing
into that, and returning the name of the file, if we are handling sensitive
data (flow config which needs to be encrypted) persisting the raw data to
however short amount of time seems dangerous, so now the content is returned in
memory
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]