fgerlits commented on code in PR #1978:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1978#discussion_r2190261139


##########
extensions/standard-processors/processors/EvaluateJsonPath.cpp:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "EvaluateJsonPath.h"
+
+#include <unordered_map>
+
+#include "core/ProcessSession.h"
+#include "core/ProcessContext.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+#include "jsoncons_ext/jsonpath/jsonpath.hpp"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace {
+bool isScalar(const jsoncons::json& value) {
+  return !value.is_array() && !value.is_object();
+}
+
+bool isQueryResultEmptyOrScalar(const jsoncons::json& query_result) {
+  return query_result.empty() || (query_result.size() == 1 && 
isScalar(query_result[0]));
+}
+}  // namespace
+
+void EvaluateJsonPath::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void EvaluateJsonPath::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  destination_ = 
utils::parseEnumProperty<evaluate_json_path::DestinationType>(context, 
EvaluateJsonPath::Destination);
+  if (destination_ == evaluate_json_path::DestinationType::FlowFileContent && 
getDynamicProperties().size() > 1) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one dynamic property is 
allowed for JSON path when destination is set to flowfile-content");
+  }
+  null_value_representation_ = 
utils::parseEnumProperty<evaluate_json_path::NullValueRepresentationOption>(context,
 EvaluateJsonPath::NullValueRepresentation);
+  path_not_found_behavior_ = 
utils::parseEnumProperty<evaluate_json_path::PathNotFoundBehaviorOption>(context,
 EvaluateJsonPath::PathNotFoundBehavior);
+  return_type_ = 
utils::parseEnumProperty<evaluate_json_path::ReturnTypeOption>(context, 
EvaluateJsonPath::ReturnType);
+  if (return_type_ == evaluate_json_path::ReturnTypeOption::AutoDetect) {
+    if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) {
+      return_type_ = evaluate_json_path::ReturnTypeOption::JSON;
+    } else {
+      return_type_ = evaluate_json_path::ReturnTypeOption::Scalar;
+    }
+  }
+}
+
+std::string EvaluateJsonPath::extractQueryResult(const jsoncons::json& 
query_result) const {
+  gsl_Expects(!query_result.empty());
+  if (query_result.size() > 1) {
+    gsl_Assert(return_type_ == evaluate_json_path::ReturnTypeOption::JSON);
+    return query_result.to_string();
+  }
+
+  if (query_result[0].is_null()) {
+    return null_value_representation_ == 
evaluate_json_path::NullValueRepresentationOption::EmptyString ? "" : "null";
+  }
+
+  if (query_result[0].is_string()) {
+    return query_result[0].as<std::string>();
+  }
+
+  return query_result[0].to_string();
+}
+
+void EvaluateJsonPath::writeQueryResult(core::ProcessSession& session, 
core::FlowFile& flow_file, const jsoncons::json& query_result, const 
std::string& property_name,
+    std::unordered_map<std::string, std::string>& attributes_to_set) const {
+  if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) {
+    session.write(flow_file, [&query_result, this](const 
std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
+      auto result_string = extractQueryResult(query_result);
+      return gsl::narrow<int64_t>(output_stream->write(reinterpret_cast<const 
uint8_t*>(result_string.data()), result_string.size()));
+    });
+  } else {
+    attributes_to_set.emplace(property_name, extractQueryResult(query_result));
+  }
+}
+
+void EvaluateJsonPath::onTrigger(core::ProcessContext&, core::ProcessSession& 
session) {
+  auto flow_file = session.get();
+  if (!flow_file) {
+    return;
+  }
+
+  const auto flow_file_read_result = session.readBuffer(flow_file);
+  const auto json_string = std::string(reinterpret_cast<const 
char*>(flow_file_read_result.buffer.data()), 
flow_file_read_result.buffer.size());

Review Comment:
   we have a function for this:
   ```suggestion
     const auto json_string = to_string(session.readBuffer(flow_file));
   ```



##########
extensions/standard-processors/processors/EvaluateJsonPath.cpp:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "EvaluateJsonPath.h"
+
+#include <unordered_map>
+
+#include "core/ProcessSession.h"
+#include "core/ProcessContext.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+#include "jsoncons_ext/jsonpath/jsonpath.hpp"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace {
+bool isScalar(const jsoncons::json& value) {
+  return !value.is_array() && !value.is_object();
+}
+
+bool isQueryResultEmptyOrScalar(const jsoncons::json& query_result) {
+  return query_result.empty() || (query_result.size() == 1 && 
isScalar(query_result[0]));
+}
+}  // namespace
+
+void EvaluateJsonPath::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void EvaluateJsonPath::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  destination_ = 
utils::parseEnumProperty<evaluate_json_path::DestinationType>(context, 
EvaluateJsonPath::Destination);
+  if (destination_ == evaluate_json_path::DestinationType::FlowFileContent && 
getDynamicProperties().size() > 1) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one dynamic property is 
allowed for JSON path when destination is set to flowfile-content");
+  }

Review Comment:
   We should also throw if there are no dynamic properties, as at least one is 
required for both destinations.



##########
extensions/standard-processors/tests/unit/EvaluateJsonPathTests.cpp:
##########
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "processors/EvaluateJsonPath.h"
+#include "unit/TestUtils.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class EvaluateJsonPathTestFixture {
+ public:
+  EvaluateJsonPathTestFixture() :
+      
controller_(std::make_unique<processors::EvaluateJsonPath>("EvaluateJsonPath")),
+      
evaluate_json_path_processor_(dynamic_cast<processors::EvaluateJsonPath*>(controller_.getProcessor()))
 {
+    REQUIRE(evaluate_json_path_processor_);
+    LogTestController::getInstance().setTrace<processors::EvaluateJsonPath>();
+  }
+
+ protected:
+  SingleProcessorTestController controller_;
+  processors::EvaluateJsonPath* evaluate_json_path_processor_;
+};
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "When destination is set to 
flowfile content only one dynamic property is allowed", 
"[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute1", "value1");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute2", "value2");
+  REQUIRE_THROWS_WITH(controller_.trigger({{.content = "foo"}}), "Process 
Schedule Operation: Only one dynamic property is allowed for JSON path when 
destination is set to flowfile-content");
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Input flowfile has invalid JSON 
as content", "[EvaluateJsonPathTests]") {
+  ProcessorTriggerResult result;
+  std::string error_log;
+  SECTION("Flow file content is empty") {
+    result = controller_.trigger({{.content = ""}});
+    error_log = "FlowFile content is empty, transferring to Failure 
relationship";
+  }
+
+  SECTION("Flow file content is invalid json") {
+    result = controller_.trigger({{.content = "invalid json"}});
+    error_log = "FlowFile content is not a valid JSON document, transferring 
to Failure relationship";
+  }
+
+  CHECK(result.at(processors::EvaluateJsonPath::Matched).empty());
+  CHECK(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  CHECK(result.at(processors::EvaluateJsonPath::Failure).size() == 1);
+  CHECK(utils::verifyLogLinePresenceInPollTime(1s, error_log));
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Dynamic property contains 
invalid JSON path expression", "[EvaluateJsonPathTests]") {
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute", "1234");
+
+  auto result = controller_.trigger({{.content = "{}"}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).size() == 1);
+
+  const auto result_flow_file = 
result.at(processors::EvaluateJsonPath::Failure).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "{}");
+  CHECK(utils::verifyLogLinePresenceInPollTime(0s, "Invalid JSON path 
expression '1234' found for attribute key 'attribute'"));
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in 
content when destination is set to attribute", "[EvaluateJsonPathTests]") {
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute1", "$.firstName");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute2", "$.lastName");
+
+  std::map<std::string, std::string> expected_attributes = {
+    {"attribute1", ""},
+    {"attribute2", ""}
+  };
+
+  bool warn_path_not_found_behavior = false;
+  bool expect_attributes = false;
+
+  SECTION("Ignore path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore");
+    expect_attributes = true;
+  }
+
+  SECTION("Skip path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "skip");
+  }
+
+  SECTION("Warn path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "warn");
+    warn_path_not_found_behavior = true;
+    expect_attributes = true;
+  }
+
+  auto result = controller_.trigger({{.content = "{}"}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = 
result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "{}");
+
+  for (const auto& [key, value] : expected_attributes) {
+    std::string attribute_value;
+    if (!expect_attributes) {
+      CHECK_FALSE(result_flow_file->getAttribute(key, attribute_value));
+    } else {
+      CHECK(result_flow_file->getAttribute(key, attribute_value));
+      CHECK(attribute_value == value);
+    }
+  }
+
+  if (warn_path_not_found_behavior) {
+    CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' 
not found for attribute key 'attribute1'"));
+    CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.lastName' 
not found for attribute key 'attribute2'"));
+  }
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in 
content when destination is set in content", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute", "$.firstName");
+
+  bool warn_path_not_found_behavior = false;
+  SECTION("Ignore path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore");
+  }
+
+  SECTION("Skip path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "skip");
+  }
+
+  SECTION("Warn path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "warn");
+    warn_path_not_found_behavior = true;
+  }
+
+  auto result = controller_.trigger({{.content = "{}"}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = 
result.at(processors::EvaluateJsonPath::Unmatched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "{}");
+
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("attribute", attribute_value));
+
+  if (warn_path_not_found_behavior) {
+    CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' 
not found for attribute key 'attribute'"));
+  }
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON path query result does not 
match the required return type", "[EvaluateJsonPathTests]") {
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute", "$.name");
+
+  SECTION("Return type is set to scalar automatically when destination is set 
to flowfile-attribute") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::Destination, "flowfile-attribute");
+  }

Review Comment:
   Is this SECTION missing a pair, where the `Destination` is 
`flowfile-content` and the `Return Type` is `scalar`?  A SECTION by itself is 
just a block.



##########
extensions/standard-processors/tests/unit/EvaluateJsonPathTests.cpp:
##########
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "processors/EvaluateJsonPath.h"
+#include "unit/TestUtils.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class EvaluateJsonPathTestFixture {
+ public:
+  EvaluateJsonPathTestFixture() :
+      
controller_(std::make_unique<processors::EvaluateJsonPath>("EvaluateJsonPath")),
+      
evaluate_json_path_processor_(dynamic_cast<processors::EvaluateJsonPath*>(controller_.getProcessor()))
 {
+    REQUIRE(evaluate_json_path_processor_);
+    LogTestController::getInstance().setTrace<processors::EvaluateJsonPath>();
+  }
+
+ protected:
+  SingleProcessorTestController controller_;
+  processors::EvaluateJsonPath* evaluate_json_path_processor_;
+};
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "When destination is set to 
flowfile content only one dynamic property is allowed", 
"[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute1", "value1");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute2", "value2");
+  REQUIRE_THROWS_WITH(controller_.trigger({{.content = "foo"}}), "Process 
Schedule Operation: Only one dynamic property is allowed for JSON path when 
destination is set to flowfile-content");
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Input flowfile has invalid JSON 
as content", "[EvaluateJsonPathTests]") {
+  ProcessorTriggerResult result;
+  std::string error_log;
+  SECTION("Flow file content is empty") {
+    result = controller_.trigger({{.content = ""}});
+    error_log = "FlowFile content is empty, transferring to Failure 
relationship";
+  }
+
+  SECTION("Flow file content is invalid json") {
+    result = controller_.trigger({{.content = "invalid json"}});
+    error_log = "FlowFile content is not a valid JSON document, transferring 
to Failure relationship";
+  }
+
+  CHECK(result.at(processors::EvaluateJsonPath::Matched).empty());
+  CHECK(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  CHECK(result.at(processors::EvaluateJsonPath::Failure).size() == 1);
+  CHECK(utils::verifyLogLinePresenceInPollTime(1s, error_log));
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Dynamic property contains 
invalid JSON path expression", "[EvaluateJsonPathTests]") {
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute", "1234");
+
+  auto result = controller_.trigger({{.content = "{}"}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).size() == 1);
+
+  const auto result_flow_file = 
result.at(processors::EvaluateJsonPath::Failure).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "{}");
+  CHECK(utils::verifyLogLinePresenceInPollTime(0s, "Invalid JSON path 
expression '1234' found for attribute key 'attribute'"));
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in 
content when destination is set to attribute", "[EvaluateJsonPathTests]") {
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute1", "$.firstName");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute2", "$.lastName");
+
+  std::map<std::string, std::string> expected_attributes = {
+    {"attribute1", ""},
+    {"attribute2", ""}
+  };
+
+  bool warn_path_not_found_behavior = false;
+  bool expect_attributes = false;
+
+  SECTION("Ignore path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore");
+    expect_attributes = true;
+  }
+
+  SECTION("Skip path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "skip");
+  }
+
+  SECTION("Warn path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "warn");
+    warn_path_not_found_behavior = true;
+    expect_attributes = true;
+  }
+
+  auto result = controller_.trigger({{.content = "{}"}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = 
result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "{}");
+
+  for (const auto& [key, value] : expected_attributes) {
+    std::string attribute_value;
+    if (!expect_attributes) {
+      CHECK_FALSE(result_flow_file->getAttribute(key, attribute_value));
+    } else {
+      CHECK(result_flow_file->getAttribute(key, attribute_value));
+      CHECK(attribute_value == value);
+    }
+  }
+
+  if (warn_path_not_found_behavior) {
+    CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' 
not found for attribute key 'attribute1'"));
+    CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.lastName' 
not found for attribute key 'attribute2'"));
+  }
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in 
content when destination is set in content", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute", "$.firstName");
+
+  bool warn_path_not_found_behavior = false;
+  SECTION("Ignore path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore");
+  }
+
+  SECTION("Skip path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "skip");
+  }
+
+  SECTION("Warn path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "warn");
+    warn_path_not_found_behavior = true;
+  }
+
+  auto result = controller_.trigger({{.content = "{}"}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = 
result.at(processors::EvaluateJsonPath::Unmatched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "{}");
+
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("attribute", attribute_value));
+
+  if (warn_path_not_found_behavior) {
+    CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' 
not found for attribute key 'attribute'"));
+  }
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON path query result does not 
match the required return type", "[EvaluateJsonPathTests]") {
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute", "$.name");
+
+  SECTION("Return type is set to scalar automatically when destination is set 
to flowfile-attribute") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::Destination, "flowfile-attribute");
+  }
+
+  std::string json_content = R"({"name": {"firstName": "John", "lastName": 
"Doe"}})";
+  auto result = controller_.trigger({{.content = json_content}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).size() == 1);
+
+  const auto result_flow_file = 
result.at(processors::EvaluateJsonPath::Failure).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == json_content);
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("attribute", attribute_value));
+  CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.name' 
returned a non-scalar value or multiple values for attribute key 'attribute', 
transferring to Failure relationship"));
+}
+
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Query JSON object and write it 
to flow file", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"jsonPath", "$.name");
+
+  std::string json_content = R"({"name": {"firstName": "John", "lastName": 
"Doe"}})";
+  auto result = controller_.trigger({{.content = json_content}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = 
result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == 
R"({"firstName":"John","lastName":"Doe"})");
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("jsonPath", attribute_value));

Review Comment:
   we could use the overload which returns an optional:
   ```suggestion
     CHECK_FALSE(result_flow_file->getAttribute("jsonPath"));
   ```



##########
extensions/standard-processors/processors/EvaluateJsonPath.cpp:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "EvaluateJsonPath.h"
+
+#include <unordered_map>
+
+#include "core/ProcessSession.h"
+#include "core/ProcessContext.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+#include "jsoncons_ext/jsonpath/jsonpath.hpp"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace {
+bool isScalar(const jsoncons::json& value) {
+  return !value.is_array() && !value.is_object();
+}
+
+bool isQueryResultEmptyOrScalar(const jsoncons::json& query_result) {
+  return query_result.empty() || (query_result.size() == 1 && 
isScalar(query_result[0]));
+}
+}  // namespace
+
+void EvaluateJsonPath::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void EvaluateJsonPath::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  destination_ = 
utils::parseEnumProperty<evaluate_json_path::DestinationType>(context, 
EvaluateJsonPath::Destination);
+  if (destination_ == evaluate_json_path::DestinationType::FlowFileContent && 
getDynamicProperties().size() > 1) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one dynamic property is 
allowed for JSON path when destination is set to flowfile-content");
+  }
+  null_value_representation_ = 
utils::parseEnumProperty<evaluate_json_path::NullValueRepresentationOption>(context,
 EvaluateJsonPath::NullValueRepresentation);
+  path_not_found_behavior_ = 
utils::parseEnumProperty<evaluate_json_path::PathNotFoundBehaviorOption>(context,
 EvaluateJsonPath::PathNotFoundBehavior);
+  return_type_ = 
utils::parseEnumProperty<evaluate_json_path::ReturnTypeOption>(context, 
EvaluateJsonPath::ReturnType);
+  if (return_type_ == evaluate_json_path::ReturnTypeOption::AutoDetect) {
+    if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) {
+      return_type_ = evaluate_json_path::ReturnTypeOption::JSON;
+    } else {
+      return_type_ = evaluate_json_path::ReturnTypeOption::Scalar;
+    }
+  }
+}
+
+std::string EvaluateJsonPath::extractQueryResult(const jsoncons::json& 
query_result) const {
+  gsl_Expects(!query_result.empty());
+  if (query_result.size() > 1) {
+    gsl_Assert(return_type_ == evaluate_json_path::ReturnTypeOption::JSON);
+    return query_result.to_string();
+  }
+
+  if (query_result[0].is_null()) {
+    return null_value_representation_ == 
evaluate_json_path::NullValueRepresentationOption::EmptyString ? "" : "null";
+  }
+
+  if (query_result[0].is_string()) {
+    return query_result[0].as<std::string>();
+  }
+
+  return query_result[0].to_string();
+}
+
+void EvaluateJsonPath::writeQueryResult(core::ProcessSession& session, 
core::FlowFile& flow_file, const jsoncons::json& query_result, const 
std::string& property_name,
+    std::unordered_map<std::string, std::string>& attributes_to_set) const {
+  if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) {
+    session.write(flow_file, [&query_result, this](const 
std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
+      auto result_string = extractQueryResult(query_result);
+      return gsl::narrow<int64_t>(output_stream->write(reinterpret_cast<const 
uint8_t*>(result_string.data()), result_string.size()));
+    });
+  } else {
+    attributes_to_set.emplace(property_name, extractQueryResult(query_result));
+  }
+}
+
+void EvaluateJsonPath::onTrigger(core::ProcessContext&, core::ProcessSession& 
session) {
+  auto flow_file = session.get();
+  if (!flow_file) {
+    return;

Review Comment:
   we could yield here



##########
extensions/standard-processors/processors/EvaluateJsonPath.h:
##########
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <string_view>
+#include <array>
+
+#include "core/logging/LoggerFactory.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "minifi-cpp/core/PropertyValidator.h"
+#include "core/RelationshipDefinition.h"
+
+#include "jsoncons/json.hpp"
+
+namespace org::apache::nifi::minifi::processors::evaluate_json_path {
+enum class DestinationType {
+  FlowFileContent,
+  FlowFileAttribute
+};
+
+enum class NullValueRepresentationOption {
+  EmptyString,
+  Null
+};
+
+enum class ReturnTypeOption {
+  AutoDetect,
+  JSON,
+  Scalar
+};
+
+enum class PathNotFoundBehaviorOption {
+  Warn,
+  Ignore,
+  Skip
+};
+}  // namespace org::apache::nifi::minifi::processors::evaluate_json_path
+
+namespace magic_enum::customize {
+using DestinationType = 
org::apache::nifi::minifi::processors::evaluate_json_path::DestinationType;
+using NullValueRepresentationOption = 
org::apache::nifi::minifi::processors::evaluate_json_path::NullValueRepresentationOption;
+using ReturnTypeOption = 
org::apache::nifi::minifi::processors::evaluate_json_path::ReturnTypeOption;
+using PathNotFoundBehaviorOption = 
org::apache::nifi::minifi::processors::evaluate_json_path::PathNotFoundBehaviorOption;
+
+template <>
+constexpr customize_t enum_name<DestinationType>(DestinationType value) 
noexcept {
+  switch (value) {
+    case DestinationType::FlowFileContent:
+      return "flowfile-content";
+    case DestinationType::FlowFileAttribute:
+      return "flowfile-attribute";
+  }
+  return invalid_tag;
+}
+
+template <>
+constexpr customize_t 
enum_name<NullValueRepresentationOption>(NullValueRepresentationOption value) 
noexcept {
+  switch (value) {
+    case NullValueRepresentationOption::EmptyString:
+      return "empty string";
+    case NullValueRepresentationOption::Null:
+      return "the string 'null'";
+  }
+  return invalid_tag;
+}
+
+template <>
+constexpr customize_t enum_name<ReturnTypeOption>(ReturnTypeOption value) 
noexcept {
+  switch (value) {
+    case ReturnTypeOption::AutoDetect:
+      return "auto-detect";
+    case ReturnTypeOption::JSON:
+      return "json";
+    case ReturnTypeOption::Scalar:
+      return "scalar";
+  }
+  return invalid_tag;
+}
+
+template <>
+constexpr customize_t 
enum_name<PathNotFoundBehaviorOption>(PathNotFoundBehaviorOption value) 
noexcept {
+  switch (value) {
+    case PathNotFoundBehaviorOption::Warn:
+      return "warn";
+    case PathNotFoundBehaviorOption::Ignore:
+      return "ignore";
+    case PathNotFoundBehaviorOption::Skip:
+      return "skip";
+  }
+  return invalid_tag;
+}
+}  // namespace magic_enum::customize
+
+namespace org::apache::nifi::minifi::processors {
+
+class EvaluateJsonPath final : public core::ProcessorImpl {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "Evaluates one or 
more JsonPath expressions against the content of a FlowFile. The results of 
those expressions are assigned to "
+        "FlowFile Attributes or are written to the content of the FlowFile 
itself, depending on configuration of the Processor. JsonPaths are entered by 
adding user-defined properties; "
+        "the name of the property maps to the Attribute Name into which the 
result will be placed (if the Destination is flowfile-attribute; otherwise, the 
property name is ignored). "
+        "The value of the property must be a valid JsonPath expression. A 
Return Type of 'auto-detect' will make a determination based off the configured 
destination. When 'Destination' is set to "
+        "'flowfile-attribute,' a return type of 'scalar' will be used. When 
'Destination' is set to 'flowfile-content,' a return type of 'JSON' will be 
used.If the JsonPath evaluates to a JSON "
+        "array or JSON object and the Return Type is set to 'scalar' the 
FlowFile will be unmodified and will be routed to failure. A Return Type of 
JSON can return scalar values if the provided "
+        "JsonPath evaluates to the specified value and will be routed as a 
match.If Destination is 'flowfile-content' and the JsonPath does not evaluate 
to a defined path, the FlowFile will be "
+        "routed to 'unmatched' without having its contents modified. If 
Destination is 'flowfile-attribute' and the expression matches nothing, 
attributes will be created with empty strings as the "
+        "value unless 'Path Not Found Behaviour' is set to 'skip', and the 
FlowFile will always be routed to 'matched.'";
+
+  EXTENSIONAPI static constexpr auto Destination = 
core::PropertyDefinitionBuilder<2>::createProperty("Destination")
+      .withDescription("Indicates whether the results of the JsonPath 
evaluation are written to the FlowFile content or a FlowFile attribute. If 
using attribute, must specify the Attribute Name "
+          "property. If set to flowfile-content, only one JsonPath may be 
specified, and the property name is ignored.")

Review Comment:
   You could add a `DynamicProperties` descriptor (see `RouteText` for an 
example), and then the second part (2nd and 3rd sentence, slightly modified) of 
this property description could be moved there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to