lordgamez commented on code in PR #1990:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1990#discussion_r2254114235


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -58,8 +84,55 @@ void ConsumeMQTT::readProperties(core::ProcessContext& 
context) {
   receive_maximum_ = gsl::narrow<uint16_t>(utils::parseU64Property(context, 
ReceiveMaximum));
 }
 
-void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& 
session) {
-  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
+void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records, 
const std::queue<SmartMessage>& msg_queue) const {
+  if (!add_attributes_as_fields_) {
+    return;
+  }
+
+  for (auto& record: new_records) {
+    record.emplace("_topic", core::RecordField(msg_queue.front().topic));
+    auto topic_segments = utils::string::split(msg_queue.front().topic, "/");
+    for (size_t i = 0; i < topic_segments.size(); ++i) {
+      record.emplace("_topic.segment." + std::to_string(i), 
core::RecordField(topic_segments[i]));
+    }
+    record.emplace("_qos", core::RecordField(msg_queue.front().contents->qos));
+    record.emplace("_isDuplicate", 
core::RecordField(msg_queue.front().contents->dup > 0));
+    record.emplace("_isRetained", 
core::RecordField(msg_queue.front().contents->retained > 0));
+  }
+}
+
+void ConsumeMQTT::transferMessagesAsRecords(core::ProcessSession& session) {
+  gsl_Expects(record_set_reader_ && record_set_writer_);
+  auto msg_queue = getReceivedMqttMessages();
+  core::RecordSet record_set;
+  while (!msg_queue.empty()) {
+    io::BufferStream buffer_stream;
+    buffer_stream.write(reinterpret_cast<const 
uint8_t*>(msg_queue.front().contents->payload), 
gsl::narrow<size_t>(msg_queue.front().contents->payloadlen));
+    auto new_records_result = record_set_reader_->read(buffer_stream);
+    if (!new_records_result) {
+      logger_->log_error("Failed to read records from MQTT message: {}", 
new_records_result.error());
+      msg_queue.pop();
+      continue;
+    }
+    auto& new_records = new_records_result.value();
+    addAttributesAsRecordFields(new_records, msg_queue);
+    record_set.reserve(record_set.size() + new_records.size());
+    record_set.insert(record_set.end(), 
std::make_move_iterator(new_records.begin()), 
std::make_move_iterator(new_records.end()));
+    msg_queue.pop();
+  }
+  if (record_set.empty()) {
+    logger_->log_debug("No records to write, skipping FlowFile creation");
+    return;
+  }
+  std::shared_ptr<core::FlowFile> flow_file = session.create();
+  record_set_writer_->write(record_set, flow_file, session);
+  session.putAttribute(*flow_file, RecordCountOutputAttribute.name, 
std::to_string(record_set.size()));
+  session.putAttribute(*flow_file, BrokerOutputAttribute.name, uri_);
+  session.transfer(flow_file, Success);

Review Comment:
   In this case multiple records can be written to the flow file, with each 
record being a separate MQTT message, while in `transferMessagesAsFlowFiles` 
every flow file is a single MQTT message. Those attributes are MQTT message 
specific, so they cannot be added to the flow file if it contains multiple MQTT 
messages. For this scenario, the `Add Attributes As Fields` property can be 
enabled to add these attributes as record fields to each record in the flow file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to