fgerlits commented on code in PR #1978: URL: https://github.com/apache/nifi-minifi-cpp/pull/1978#discussion_r2190261139
########## extensions/standard-processors/processors/EvaluateJsonPath.cpp: ########## @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "EvaluateJsonPath.h" + +#include <unordered_map> + +#include "core/ProcessSession.h" +#include "core/ProcessContext.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" + +#include "jsoncons_ext/jsonpath/jsonpath.hpp" + +namespace org::apache::nifi::minifi::processors { + +namespace { +bool isScalar(const jsoncons::json& value) { + return !value.is_array() && !value.is_object(); +} + +bool isQueryResultEmptyOrScalar(const jsoncons::json& query_result) { + return query_result.empty() || (query_result.size() == 1 && isScalar(query_result[0])); +} +} // namespace + +void EvaluateJsonPath::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void EvaluateJsonPath::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + destination_ = utils::parseEnumProperty<evaluate_json_path::DestinationType>(context, EvaluateJsonPath::Destination); + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent && getDynamicProperties().size() > 1) { + throw 
Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one dynamic property is allowed for JSON path when destination is set to flowfile-content"); + } + null_value_representation_ = utils::parseEnumProperty<evaluate_json_path::NullValueRepresentationOption>(context, EvaluateJsonPath::NullValueRepresentation); + path_not_found_behavior_ = utils::parseEnumProperty<evaluate_json_path::PathNotFoundBehaviorOption>(context, EvaluateJsonPath::PathNotFoundBehavior); + return_type_ = utils::parseEnumProperty<evaluate_json_path::ReturnTypeOption>(context, EvaluateJsonPath::ReturnType); + if (return_type_ == evaluate_json_path::ReturnTypeOption::AutoDetect) { + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) { + return_type_ = evaluate_json_path::ReturnTypeOption::JSON; + } else { + return_type_ = evaluate_json_path::ReturnTypeOption::Scalar; + } + } +} + +std::string EvaluateJsonPath::extractQueryResult(const jsoncons::json& query_result) const { + gsl_Expects(!query_result.empty()); + if (query_result.size() > 1) { + gsl_Assert(return_type_ == evaluate_json_path::ReturnTypeOption::JSON); + return query_result.to_string(); + } + + if (query_result[0].is_null()) { + return null_value_representation_ == evaluate_json_path::NullValueRepresentationOption::EmptyString ? 
"" : "null"; + } + + if (query_result[0].is_string()) { + return query_result[0].as<std::string>(); + } + + return query_result[0].to_string(); +} + +void EvaluateJsonPath::writeQueryResult(core::ProcessSession& session, core::FlowFile& flow_file, const jsoncons::json& query_result, const std::string& property_name, + std::unordered_map<std::string, std::string>& attributes_to_set) const { + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) { + session.write(flow_file, [&query_result, this](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t { + auto result_string = extractQueryResult(query_result); + return gsl::narrow<int64_t>(output_stream->write(reinterpret_cast<const uint8_t*>(result_string.data()), result_string.size())); + }); + } else { + attributes_to_set.emplace(property_name, extractQueryResult(query_result)); + } +} + +void EvaluateJsonPath::onTrigger(core::ProcessContext&, core::ProcessSession& session) { + auto flow_file = session.get(); + if (!flow_file) { + return; + } + + const auto flow_file_read_result = session.readBuffer(flow_file); + const auto json_string = std::string(reinterpret_cast<const char*>(flow_file_read_result.buffer.data()), flow_file_read_result.buffer.size()); Review Comment: we have a function for this: ```suggestion const auto json_string = to_string(session.readBuffer(flow_file)); ``` ########## extensions/standard-processors/processors/EvaluateJsonPath.cpp: ########## @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "EvaluateJsonPath.h" + +#include <unordered_map> + +#include "core/ProcessSession.h" +#include "core/ProcessContext.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" + +#include "jsoncons_ext/jsonpath/jsonpath.hpp" + +namespace org::apache::nifi::minifi::processors { + +namespace { +bool isScalar(const jsoncons::json& value) { + return !value.is_array() && !value.is_object(); +} + +bool isQueryResultEmptyOrScalar(const jsoncons::json& query_result) { + return query_result.empty() || (query_result.size() == 1 && isScalar(query_result[0])); +} +} // namespace + +void EvaluateJsonPath::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void EvaluateJsonPath::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + destination_ = utils::parseEnumProperty<evaluate_json_path::DestinationType>(context, EvaluateJsonPath::Destination); + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent && getDynamicProperties().size() > 1) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one dynamic property is allowed for JSON path when destination is set to flowfile-content"); + } Review Comment: We should also throw if there are no dynamic properties, as at least one is required for both destinations. ########## extensions/standard-processors/tests/unit/EvaluateJsonPathTests.cpp: ########## @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "unit/SingleProcessorTestController.h" +#include "processors/EvaluateJsonPath.h" +#include "unit/TestUtils.h" + +namespace org::apache::nifi::minifi::test { + +class EvaluateJsonPathTestFixture { + public: + EvaluateJsonPathTestFixture() : + controller_(std::make_unique<processors::EvaluateJsonPath>("EvaluateJsonPath")), + evaluate_json_path_processor_(dynamic_cast<processors::EvaluateJsonPath*>(controller_.getProcessor())) { + REQUIRE(evaluate_json_path_processor_); + LogTestController::getInstance().setTrace<processors::EvaluateJsonPath>(); + } + + protected: + SingleProcessorTestController controller_; + processors::EvaluateJsonPath* evaluate_json_path_processor_; +}; + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "When destination is set to flowfile content only one dynamic property is allowed", "[EvaluateJsonPathTests]") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content"); + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute1", "value1"); + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute2", "value2"); + REQUIRE_THROWS_WITH(controller_.trigger({{.content = "foo"}}), "Process 
Schedule Operation: Only one dynamic property is allowed for JSON path when destination is set to flowfile-content"); +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Input flowfile has invalid JSON as content", "[EvaluateJsonPathTests]") { + ProcessorTriggerResult result; + std::string error_log; + SECTION("Flow file content is empty") { + result = controller_.trigger({{.content = ""}}); + error_log = "FlowFile content is empty, transferring to Failure relationship"; + } + + SECTION("Flow file content is invalid json") { + result = controller_.trigger({{.content = "invalid json"}}); + error_log = "FlowFile content is not a valid JSON document, transferring to Failure relationship"; + } + + CHECK(result.at(processors::EvaluateJsonPath::Matched).empty()); + CHECK(result.at(processors::EvaluateJsonPath::Unmatched).empty()); + CHECK(result.at(processors::EvaluateJsonPath::Failure).size() == 1); + CHECK(utils::verifyLogLinePresenceInPollTime(1s, error_log)); +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Dynamic property contains invalid JSON path expression", "[EvaluateJsonPathTests]") { + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute", "1234"); + + auto result = controller_.trigger({{.content = "{}"}}); + + REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Failure).size() == 1); + + const auto result_flow_file = result.at(processors::EvaluateJsonPath::Failure).at(0); + + CHECK(controller_.plan->getContent(result_flow_file) == "{}"); + CHECK(utils::verifyLogLinePresenceInPollTime(0s, "Invalid JSON path expression '1234' found for attribute key 'attribute'")); +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in content when destination is set to attribute", "[EvaluateJsonPathTests]") { + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, 
"attribute1", "$.firstName"); + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute2", "$.lastName"); + + std::map<std::string, std::string> expected_attributes = { + {"attribute1", ""}, + {"attribute2", ""} + }; + + bool warn_path_not_found_behavior = false; + bool expect_attributes = false; + + SECTION("Ignore path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore"); + expect_attributes = true; + } + + SECTION("Skip path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "skip"); + } + + SECTION("Warn path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "warn"); + warn_path_not_found_behavior = true; + expect_attributes = true; + } + + auto result = controller_.trigger({{.content = "{}"}}); + + REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1); + REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty()); + + const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0); + + CHECK(controller_.plan->getContent(result_flow_file) == "{}"); + + for (const auto& [key, value] : expected_attributes) { + std::string attribute_value; + if (!expect_attributes) { + CHECK_FALSE(result_flow_file->getAttribute(key, attribute_value)); + } else { + CHECK(result_flow_file->getAttribute(key, attribute_value)); + CHECK(attribute_value == value); + } + } + + if (warn_path_not_found_behavior) { + CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' not found for attribute key 'attribute1'")); + CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.lastName' not found for attribute key 'attribute2'")); + } +} + 
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in content when destination is set in content", "[EvaluateJsonPathTests]") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content"); + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute", "$.firstName"); + + bool warn_path_not_found_behavior = false; + SECTION("Ignore path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore"); + } + + SECTION("Skip path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "skip"); + } + + SECTION("Warn path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "warn"); + warn_path_not_found_behavior = true; + } + + auto result = controller_.trigger({{.content = "{}"}}); + + REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).size() == 1); + REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty()); + + const auto result_flow_file = result.at(processors::EvaluateJsonPath::Unmatched).at(0); + + CHECK(controller_.plan->getContent(result_flow_file) == "{}"); + + std::string attribute_value; + CHECK_FALSE(result_flow_file->getAttribute("attribute", attribute_value)); + + if (warn_path_not_found_behavior) { + CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' not found for attribute key 'attribute'")); + } +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON path query result does not match the required return type", "[EvaluateJsonPathTests]") { + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute", "$.name"); + + SECTION("Return type is set to scalar automatically when 
destination is set to flowfile-attribute") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-attribute"); + } Review Comment: Is this SECTION missing a pair, where the `Destination` is `flowfile-content` and the `Return Type` is `scalar`? A SECTION by itself is just a block. ########## extensions/standard-processors/tests/unit/EvaluateJsonPathTests.cpp: ########## @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "unit/SingleProcessorTestController.h" +#include "processors/EvaluateJsonPath.h" +#include "unit/TestUtils.h" + +namespace org::apache::nifi::minifi::test { + +class EvaluateJsonPathTestFixture { + public: + EvaluateJsonPathTestFixture() : + controller_(std::make_unique<processors::EvaluateJsonPath>("EvaluateJsonPath")), + evaluate_json_path_processor_(dynamic_cast<processors::EvaluateJsonPath*>(controller_.getProcessor())) { + REQUIRE(evaluate_json_path_processor_); + LogTestController::getInstance().setTrace<processors::EvaluateJsonPath>(); + } + + protected: + SingleProcessorTestController controller_; + processors::EvaluateJsonPath* evaluate_json_path_processor_; +}; + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "When destination is set to flowfile content only one dynamic property is allowed", "[EvaluateJsonPathTests]") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content"); + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute1", "value1"); + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute2", "value2"); + REQUIRE_THROWS_WITH(controller_.trigger({{.content = "foo"}}), "Process Schedule Operation: Only one dynamic property is allowed for JSON path when destination is set to flowfile-content"); +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Input flowfile has invalid JSON as content", "[EvaluateJsonPathTests]") { + ProcessorTriggerResult result; + std::string error_log; + SECTION("Flow file content is empty") { + result = controller_.trigger({{.content = ""}}); + error_log = "FlowFile content is empty, transferring to Failure relationship"; + } + + SECTION("Flow file content is invalid json") { + result = controller_.trigger({{.content = "invalid json"}}); + error_log = "FlowFile content is not a valid JSON document, transferring to Failure relationship"; + 
} + + CHECK(result.at(processors::EvaluateJsonPath::Matched).empty()); + CHECK(result.at(processors::EvaluateJsonPath::Unmatched).empty()); + CHECK(result.at(processors::EvaluateJsonPath::Failure).size() == 1); + CHECK(utils::verifyLogLinePresenceInPollTime(1s, error_log)); +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Dynamic property contains invalid JSON path expression", "[EvaluateJsonPathTests]") { + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute", "1234"); + + auto result = controller_.trigger({{.content = "{}"}}); + + REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Failure).size() == 1); + + const auto result_flow_file = result.at(processors::EvaluateJsonPath::Failure).at(0); + + CHECK(controller_.plan->getContent(result_flow_file) == "{}"); + CHECK(utils::verifyLogLinePresenceInPollTime(0s, "Invalid JSON path expression '1234' found for attribute key 'attribute'")); +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in content when destination is set to attribute", "[EvaluateJsonPathTests]") { + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute1", "$.firstName"); + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute2", "$.lastName"); + + std::map<std::string, std::string> expected_attributes = { + {"attribute1", ""}, + {"attribute2", ""} + }; + + bool warn_path_not_found_behavior = false; + bool expect_attributes = false; + + SECTION("Ignore path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore"); + expect_attributes = true; + } + + SECTION("Skip path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "skip"); + } + + 
SECTION("Warn path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "warn"); + warn_path_not_found_behavior = true; + expect_attributes = true; + } + + auto result = controller_.trigger({{.content = "{}"}}); + + REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1); + REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty()); + + const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0); + + CHECK(controller_.plan->getContent(result_flow_file) == "{}"); + + for (const auto& [key, value] : expected_attributes) { + std::string attribute_value; + if (!expect_attributes) { + CHECK_FALSE(result_flow_file->getAttribute(key, attribute_value)); + } else { + CHECK(result_flow_file->getAttribute(key, attribute_value)); + CHECK(attribute_value == value); + } + } + + if (warn_path_not_found_behavior) { + CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' not found for attribute key 'attribute1'")); + CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.lastName' not found for attribute key 'attribute2'")); + } +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in content when destination is set in content", "[EvaluateJsonPathTests]") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content"); + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute", "$.firstName"); + + bool warn_path_not_found_behavior = false; + SECTION("Ignore path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore"); + } + + SECTION("Skip path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, 
processors::EvaluateJsonPath::PathNotFoundBehavior, "skip"); + } + + SECTION("Warn path not found behavior") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "warn"); + warn_path_not_found_behavior = true; + } + + auto result = controller_.trigger({{.content = "{}"}}); + + REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).size() == 1); + REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty()); + + const auto result_flow_file = result.at(processors::EvaluateJsonPath::Unmatched).at(0); + + CHECK(controller_.plan->getContent(result_flow_file) == "{}"); + + std::string attribute_value; + CHECK_FALSE(result_flow_file->getAttribute("attribute", attribute_value)); + + if (warn_path_not_found_behavior) { + CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' not found for attribute key 'attribute'")); + } +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON path query result does not match the required return type", "[EvaluateJsonPathTests]") { + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute", "$.name"); + + SECTION("Return type is set to scalar automatically when destination is set to flowfile-attribute") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-attribute"); + } + + std::string json_content = R"({"name": {"firstName": "John", "lastName": "Doe"}})"; + auto result = controller_.trigger({{.content = json_content}}); + + REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Failure).size() == 1); + + const auto result_flow_file = result.at(processors::EvaluateJsonPath::Failure).at(0); + + CHECK(controller_.plan->getContent(result_flow_file) == json_content); 
+ std::string attribute_value; + CHECK_FALSE(result_flow_file->getAttribute("attribute", attribute_value)); + CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.name' returned a non-scalar value or multiple values for attribute key 'attribute', transferring to Failure relationship")); +} + +TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Query JSON object and write it to flow file", "[EvaluateJsonPathTests]") { + controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content"); + controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "jsonPath", "$.name"); + + std::string json_content = R"({"name": {"firstName": "John", "lastName": "Doe"}})"; + auto result = controller_.trigger({{.content = json_content}}); + + REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1); + REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty()); + REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty()); + + const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0); + + CHECK(controller_.plan->getContent(result_flow_file) == R"({"firstName":"John","lastName":"Doe"})"); + std::string attribute_value; + CHECK_FALSE(result_flow_file->getAttribute("jsonPath", attribute_value)); Review Comment: we could use the overload which returns an optional: ```suggestion CHECK_FALSE(result_flow_file->getAttribute("jsonPath")); ``` ########## extensions/standard-processors/processors/EvaluateJsonPath.cpp: ########## @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "EvaluateJsonPath.h" + +#include <unordered_map> + +#include "core/ProcessSession.h" +#include "core/ProcessContext.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" + +#include "jsoncons_ext/jsonpath/jsonpath.hpp" + +namespace org::apache::nifi::minifi::processors { + +namespace { +bool isScalar(const jsoncons::json& value) { + return !value.is_array() && !value.is_object(); +} + +bool isQueryResultEmptyOrScalar(const jsoncons::json& query_result) { + return query_result.empty() || (query_result.size() == 1 && isScalar(query_result[0])); +} +} // namespace + +void EvaluateJsonPath::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void EvaluateJsonPath::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + destination_ = utils::parseEnumProperty<evaluate_json_path::DestinationType>(context, EvaluateJsonPath::Destination); + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent && getDynamicProperties().size() > 1) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one dynamic property is allowed for JSON path when destination is set to flowfile-content"); + } + null_value_representation_ = utils::parseEnumProperty<evaluate_json_path::NullValueRepresentationOption>(context, EvaluateJsonPath::NullValueRepresentation); + path_not_found_behavior_ = utils::parseEnumProperty<evaluate_json_path::PathNotFoundBehaviorOption>(context, EvaluateJsonPath::PathNotFoundBehavior); + return_type_ = 
utils::parseEnumProperty<evaluate_json_path::ReturnTypeOption>(context, EvaluateJsonPath::ReturnType); + if (return_type_ == evaluate_json_path::ReturnTypeOption::AutoDetect) { + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) { + return_type_ = evaluate_json_path::ReturnTypeOption::JSON; + } else { + return_type_ = evaluate_json_path::ReturnTypeOption::Scalar; + } + } +} + +std::string EvaluateJsonPath::extractQueryResult(const jsoncons::json& query_result) const { + gsl_Expects(!query_result.empty()); + if (query_result.size() > 1) { + gsl_Assert(return_type_ == evaluate_json_path::ReturnTypeOption::JSON); + return query_result.to_string(); + } + + if (query_result[0].is_null()) { + return null_value_representation_ == evaluate_json_path::NullValueRepresentationOption::EmptyString ? "" : "null"; + } + + if (query_result[0].is_string()) { + return query_result[0].as<std::string>(); + } + + return query_result[0].to_string(); +} + +void EvaluateJsonPath::writeQueryResult(core::ProcessSession& session, core::FlowFile& flow_file, const jsoncons::json& query_result, const std::string& property_name, + std::unordered_map<std::string, std::string>& attributes_to_set) const { + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) { + session.write(flow_file, [&query_result, this](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t { + auto result_string = extractQueryResult(query_result); + return gsl::narrow<int64_t>(output_stream->write(reinterpret_cast<const uint8_t*>(result_string.data()), result_string.size())); + }); + } else { + attributes_to_set.emplace(property_name, extractQueryResult(query_result)); + } +} + +void EvaluateJsonPath::onTrigger(core::ProcessContext&, core::ProcessSession& session) { + auto flow_file = session.get(); + if (!flow_file) { + return; Review Comment: we could yield here ########## extensions/standard-processors/processors/EvaluateJsonPath.h: ########## @@ -0,0 +1,195 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string> +#include <string_view> +#include <array> + +#include "core/logging/LoggerFactory.h" +#include "core/Processor.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyDefinitionBuilder.h" +#include "minifi-cpp/core/PropertyValidator.h" +#include "core/RelationshipDefinition.h" + +#include "jsoncons/json.hpp" + +namespace org::apache::nifi::minifi::processors::evaluate_json_path { +enum class DestinationType { + FlowFileContent, + FlowFileAttribute +}; + +enum class NullValueRepresentationOption { + EmptyString, + Null +}; + +enum class ReturnTypeOption { + AutoDetect, + JSON, + Scalar +}; + +enum class PathNotFoundBehaviorOption { + Warn, + Ignore, + Skip +}; +} // namespace org::apache::nifi::minifi::processors::evaluate_json_path + +namespace magic_enum::customize { +using DestinationType = org::apache::nifi::minifi::processors::evaluate_json_path::DestinationType; +using NullValueRepresentationOption = org::apache::nifi::minifi::processors::evaluate_json_path::NullValueRepresentationOption; +using ReturnTypeOption = org::apache::nifi::minifi::processors::evaluate_json_path::ReturnTypeOption; +using PathNotFoundBehaviorOption = 
org::apache::nifi::minifi::processors::evaluate_json_path::PathNotFoundBehaviorOption; + +template <> +constexpr customize_t enum_name<DestinationType>(DestinationType value) noexcept { + switch (value) { + case DestinationType::FlowFileContent: + return "flowfile-content"; + case DestinationType::FlowFileAttribute: + return "flowfile-attribute"; + } + return invalid_tag; +} + +template <> +constexpr customize_t enum_name<NullValueRepresentationOption>(NullValueRepresentationOption value) noexcept { + switch (value) { + case NullValueRepresentationOption::EmptyString: + return "empty string"; + case NullValueRepresentationOption::Null: + return "the string 'null'"; + } + return invalid_tag; +} + +template <> +constexpr customize_t enum_name<ReturnTypeOption>(ReturnTypeOption value) noexcept { + switch (value) { + case ReturnTypeOption::AutoDetect: + return "auto-detect"; + case ReturnTypeOption::JSON: + return "json"; + case ReturnTypeOption::Scalar: + return "scalar"; + } + return invalid_tag; +} + +template <> +constexpr customize_t enum_name<PathNotFoundBehaviorOption>(PathNotFoundBehaviorOption value) noexcept { + switch (value) { + case PathNotFoundBehaviorOption::Warn: + return "warn"; + case PathNotFoundBehaviorOption::Ignore: + return "ignore"; + case PathNotFoundBehaviorOption::Skip: + return "skip"; + } + return invalid_tag; +} +} // namespace magic_enum::customize + +namespace org::apache::nifi::minifi::processors { + +class EvaluateJsonPath final : public core::ProcessorImpl { + public: + EXTENSIONAPI static constexpr const char* Description = "Evaluates one or more JsonPath expressions against the content of a FlowFile. The results of those expressions are assigned to " + "FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of the Processor. 
JsonPaths are entered by adding user-defined properties; " + "the name of the property maps to the Attribute Name into which the result will be placed (if the Destination is flowfile-attribute; otherwise, the property name is ignored). " + "The value of the property must be a valid JsonPath expression. A Return Type of 'auto-detect' will make a determination based off the configured destination. When 'Destination' is set to " + "'flowfile-attribute,' a return type of 'scalar' will be used. When 'Destination' is set to 'flowfile-content,' a return type of 'JSON' will be used. If the JsonPath evaluates to a JSON " + "array or JSON object and the Return Type is set to 'scalar' the FlowFile will be unmodified and will be routed to failure. A Return Type of JSON can return scalar values if the provided " + "JsonPath evaluates to the specified value and will be routed as a match. If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be " + "routed to 'unmatched' without having its contents modified. If Destination is 'flowfile-attribute' and the expression matches nothing, attributes will be created with empty strings as the " + "value unless 'Path Not Found Behaviour' is set to 'skip', and the FlowFile will always be routed to 'matched.'"; + + EXTENSIONAPI static constexpr auto Destination = core::PropertyDefinitionBuilder<2>::createProperty("Destination") + .withDescription("Indicates whether the results of the JsonPath evaluation are written to the FlowFile content or a FlowFile attribute. If using attribute, must specify the Attribute Name " + "property. If set to flowfile-content, only one JsonPath may be specified, and the property name is ignored.") Review Comment: You could add a `DynamicProperties` descriptor (see `RouteText` for an example), and then the second part (2nd and 3rd sentence, slightly modified) of this property description could be moved there. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
