This is an automated email from the ASF dual-hosted git repository. ashingau pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4c4f08f805 [fix](hudi) the required fields are empty if only reading partition columns (#22187) 4c4f08f805 is described below commit 4c4f08f8059b3c854669322f8777f294f5a89e93 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Wed Jul 26 10:59:45 2023 +0800 [fix](hudi) the required fields are empty if only reading partition columns (#22187) 1. If only the partition columns are read, the `JniConnector` will produce empty required fields, so `HudiJniScanner` should read at least the "_hoodie_record_key" field to know how many rows are in the current hoodie split. Even if the `JniConnector` doesn't read this field, the call of `releaseTable` in `JniConnector` will reclaim the resource. 2. To prevent BE failure and exit, `JniConnector` should call release methods only after `HudiJniScanner` is initialized. It should be noted that `VectorTable` is created lazily in `JniScanner`, so we don't need to reclaim the resource when `HudiJniScanner` fails to initialize. ## Remaining work Other jni readers like `paimon` and `maxcompute` may encounter the same problems; each jni reader needs to handle this abnormal situation on its own, and currently this fix can only ensure that BE will not exit. 
--- be/src/vec/exec/jni_connector.cpp | 61 ++++++++++++---------- be/src/vec/exec/jni_connector.h | 1 + .../java/org/apache/doris/hudi/HudiJniScanner.java | 54 ++++++++++--------- .../org/apache/doris/hudi/BaseSplitReader.scala | 12 ++++- 4 files changed, 74 insertions(+), 54 deletions(-) diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index edb195479a..ea7e9e82c4 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -156,37 +156,39 @@ Status JniConnector::close() { if (!_closed) { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - // update scanner metrics - for (const auto& metric : get_statistics(env)) { - std::vector<std::string> type_and_name = split(metric.first, ":"); - if (type_and_name.size() != 2) { - LOG(WARNING) << "Name of JNI Scanner metric should be pattern like " - << "'metricType:metricName'"; - continue; + if (_scanner_initialized) { + // update scanner metrics + for (const auto& metric : get_statistics(env)) { + std::vector<std::string> type_and_name = split(metric.first, ":"); + if (type_and_name.size() != 2) { + LOG(WARNING) << "Name of JNI Scanner metric should be pattern like " + << "'metricType:metricName'"; + continue; + } + long metric_value = std::stol(metric.second); + RuntimeProfile::Counter* scanner_counter; + if (type_and_name[0] == "timer") { + scanner_counter = + ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str()); + } else if (type_and_name[0] == "counter") { + scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT, + _connector_name.c_str()); + } else if (type_and_name[0] == "bytes") { + scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES, + _connector_name.c_str()); + } else { + LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes"; + continue; + } + COUNTER_UPDATE(scanner_counter, metric_value); } - long metric_value = std::stol(metric.second); - 
RuntimeProfile::Counter* scanner_counter; - if (type_and_name[0] == "timer") { - scanner_counter = - ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str()); - } else if (type_and_name[0] == "counter") { - scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT, - _connector_name.c_str()); - } else if (type_and_name[0] == "bytes") { - scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES, - _connector_name.c_str()); - } else { - LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes"; - continue; - } - COUNTER_UPDATE(scanner_counter, metric_value); - } - // _fill_block may be failed and returned, we should release table in close. - // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent - env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table); - env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close); - env->DeleteGlobalRef(_jni_scanner_obj); + // _fill_block may be failed and returned, we should release table in close. 
+ // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent + env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table); + env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close); + env->DeleteGlobalRef(_jni_scanner_obj); + } env->DeleteGlobalRef(_jni_scanner_cls); _closed = true; jthrowable exc = (env)->ExceptionOccurred(); @@ -222,6 +224,7 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) { _jni_scanner_get_statistics = env->GetMethodID(_jni_scanner_cls, "getStatistics", "()Ljava/util/Map;"); RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_scanner_obj, &_jni_scanner_obj)); + _scanner_initialized = true; env->DeleteLocalRef(jni_scanner_obj); RETURN_ERROR_IF_EXC(env); return Status::OK(); diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index 0f08fbe0f8..1cadc37a1b 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -257,6 +257,7 @@ private: size_t _has_read = 0; bool _closed = false; + bool _scanner_initialized = false; jclass _jni_scanner_cls; jobject _jni_scanner_obj; jmethodID _jni_scanner_open; diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index d067493a79..539ab8f7a8 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -47,6 +47,7 @@ public class HudiJniScanner extends JniScanner { private static final Logger LOG = Logger.getLogger(HudiJniScanner.class); private final int fetchSize; + private final String debugString; private final HoodieSplit split; private final ScanPredicate[] predicates; private final ClassLoader classLoader; @@ -56,26 +57,29 @@ public class HudiJniScanner extends JniScanner { private Iterator<InternalRow> recordIterator; public 
HudiJniScanner(int fetchSize, Map<String, String> params) { - if (LOG.isDebugEnabled()) { - LOG.debug("Hudi JNI params:\n" + params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue()) - .collect(Collectors.joining("\n"))); - } - this.classLoader = this.getClass().getClassLoader(); - String predicatesAddressString = params.remove("push_down_predicates"); - this.fetchSize = fetchSize; - this.split = new HoodieSplit(params); - if (predicatesAddressString == null) { - predicates = new ScanPredicate[0]; - } else { - long predicatesAddress = Long.parseLong(predicatesAddressString); - if (predicatesAddress != 0) { - predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes()); - LOG.info("HudiJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); - } else { + debugString = params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue()) + .collect(Collectors.joining("\n")); + try { + this.classLoader = this.getClass().getClassLoader(); + String predicatesAddressString = params.remove("push_down_predicates"); + this.fetchSize = fetchSize; + this.split = new HoodieSplit(params); + if (predicatesAddressString == null) { predicates = new ScanPredicate[0]; + } else { + long predicatesAddress = Long.parseLong(predicatesAddressString); + if (predicatesAddress != 0) { + predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes()); + LOG.info("HudiJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); + } else { + predicates = new ScanPredicate[0]; + } } + ugi = Utils.getUserGroupInformation(split.hadoopConf()); + } catch (Exception e) { + LOG.error("Failed to initialize hudi scanner, split params:\n" + debugString, e); + throw e; } - ugi = Utils.getUserGroupInformation(split.hadoopConf()); } @Override @@ -104,17 +108,18 @@ public class HudiJniScanner extends JniScanner { } } }, 100, 1000, TimeUnit.MILLISECONDS); - if (ugi != null) { - try { + try { + if (ugi != 
null) { recordIterator = ugi.doAs( (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader( split).buildScanIterator(split.requiredFields(), new Filter[0])); - } catch (InterruptedException e) { - throw new IOException(e); + } else { + recordIterator = new MORSnapshotSplitReader(split) + .buildScanIterator(split.requiredFields(), new Filter[0]); } - } else { - recordIterator = new MORSnapshotSplitReader(split) - .buildScanIterator(split.requiredFields(), new Filter[0]); + } catch (Exception e) { + LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e); + throw new IOException(e.getMessage(), e); } isKilled.set(true); executorService.shutdownNow(); @@ -146,6 +151,7 @@ public class HudiJniScanner extends JniScanner { return readRowNumbers; } catch (Exception e) { close(); + LOG.error("Failed to get the next batch of hudi, split params:\n" + debugString, e); throw new IOException("Failed to get the next batch of hudi.", e); } } diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala index 5eceb1c9b3..cdae395534 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala @@ -84,7 +84,17 @@ class HoodieSplit(private val params: jutil.Map[String, String]) { val hudiColumnTypes: Map[String, String] = hudiColumnNames.zip( params.remove("hudi_column_types").split("#")).toMap - val requiredFields: Array[String] = params.remove("required_fields").split(",") + val requiredFields: Array[String] = { + val readFields = params.remove("required_fields").split(",").filter(_.nonEmpty) + if (readFields.isEmpty) { + // If only read the partition columns, the JniConnector will produce empty required fields. 
+ // Read the "_hoodie_record_key" field at least to know how many rows in current hoodie split + // Even if the JniConnector doesn't read this field, the call of releaseTable will reclaim the resource + Array(HoodieRecord.RECORD_KEY_METADATA_FIELD) + } else { + readFields + } + } val requiredTypes: Array[ColumnType] = requiredFields.map( field => ColumnType.parseType(field, hudiColumnTypes(field))) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org