This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit ec59578106b9d9adcdc4d4ea2223d3531eac9cbc
Author: Eyizoha <[email protected]>
AuthorDate: Mon Feb 19 17:35:46 2024 +0800

IMPALA-12786: Optimize count(*) for JSON scans

When performing zero-slot scans on a JSON table for operations like count(*),
we do not need any specific data from the JSON; we only need the number of
top-level JSON objects. However, the current JSON parser, based on rapidjson,
still decodes and copies data from the JSON even in zero-slot scans. Skipping
these steps significantly improves scan performance.

This patch introduces a JSON skipper to conduct zero-slot scans on JSON data.
Essentially, it is a simplified version of a rapidjson parser that removes the
data decoding and copying operations, so it counts JSON objects faster. The
skipper retains the ability to recognize malformed JSON and reports the same
error codes as the rapidjson parser. However, because it bypasses data
parsing, it cannot identify string encoding errors or numeric overflow errors.
Since these data errors do not affect the count of JSON objects, it is
acceptable to ignore them; the TEXT scanner exhibits similar behavior.

Additionally, a new query option, disable_optimized_json_count_star, has been
added to disable this optimization and revert to the old behavior.
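Outside of Impala, the skip-don't-decode idea can be sketched in standalone
C++. Everything below (`SkipValue`, `CountTopLevelObjects`, the loose
separator handling) is illustrative only and far simpler than the committed
`JsonSkipper` -- in particular it works on an in-memory buffer rather than a
stream and does not report rapidjson error codes:

```cpp
#include <cassert>
#include <cctype>
#include <cstring>
#include <string>

static bool SkipValue(const std::string& s, size_t& i);

// Advance past any JSON whitespace.
static void SkipWs(const std::string& s, size_t& i) {
  while (i < s.size() && std::isspace(static_cast<unsigned char>(s[i]))) ++i;
}

// Scan to the closing quote without decoding escapes or validating UTF-8.
static bool SkipString(const std::string& s, size_t& i) {
  ++i;  // consume the opening quote
  while (i < s.size()) {
    if (s[i] == '\\') { i += 2; continue; }  // jump over the escaped character
    if (s[i] == '"') { ++i; return true; }
    ++i;
  }
  return false;  // unterminated string
}

// Skip an object or array by recursively skipping its members. Separators are
// consumed loosely here; the real skipper validates them and reports
// rapidjson-style error codes.
static bool SkipContainer(const std::string& s, size_t& i, char close) {
  ++i;  // consume the opening bracket
  while (true) {
    SkipWs(s, i);
    if (i >= s.size()) return false;  // missing closing bracket
    if (s[i] == close) { ++i; return true; }
    if (s[i] == ',' || s[i] == ':') { ++i; continue; }
    if (!SkipValue(s, i)) return false;
  }
}

static bool SkipValue(const std::string& s, size_t& i) {
  SkipWs(s, i);
  if (i >= s.size()) return false;
  switch (s[i]) {
    case '{': return SkipContainer(s, i, '}');
    case '[': return SkipContainer(s, i, ']');
    case '"': return SkipString(s, i);
    default:
      // Literal or number: consume until a structural character or whitespace.
      while (i < s.size() && std::strchr(",]}\" \n\r\t", s[i]) == nullptr) ++i;
      return true;
  }
}

// Count top-level JSON objects in a buffer; other top-level values
// (arrays, strings, numbers) are skipped but not counted.
static int CountTopLevelObjects(const std::string& s) {
  size_t i = 0;
  int count = 0;
  while (true) {
    SkipWs(s, i);
    if (i >= s.size()) return count;
    bool is_object = (s[i] == '{');
    if (!SkipValue(s, i)) return count;
    if (is_object) ++count;
  }
}
```

The key point mirrors the commit: string bytes are stepped over (honoring
backslash escapes) rather than decoded, and numeric text is consumed without
conversion, so malformed encodings or overflowing numerals go undetected --
which is fine when only the object count matters.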
In the performance test of TPC-DS with a format of json/none and a scale of
10 GB, the performance optimization is shown in the following tables:

+-----------+---------------------------+--------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+--------+
| Workload  | Query                     | File Format        | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval   |
+-----------+---------------------------+--------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+--------+
| TPCDS(10) | TPCDS-Q_COUNT_UNOPTIMIZED | json / none / none | 6.78   | 6.88        | -1.46%     | 4.93%     | 3.63%          | 9     | -1.51%         | -0.74   | -0.72  |
| TPCDS(10) | TPCDS-Q_COUNT_ZERO_SLOT   | json / none / none | 2.42   | 6.75        | I -64.20%  | 6.44%     | 4.58%          | 9     | I -177.75%     | -3.36   | -37.55 |
| TPCDS(10) | TPCDS-Q_COUNT_OPTIMIZED   | json / none / none | 2.42   | 7.03        | I -65.63%  | 3.93%     | 4.39%          | 9     | I -194.13%     | -3.36   | -42.82 |
+-----------+---------------------------+--------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+--------+

(I) Improvement: TPCDS(10) TPCDS-Q_COUNT_ZERO_SLOT [json / none / none] (6.75s -> 2.42s [-64.20%])
+--------------+------------+---------+----------+------------+------------+----------+----------+------------+--------+-------+--------+-----------+
| Operator     | % of Query | Avg     | Base Avg | Delta(Avg) | StdDev(%)  | Max      | Base Max | Delta(Max) | #Hosts | #Inst | #Rows  | Est #Rows |
+--------------+------------+---------+----------+------------+------------+----------+----------+------------+--------+-------+--------+-----------+
| 01:AGGREGATE | 2.58%      | 54.85ms | 58.88ms  | -6.85%     | * 14.43% * | 115.82ms | 133.11ms | -12.99%    | 3      | 3     | 3      | 1         |
| 00:SCAN HDFS | 97.41%     | 2.07s   | 6.07s    | -65.84%    | 5.87%      | 2.43s    | 6.95s    | -65.01%    | 3      | 3     | 28.80M | 143.83M   |
+--------------+------------+---------+----------+------------+------------+----------+----------+------------+--------+-------+--------+-----------+

(I) Improvement: TPCDS(10) TPCDS-Q_COUNT_OPTIMIZED [json / none / none] (7.03s -> 2.42s [-65.63%])
+--------------+------------+-------+----------+------------+-----------+-------+----------+------------+--------+-------+--------+-----------+
| Operator     | % of Query | Avg   | Base Avg | Delta(Avg) | StdDev(%) | Max   | Base Max | Delta(Max) | #Hosts | #Inst | #Rows  | Est #Rows |
+--------------+------------+-------+----------+------------+-----------+-------+----------+------------+--------+-------+--------+-----------+
| 00:SCAN HDFS | 99.35%     | 2.07s | 6.49s    | -68.15%    | 4.83%     | 2.37s | 7.49s    | -68.32%    | 3      | 3     | 28.80M | 143.83M   |
+--------------+------------+-------+----------+------------+-----------+-------+----------+------------+--------+-------+--------+-----------+

Testing:
- Added new test cases in TestQueriesJsonTables to verify that query results
  are consistent before and after optimization.
- Passed existing JSON scanning-related tests.
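For reference, the new option behaves like any other Impala session-level
query option; a usage sketch (the table name is a placeholder, not from this
commit):

```sql
-- Fall back to the full rapidjson parse path (pre-patch behavior):
SET DISABLE_OPTIMIZED_JSON_COUNT_STAR=1;
SELECT count(*) FROM some_json_table;

-- Restore the default (optimization enabled):
SET DISABLE_OPTIMIZED_JSON_COUNT_STAR=0;
```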
Change-Id: I97ff097661c3c577aeafeeb1518408ce7a8a255e
Reviewed-on: http://gerrit.cloudera.org:8080/21039
Reviewed-by: Quanlong Huang <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/json/hdfs-json-scanner.cc              |  14 +-
 be/src/exec/json/json-parser-test.cc               | 151 ++++++++++++++++-
 be/src/exec/json/json-parser.cc                    | 186 +++++++++++++++++++++
 be/src/exec/json/json-parser.h                     |  99 ++++++++++-
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |   4 +-
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/Query.thrift                         |   4 +
 .../queries/QueryTest/complex_json.test            |   7 +
 .../queries/QueryTest/malformed_json.test          |   7 +
 .../queries/QueryTest/multiline_json.test          |   7 +
 .../queries/QueryTest/overflow_json.test           |   7 +
 tests/query_test/test_queries.py                   |   4 +-
 13 files changed, 485 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/json/hdfs-json-scanner.cc b/be/src/exec/json/hdfs-json-scanner.cc
index 7644a8671..46d26cc7e 100644
--- a/be/src/exec/json/hdfs-json-scanner.cc
+++ b/be/src/exec/json/hdfs-json-scanner.cc
@@ -119,7 +119,6 @@ Status HdfsJsonScanner::InitNewRange() {
   }
 
   RETURN_IF_ERROR(UpdateDecompressor(compression_type));
-  // TODO: Optmize for zero slots scan (e.g. count(*)).
   vector<string> schema;
   schema.reserve(scan_node_->materialized_slots().size());
   for (const SlotDescriptor* slot : scan_node_->materialized_slots()) {
@@ -203,8 +202,17 @@ Status HdfsJsonScanner::FindFirstTuple() {
 Status HdfsJsonScanner::ParseWrapper(int max_tuples, int* num_tuples) {
   DCHECK(json_parser_->IsTidy());
   SCOPED_TIMER(parse_json_timer_);
-  Status status = json_parser_->Parse(max_tuples, num_tuples);
-  RETURN_IF_ERROR(buffer_status_);
+  Status status;
+  if (!state_->query_options().disable_optimized_json_count_star &&
+      scan_node_->materialized_slots().size() == 0) {
+    status = json_parser_->CountJsonObjects(max_tuples, num_tuples);
+    RETURN_IF_ERROR(buffer_status_);
+    DCHECK(num_tuples_materialized_ == 0);
+    num_tuples_materialized_ = WriteTemplateTuples(tuple_row_, *num_tuples);
+  } else {
+    status = json_parser_->Parse(max_tuples, num_tuples);
+    RETURN_IF_ERROR(buffer_status_);
+  }
   return status;
 }
diff --git a/be/src/exec/json/json-parser-test.cc b/be/src/exec/json/json-parser-test.cc
index e2693111c..b19cd15a9 100644
--- a/be/src/exec/json/json-parser-test.cc
+++ b/be/src/exec/json/json-parser-test.cc
@@ -22,8 +22,10 @@
 #include "exec/json/json-parser.h"
 #include "testutil/gtest-util.h"
+#include "util/time.h"
 
 using namespace std;
+using namespace rapidjson;
 
 namespace impala {
@@ -43,13 +45,23 @@ class JsonParserTest : public ::testing::TestWithParam<int> {
   virtual void SetUp() override {
     data_pos_ = 0;
+    repeats_ = 0;
     stream_size_ = GetParam();
   }
 
+  void Reset(size_t repeats = 0) {
+    data_pos_ = 0;
+    repeats_ = repeats;
+  }
+
   void NextBuffer(const char** begin, const char** end) {
     EXPECT_EQ(*begin, *end);
     *begin = *end = nullptr;
-    if (data_pos_ > data_.size()) return;
+    if (data_pos_ >= data_.size()) {
+      if (repeats_ == 0) return;
+      data_pos_ = 0;
+      --repeats_;
+    }
     *begin = data_.data() + data_pos_;
     size_t len = min(stream_size_, data_.size() - data_pos_);
     *end = *begin + len;
@@ -60,7 +72,42 @@ class JsonParserTest : public ::testing::TestWithParam<int> {
   const string& result() const { return result_; }
 
+  enum JsonValueType {
+    TYPE_NULL,
+    TYPE_TRUE,
+    TYPE_FALSE,
+    TYPE_STRING,
+    TYPE_NUMBER,
+    TYPE_OBJECT,
+    TYPE_ARRAY,
+    TYPE_VALUE
+  };
+
+  void TestSkip(const char* v, JsonValueType t, ParseErrorCode e = kParseErrorNone) {
+    SimpleStream ss(v);
+    JsonSkipper<SimpleStream> js(ss);
+    bool res;
+    switch (t) {
+      case TYPE_NULL: res = js.SkipNull(); break;
+      case TYPE_TRUE: res = js.SkipTrue(); break;
+      case TYPE_FALSE: res = js.SkipFalse(); break;
+      case TYPE_STRING: res = js.SkipString(); break;
+      case TYPE_NUMBER: res = js.SkipNumber(); break;
+      case TYPE_OBJECT: res = js.SkipObject(); break;
+      case TYPE_ARRAY: res = js.SkipArray(); break;
+      case TYPE_VALUE: res = js.SkipValue(); break;
+      default: ASSERT_TRUE(false);
+    }
+    if (e == kParseErrorNone) {
+      EXPECT_TRUE(res) << v;
+    } else {
+      EXPECT_FALSE(res) << v;
+      EXPECT_EQ(js.GetErrorCode(), e) << v;
+    }
+  }
+
 private:
+  size_t repeats_ = 0;
   size_t data_pos_ = 0;
   size_t stream_size_;
   string data_;
@@ -109,7 +156,7 @@ class JsonParserTest : public ::testing::TestWithParam<int> {
 INSTANTIATE_TEST_SUITE_P(StreamSize, JsonParserTest, ::testing::Values(1, 16, 256));
 
-TEST_P(JsonParserTest, Basic) {
+TEST_P(JsonParserTest, BasicTest) {
   SimpleJsonScanner js(schema(), [this](const char** begin, const char** end) {
     this->NextBuffer(begin, end);
   });
@@ -120,7 +167,105 @@ TEST_P(JsonParserTest, BasicTest) {
     EXPECT_GE(num_rows, 0);
     EXPECT_LE(num_rows, max_rows);
   } while (num_rows);
-  EXPECT_EQ(result(), js.Result());
+  EXPECT_EQ(result(), js.result());
 }
+
+TEST_P(JsonParserTest, JsonSkipperTest) {
+  // positive cases
+  TestSkip("null", TYPE_NULL);
+  TestSkip("true", TYPE_TRUE);
+  TestSkip("false", TYPE_FALSE);
+
+  TestSkip(R"("abc")", TYPE_STRING);
+  TestSkip(R"(" \n\t\r")", TYPE_STRING);
+  TestSkip(R"("\0\1\2")", TYPE_STRING);
+  TestSkip(R"("\u123\"\'\\")", TYPE_STRING);
+  TestSkip(R"("你好🙂")", TYPE_STRING);
+  TestSkip(R"("\u009f\u0099\u0082")", TYPE_STRING);
+
+  TestSkip("1.024", TYPE_NUMBER);
+  TestSkip("-9.9", TYPE_NUMBER);
+  TestSkip("2e10", TYPE_NUMBER);
+  TestSkip("-2e-10", TYPE_NUMBER);
+
+  TestSkip("{}", TYPE_OBJECT);
+  TestSkip(R"({"a":null, "b":[1,true,false]})", TYPE_OBJECT);
+  TestSkip(R"({"a":null, "b":{"c":"d"}})", TYPE_OBJECT);
+  TestSkip(R"({"a":null, "b":[{"k1":"v1"}, {"k2":"v2"}]})", TYPE_OBJECT);
+
+  TestSkip("[]", TYPE_ARRAY);
+  TestSkip(R"(["",true,false])", TYPE_ARRAY);
+  TestSkip(R"(["]",{"":[{},[{}]]}])", TYPE_ARRAY);
+  TestSkip(R"(["",{},[[[]]],{"a":[1,2],"":""}])", TYPE_ARRAY);
+
+  TestSkip("null", TYPE_VALUE);
+  TestSkip(R"("abc")", TYPE_VALUE);
+  TestSkip("1.024", TYPE_VALUE);
+  TestSkip("{}", TYPE_VALUE);
+  TestSkip("[]", TYPE_VALUE);
+
+  // negative cases
+  TestSkip("nuLL", TYPE_NULL, kParseErrorValueInvalid);
+  TestSkip("tRue", TYPE_TRUE, kParseErrorValueInvalid);
+  TestSkip("flase", TYPE_FALSE, kParseErrorValueInvalid);
+
+  TestSkip(R"("abc\")", TYPE_STRING, kParseErrorStringMissQuotationMark);
+  TestSkip(R"("你好🙂\")", TYPE_STRING, kParseErrorStringMissQuotationMark);
+  TestSkip(R"("\u009f\u0099\u00\")", TYPE_STRING, kParseErrorStringMissQuotationMark);
+
+  TestSkip("Inf", TYPE_NUMBER, kParseErrorValueInvalid);
+  TestSkip("-Infinity", TYPE_NUMBER, kParseErrorValueInvalid);
+  TestSkip("NaN", TYPE_NUMBER, kParseErrorValueInvalid);
+  TestSkip("+1", TYPE_NUMBER, kParseErrorValueInvalid);
+  TestSkip(".123", TYPE_NUMBER, kParseErrorValueInvalid);
+  TestSkip("1.", TYPE_NUMBER, kParseErrorNumberMissFraction);
+  TestSkip("2e", TYPE_NUMBER, kParseErrorNumberMissExponent);
+
+  TestSkip("{1}", TYPE_OBJECT, kParseErrorObjectMissName);
+  TestSkip(R"({"a""b"})", TYPE_OBJECT, kParseErrorObjectMissColon);
+  TestSkip(R"({"a":})", TYPE_OBJECT, kParseErrorValueInvalid);
+  TestSkip(R"({"a":"b")", TYPE_OBJECT, kParseErrorObjectMissCommaOrCurlyBracket);
+  TestSkip(R"({"a":null, "b":{1,true,false}})", TYPE_OBJECT, kParseErrorObjectMissName);
+
+  TestSkip("[,false]", TYPE_ARRAY, kParseErrorValueInvalid);
+  TestSkip("[true,]", TYPE_ARRAY, kParseErrorValueInvalid);
+  TestSkip("[true,false", TYPE_ARRAY, kParseErrorArrayMissCommaOrSquareBracket);
+  TestSkip("[[1,2]", TYPE_ARRAY, kParseErrorArrayMissCommaOrSquareBracket);
+  TestSkip(R"(["],"a","b"])", TYPE_ARRAY, kParseErrorArrayMissCommaOrSquareBracket);
+
+  TestSkip("Null", TYPE_VALUE, kParseErrorValueInvalid);
+  TestSkip(R"({"abc\":1})", TYPE_VALUE, kParseErrorStringMissQuotationMark);
+  TestSkip("-2.e4", TYPE_VALUE, kParseErrorNumberMissFraction);
+  TestSkip(R"({"a":b})", TYPE_VALUE, kParseErrorValueInvalid);
+  TestSkip("[,]", TYPE_VALUE, kParseErrorValueInvalid);
+}
+
+TEST_P(JsonParserTest, CountJsonObjectsTest) {
+  SimpleJsonScanner js({}, [this](const char** begin, const char** end) {
+    this->NextBuffer(begin, end);
+  });
+  constexpr int max_rows = 1024;
+  int num_rows = 0, row_count = 0;
+
+  int64_t scan_start_time = UnixMicros();
+  Reset(1000);
+  do {
+    EXPECT_OK(js.Scan(max_rows, &num_rows));
+  } while (num_rows);
+
+  int64_t count_start_time = UnixMicros();
+  Reset(1000);
+  do {
+    EXPECT_OK(js.Count(max_rows, &num_rows));
+    row_count += num_rows;
+  } while (num_rows);
+  int64_t end_time = UnixMicros();
+
+  EXPECT_EQ(row_count, js.row_count());
+  LOG(INFO) << "JSON Scan cost time in ms: "
+      << static_cast<double>(count_start_time - scan_start_time) / 1000
+      << ", JSON Count cost time in ms: "
+      << static_cast<double>(end_time - count_start_time) / 1000;
+}
 
 }
diff --git a/be/src/exec/json/json-parser.cc b/be/src/exec/json/json-parser.cc
index 423dcd094..4fbfb3d45 100644
--- a/be/src/exec/json/json-parser.cc
+++ b/be/src/exec/json/json-parser.cc
@@ -28,6 +28,11 @@ using std::string;
 
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return false
 
+#define ERROR_IF_FALSE(x, err) \
+  do { \
+    if (UNLIKELY(!(x))) { code_ = err; return false; } \
+  } while (false)
+
 template <class Scanner>
 JsonParser<Scanner>::JsonParser(const vector<string>& schema, Scanner* scanner)
   : num_fields_(schema.size()), scanner_(scanner), stream_(this) {
@@ -82,6 +87,7 @@ bool JsonParser<Scanner>::MoveToNextJson() {
 template <class Scanner>
 Status JsonParser<Scanner>::Parse(int max_rows, int* num_rows) {
   while (*num_rows < max_rows) {
+    // TODO: Support Inf and NaN.
    constexpr auto parse_flags = kParseNumbersAsStringsFlag | kParseStopWhenDoneFlag;
    // Reads characters from the stream, parses them and publishes events to this
    // handler (JsonParser).
@@ -129,6 +135,37 @@ Status JsonParser<Scanner>::Parse(int max_rows, int* num_rows) {
   return Status::OK();
 }
 
+template <class Scanner>
+Status JsonParser<Scanner>::CountJsonObjects(int max_rows, int* num_rows) {
+  JsonSkipper<CharStream> skipper(stream_);
+  while (*num_rows < max_rows) {
+    skipper.SkipNextObject();
+
+    if (UNLIKELY(skipper.HasError())) {
+      if (skipper.GetErrorCode() == kParseErrorDocumentEmpty) {
+        // See the comments at the corresponding location of the Parse().
+        if (UNLIKELY(!stream_.Eos())) {
+          DCHECK_EQ(stream_.Peek(), '\0');
+          stream_.Take();
+          continue;
+        }
+        return Status::OK();
+      }
+      RETURN_IF_ERROR(scanner_->HandleError(skipper.GetErrorCode(), stream_.Tell()));
+
+      // See the comments at the corresponding location of the Parse().
+      if (skipper.GetErrorCode() != kParseErrorObjectMissCommaOrCurlyBracket) {
+        MoveToNextJson();
+      }
+    }
+
+    ++(*num_rows);
+    if (UNLIKELY(scanner_->BreakParse())) break;
+  }
+
+  return Status::OK();
+}
+
 template <class Scanner>
 bool JsonParser<Scanner>::Key(const char* str, uint32_t len, bool copy) {
   if (object_depth_ == 1 && array_depth_ == 0) {
@@ -230,5 +267,154 @@ bool JsonParser<Scanner>::String(const char* str, uint32_t len, bool copy) {
   return true;
 }
 
+template<class Stream>
+bool JsonSkipper<Stream>::SkipNextObject() {
+  code_ = kParseErrorNone;
+  while (true) {
+    SkipWhitespace();
+    ERROR_IF_FALSE(s_.Peek() != '\0', kParseErrorDocumentEmpty);
+    bool is_object = (s_.Peek() == '{');
+    RETURN_IF_FALSE(SkipValue());
+    if (LIKELY(is_object)) return true;
+  }
+}
+
+template<class Stream>
+bool JsonSkipper<Stream>::SkipNull() {
+  DCHECK(s_.Peek() == 'n');
+  s_.Take();
+  ERROR_IF_FALSE(Consume('u'), kParseErrorValueInvalid);
+  ERROR_IF_FALSE(Consume('l'), kParseErrorValueInvalid);
+  ERROR_IF_FALSE(Consume('l'), kParseErrorValueInvalid);
+  return true;
+}
+
+template<class Stream>
+bool JsonSkipper<Stream>::SkipTrue() {
+  DCHECK(s_.Peek() == 't');
+  s_.Take();
+  ERROR_IF_FALSE(Consume('r'), kParseErrorValueInvalid);
+  ERROR_IF_FALSE(Consume('u'), kParseErrorValueInvalid);
+  ERROR_IF_FALSE(Consume('e'), kParseErrorValueInvalid);
+  return true;
+}
+
+template<class Stream>
+bool JsonSkipper<Stream>::SkipFalse() {
+  DCHECK(s_.Peek() == 'f');
+  s_.Take();
+  ERROR_IF_FALSE(Consume('a'), kParseErrorValueInvalid);
+  ERROR_IF_FALSE(Consume('l'), kParseErrorValueInvalid);
+  ERROR_IF_FALSE(Consume('s'), kParseErrorValueInvalid);
+  ERROR_IF_FALSE(Consume('e'), kParseErrorValueInvalid);
+  return true;
+}
+
+template<class Stream>
+bool JsonSkipper<Stream>::SkipString() {
+  DCHECK(s_.Peek() == '"');
+  s_.Take();
+  char c;
+  bool escape = false;
+  while ((c = s_.Peek()) != '\0') {
+    if (escape) {
+      escape = false;
+    } else if (c == '\\') {
+      escape = true;
+    } else if (c == '"') {
+      s_.Take();
+      return true;
+    }
+    s_.Take();
+  }
+  ERROR_IF_FALSE(false, kParseErrorStringMissQuotationMark);
+}
+
+template<class Stream>
+bool JsonSkipper<Stream>::SkipNumber() {
+  // Please note that in standard JSON, number literals must start with a digit or a
+  // minus sign (in the case of negative numbers). Positive numbers should be written
+  // directly without a '+', and '0.123' should not be abbreviated as '.123'.
+  // Numbers starting with '.' or '+' in JSON are considered invalid values, which is
+  // consistent with the behavior of rapidjson.
+  // Despite the fact that special values such as Inf and NaN are not supported in
+  // standard JSON (they are considered invalid values), rapidjson does support them.
+  // However, it requires the parsing flag kParseNanAndInfFlag to be enabled. Since this
+  // flag is not enabled in the current JsonParser::Parse(), this function also remains
+  // consistent by not supporting Inf and NaN.
+  // TODO: Support Inf and NaN.
+  Consume('-');
+  if (UNLIKELY(s_.Peek() == '0')) {
+    s_.Take();
+  } else if (LIKELY(s_.Peek() >= '1' && s_.Peek() <= '9')) {
+    while (LIKELY(s_.Peek() >= '0' && s_.Peek() <= '9')) s_.Take();
+  } else ERROR_IF_FALSE(false, kParseErrorValueInvalid);
+
+  if (Consume('.')) {
+    ERROR_IF_FALSE(s_.Peek() >= '0' && s_.Peek() <= '9', kParseErrorNumberMissFraction);
+    while (LIKELY(s_.Peek() >= '0' && s_.Peek() <= '9')) s_.Take();
+  }
+
+  if (Consume('e') || Consume('E')) {
+    if (!Consume('+')) Consume('-');
+    ERROR_IF_FALSE(s_.Peek() >= '0' && s_.Peek() <= '9', kParseErrorNumberMissExponent);
+    while (LIKELY(s_.Peek() >= '0' && s_.Peek() <= '9')) s_.Take();
+  }
+  return true;
+}
+
+template<class Stream>
+bool JsonSkipper<Stream>::SkipObject() {
+  DCHECK(s_.Peek() == '{');
+  s_.Take();
+  SkipWhitespace();
+  if (Consume('}')) return true;
+  while (true) {
+    ERROR_IF_FALSE(s_.Peek() == '"', kParseErrorObjectMissName);
+    RETURN_IF_FALSE(SkipString());
+    SkipWhitespace();
+    ERROR_IF_FALSE(Consume(':'), kParseErrorObjectMissColon);
+    SkipWhitespace();
+    RETURN_IF_FALSE(SkipValue());
+    SkipWhitespace();
+    if (Consume(',')) SkipWhitespace();
+    else if (Consume('}')) return true;
+    else ERROR_IF_FALSE(false, kParseErrorObjectMissCommaOrCurlyBracket);
+  }
+}
+
+template<class Stream>
+bool JsonSkipper<Stream>::SkipArray() {
+  DCHECK(s_.Peek() == '[');
+  s_.Take();
+  SkipWhitespace();
+  if (Consume(']')) return true;
+  while (true) {
+    RETURN_IF_FALSE(SkipValue());
+    SkipWhitespace();
+    if (Consume(',')) SkipWhitespace();
+    else if (Consume(']')) return true;
+    else ERROR_IF_FALSE(false, kParseErrorArrayMissCommaOrSquareBracket);
+  }
+}
+
+template<class Stream>
+bool JsonSkipper<Stream>::SkipValue() {
+  // Please note that in standard JSON, the special values null, true, and false must all
+  // be in lowercase form. Any other cases will be considered invalid values, which is
+  // consistent with the behavior of rapidjson.
+  switch (s_.Peek()) {
+    case 'n': RETURN_IF_FALSE(SkipNull()); break;
+    case 't': RETURN_IF_FALSE(SkipTrue()); break;
+    case 'f': RETURN_IF_FALSE(SkipFalse()); break;
+    case '"': RETURN_IF_FALSE(SkipString()); break;
+    case '{': RETURN_IF_FALSE(SkipObject()); break;
+    case '[': RETURN_IF_FALSE(SkipArray()); break;
+    default: RETURN_IF_FALSE(SkipNumber()); break;
+  }
+  return true;
+}
+
 template class impala::JsonParser<SimpleJsonScanner>;
 template class impala::JsonParser<HdfsJsonScanner>;
+template class impala::JsonSkipper<SimpleStream>;
diff --git a/be/src/exec/json/json-parser.h b/be/src/exec/json/json-parser.h
index a6db77578..1fb0568b7 100644
--- a/be/src/exec/json/json-parser.h
+++ b/be/src/exec/json/json-parser.h
@@ -143,9 +143,18 @@ public:
   /// invalid JSON format, etc.), and Scanner returns an error status after handling
   /// the error.
   /// 4. Scanner's BreakParse() indicates the need to end parsing.
+  /// Please note that 'max_rows' and 'num_rows' actually represent the number of
+  /// top-level JSON values processed by the parser, meaning that if there are top-level
+  /// arrays, strings, or other JSON values in the JSON data, they will also be included
+  /// in the count.
   Status Parse(int max_rows, int* num_rows);
 
-  CharStream& stream() { return stream_; }
+  /// Parse the JSON data and directly count how many top-level JSON objects (excluding
+  /// nested ones) there are without performing specific data copying and conversion. It
+  /// behaves similarly to Parse() but is faster, suitable for zero slots scans such as
+  /// count(*). Different from Parse(), here 'max_rows' and 'num_rows' only count the
+  /// top-level JSON objects and do not include other top-level JSON values.
+  Status CountJsonObjects(int max_rows, int* num_rows);
 
 private:
   friend class rapidjson::GenericReader<rapidjson::UTF8<>, rapidjson::UTF8<>>;
@@ -231,13 +240,85 @@ private:
   std::vector<char> field_found_;
 };
 
+/// A util class used to assist in parsing JSON. When conducting zero slots scans, no
+/// actual data from the JSON is needed, only the number of JSON objects. This class is
+/// essentially a simplified version of a rapidjson parser (rapidjson::GenericReader),
+/// removing specific data parsing and copying operations, allowing for faster parsing of
+/// the number of JSON objects.
+/// The class retains the ability to recognize malformed JSON and provide specific error
+/// codes like rapidjson's parser. However, as it skips specific data parsing, it cannot
+/// identify string encoding errors or numeric overflow errors. Nonetheless, these data
+/// errors do not affect the counting of JSON objects, and ignoring them is acceptable.
+/// Please refer to the following link for code about rapidjson::GenericReader:
+/// https://github.com/Tencent/rapidjson/blob/5ec44fb/include/rapidjson/reader.h#L539
+template<class Stream>
+class JsonSkipper {
+ public:
+  JsonSkipper(Stream& stream) : s_(stream) { }
+
+  /// Consume the stream until a complete outermost JSON object has been skipped; return
+  /// false and record the corresponding error code if an error occurs.
+  bool SkipNextObject();
+
+  bool HasError() { return code_ != rapidjson::kParseErrorNone; }
+  rapidjson::ParseErrorCode GetErrorCode() { return code_; }
+
+ private:
+  friend class JsonParserTest;
+
+  /// This function attempts to consume a character from the stream; if the next
+  /// character matches 'expect', it takes it out and returns true, otherwise it
+  /// returns false.
+  inline bool Consume(char expect) {
+    if (LIKELY(s_.Peek() == expect)) {
+      s_.Take();
+      return true;
+    }
+    return false;
+  }
+
+  inline void SkipWhitespace() {
+    char c;
+    while ((c = s_.Peek()) == ' ' || c == '\n' || c == '\r' || c == '\t') s_.Take();
+  }
+
+  /// The following functions are used to skip a specific JSON value. They maintain
+  /// logic consistent with rapidjson, consuming the stream and returning true upon
+  /// successfully skipping the specified value, or returning false and setting the
+  /// respective error code if an error is encountered.
+  /// See more details about valid JSON values in: https://rapidjson.org/md_doc_sax.html
+  bool SkipNull();
+  bool SkipTrue();
+  bool SkipFalse();
+  bool SkipString();
+  bool SkipNumber();
+  bool SkipObject();
+  bool SkipArray();
+  bool SkipValue();
+
+  Stream& s_;
+  rapidjson::ParseErrorCode code_ = rapidjson::kParseErrorNone;
+};
+
+/// A simple c_str wrapper for testing JsonSkipper.
+class SimpleStream {
+public:
+  SimpleStream(const char* str) : current_(str) { }
+
+  char Peek() { return *current_; }
+
+  char Take() { return *current_ == '\0' ? '\0' : *current_++; }
+
+private:
+  const char* current_ = nullptr;
+};
+
 /// A simple class for testing JsonParser.
 class SimpleJsonScanner {
 public:
   using GetBufferFunc = std::function<void(const char**, const char**)>;
   SimpleJsonScanner(const std::vector<std::string>& schema, GetBufferFunc get_buffer)
-      : parser_(schema, this), get_buffer_(get_buffer) {
+      : row_count_(0), parser_(schema, this), get_buffer_(get_buffer) {
     parser_.ResetParser();
     current_row_.resize(schema.size());
   }
@@ -246,10 +327,18 @@ public:
     *num_rows = 0;
     if (!parser_.IsTidy()) return Status("Parser is not tidy");
     RETURN_IF_ERROR(parser_.Parse(max_row, num_rows));
-    return Status::OK();;
+    return Status::OK();
   }
 
-  std::string Result() { return result_.str(); }
+  Status Count(int max_row, int* num_rows) {
+    *num_rows = 0;
+    RETURN_IF_ERROR(parser_.CountJsonObjects(max_row, num_rows));
+    return Status::OK();
+  }
+
+  std::string result() const { return result_.str(); }
+
+  size_t row_count() const { return row_count_; }
 
 private:
   friend class JsonParser<SimpleJsonScanner>;
@@ -271,6 +360,7 @@ private:
   void SubmitRow() {
     for (const auto& s : current_row_) result_ << s << ", ";
     result_ << '\n';
+    ++row_count_;
   }
 
   void AddNull(int index) {
@@ -294,6 +384,7 @@ private:
 
   std::vector<std::string> current_row_;
   std::stringstream result_;
+  size_t row_count_;
   JsonParser<SimpleJsonScanner> parser_;
   GetBufferFunc get_buffer_;
 };
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 47c49bdd0..7160f3a89 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1259,6 +1259,10 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
       query_options->__set_enable_tuple_cache(enable_tuple_cache);
       break;
     }
+    case TImpalaQueryOptions::DISABLE_OPTIMIZED_JSON_COUNT_STAR: {
+      query_options->__set_disable_optimized_json_count_star(IsTrue(value));
+      break;
+    }
     case TImpalaQueryOptions::ICEBERG_DISABLE_COUNT_STAR_OPTIMIZATION: {
       query_options->__set_iceberg_disable_count_star_optimization(IsTrue(value));
       break;
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index ac17ac035..25b645f28 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -52,7 +52,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \
-      TImpalaQueryOptions::WRITE_KUDU_UTC_TIMESTAMPS + 1); \
+      TImpalaQueryOptions::DISABLE_OPTIMIZED_JSON_COUNT_STAR + 1); \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \
@@ -335,6 +335,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(use_null_slots_cache, USE_NULL_SLOTS_CACHE, TQueryOptionLevel::ADVANCED) \
   QUERY_OPT_FN(write_kudu_utc_timestamps, \
       WRITE_KUDU_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED) \
+  QUERY_OPT_FN(disable_optimized_json_count_star, DISABLE_OPTIMIZED_JSON_COUNT_STAR, \
+      TQueryOptionLevel::ADVANCED) \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 7ab658f87..b25223789 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -953,6 +953,9 @@ enum TImpalaQueryOptions {
   // written to Kudu as UNIXTIME_MICRO.
   // Reads are unaffected (see CONVERT_KUDU_UTC_TIMESTAMPS).
   WRITE_KUDU_UTC_TIMESTAMPS = 180
+
+  // Turns off optimized JSON count star (zero slots) scan, falls back to rapidjson parse.
+  DISABLE_OPTIMIZED_JSON_COUNT_STAR = 181
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 7bba052ee..02a3e6f1b 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -728,6 +728,7 @@ struct TQueryOptions {
   178: optional TSlotCountStrategy slot_count_strategy = TSlotCountStrategy.LARGEST_FRAGMENT
+
   // See comment in ImpalaService.thrift
   179: optional bool clean_dbcp_ds_cache = true;
 
   // See comment in ImpalaService.thrift
@@ -735,6 +736,9 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   181: optional bool write_kudu_utc_timestamps = false;
+
+  // See comment in ImpalaService.thrift
+  182: optional bool disable_optimized_json_count_star = false;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/testdata/workloads/functional-query/queries/QueryTest/complex_json.test b/testdata/workloads/functional-query/queries/QueryTest/complex_json.test
index 1a1ddbc3d..3c049dd27 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/complex_json.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/complex_json.test
@@ -11,4 +11,11 @@ int, string, string, string
 5,'Emily','NULL','NULL'
 13,'Liam','NULL','NULL'
 15,'Nora','NULL','NULL'
+====
+---- QUERY
+select count(*) from complex_json
+---- TYPES
+bigint
+---- RESULTS
+5
 ====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/malformed_json.test b/testdata/workloads/functional-query/queries/QueryTest/malformed_json.test
index 155d6f0ac..ea4e8de95 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/malformed_json.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/malformed_json.test
@@ -22,4 +22,11 @@
 false,9,0.899999976158,'abc123'
 true,10,1.0,'abc123'
 NULL,NULL,NULL,'NULL'
 NULL,NULL,NULL,'abc123'
+====
+---- QUERY
+select count(*) from malformed_json
+---- TYPES
+bigint
+---- RESULTS
+13
 ====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/multiline_json.test b/testdata/workloads/functional-query/queries/QueryTest/multiline_json.test
index 4368dd8db..f3b5361aa 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/multiline_json.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/multiline_json.test
@@ -24,4 +24,11 @@ int, string, string
 7,'multiline object4','abcdefg'
 8,'one line multiple objects','obj1'
 9,'one line multiple objects','obj2'
+====
+---- QUERY
+select count(*) from multiline_json
+---- TYPES
+bigint
+---- RESULTS
+9
 ====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/overflow_json.test b/testdata/workloads/functional-query/queries/QueryTest/overflow_json.test
index d4afe584f..92986adaf 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/overflow_json.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/overflow_json.test
@@ -18,3 +18,10 @@ tinyint, smallint, int, bigint, float, double
 127,32767,2147483647,9223372036854775807,Infinity,Infinity
 -128,-32768,-2147483648,-9223372036854775808,-Infinity,-Infinity
 ====
+---- QUERY
+select count(*) from overflow_json
+---- TYPES
+bigint
+---- RESULTS
+6
+====
\ No newline at end of file
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index e67dc43dc..8c33dbe18 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -28,7 +28,8 @@ from tests.common.skip import (
 from tests.common.test_dimensions import (
     create_uncompressed_text_dimension, create_uncompressed_json_dimension,
     create_exec_option_dimension_from_dict, create_client_protocol_dimension,
-    hs2_parquet_constraint, extend_exec_option_dimension, FILE_FORMAT_TO_STORED_AS_MAP)
+    hs2_parquet_constraint, extend_exec_option_dimension, FILE_FORMAT_TO_STORED_AS_MAP,
+    add_exec_option_dimension)
 from tests.util.filesystem_utils import get_fs_path
 from subprocess import check_call
@@ -262,6 +263,7 @@ class TestQueriesJsonTables(ImpalaTestSuite):
     super(TestQueriesJsonTables, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(
         create_uncompressed_json_dimension(cls.get_workload()))
+    add_exec_option_dimension(cls, 'disable_optimized_json_count_star', [0, 1])
 
   @classmethod
   def get_workload(cls):
