szaszm commented on code in PR #2004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2004#discussion_r2452750501


##########
extensions/mqtt/tests/ConsumeMQTTTests.cpp:
##########
@@ -20,139 +20,352 @@
 #include "catch2/matchers/catch_matchers_string.hpp"
 #include "unit/TestBase.h"
 #include "../processors/ConsumeMQTT.h"
+#include "core/Resource.h"
+#include "unit/SingleProcessorTestController.h"
+#include "rapidjson/document.h"
+#include "unit/ProcessorUtils.h"
 
-namespace {
-struct Fixture {
-  Fixture() {
-    
LogTestController::getInstance().setDebug<minifi::processors::ConsumeMQTT>();
-    plan_ = testController_.createPlan();
-    consumeMqttProcessor_ = plan_->addProcessor("ConsumeMQTT", 
"consumeMqttProcessor");
+namespace org::apache::nifi::minifi::test {
+void verifyXmlJsonResult(const std::string& json_content, size_t 
expected_record_count, bool add_attributes_as_fields) {
+  rapidjson::Document document;
+  document.Parse(json_content.c_str());
+  REQUIRE(document.IsArray());
+  REQUIRE(document.GetArray().Size() == expected_record_count);
+  for (size_t i = 0; i < expected_record_count; ++i) {
+    auto& current_record = document[gsl::narrow<rapidjson::SizeType>(i)];
+    REQUIRE(current_record.IsObject());
+    REQUIRE(current_record.HasMember("int_value"));
+    uint64_t int_result = current_record["int_value"].GetInt64();
+    CHECK(int_result == 42);
+    REQUIRE(current_record.HasMember("string_value"));
+    std::string string_result = current_record["string_value"].GetString();
+    CHECK(string_result == "test");
+
+    if (add_attributes_as_fields) {
+      string_result = current_record["_topic"].GetString();
+      CHECK(string_result == "mytopic/segment/" + std::to_string(i));
+      auto array = current_record["_topicSegments"].GetArray();
+      CHECK(array.Size() == 3);
+      string_result = array[0].GetString();
+      CHECK(string_result == "mytopic");
+      string_result = array[1].GetString();
+      CHECK(string_result == "segment");
+      string_result = array[2].GetString();
+      CHECK(string_result == std::to_string(i));
+      int_result = current_record["_qos"].GetInt64();
+      CHECK(int_result == i);
+      bool bool_result = current_record["_isDuplicate"].GetBool();
+      if (i == 0) {
+        CHECK_FALSE(bool_result);
+      } else {
+        CHECK(bool_result);
+      }
+      bool_result = current_record["_isRetained"].GetBool();
+      if (i == 0) {
+        CHECK_FALSE(bool_result);
+      } else {
+        CHECK(bool_result);
+      }
+    } else {
+      CHECK_FALSE(current_record.HasMember("_topic"));
+      CHECK_FALSE(current_record.HasMember("_qos"));
+      CHECK_FALSE(current_record.HasMember("_isDuplicate"));
+      CHECK_FALSE(current_record.HasMember("_isRetained"));
+    }
+  }
+}
+
+class TestConsumeMQTTProcessor : public minifi::processors::ConsumeMQTT {
+ public:
+  using SmartMessage = processors::AbstractMQTTProcessor::SmartMessage;
+  using MQTTMessageDeleter = 
processors::AbstractMQTTProcessor::MQTTMessageDeleter;
+  explicit TestConsumeMQTTProcessor(minifi::core::ProcessorMetadata metadata)
+      : minifi::processors::ConsumeMQTT(std::move(metadata)) {}
+
+  void initializeClient() override {
+  }
+
+  void enqueueReceivedMQTTMsg(SmartMessage message) {
+    
minifi::processors::ConsumeMQTT::enqueueReceivedMQTTMsg(std::move(message));
+  }

Review Comment:
   ```suggestion
     using ConsumeMQTT::enqueueReceivedMQTTMsg;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to