This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c4f08f805 [fix](hudi) the required fields are empty if only reading 
partition columns (#22187)
4c4f08f805 is described below

commit 4c4f08f8059b3c854669322f8777f294f5a89e93
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Wed Jul 26 10:59:45 2023 +0800

    [fix](hudi) the required fields are empty if only reading partition columns 
(#22187)
    
    1. If only the partition columns are read, the `JniConnector` produces
empty required fields, so `HudiJniScanner` should read at least the
"_hoodie_record_key" field to know how many rows are in the current hoodie
split. Even if the `JniConnector` doesn't read this field, the call to
`releaseTable` in `JniConnector` will reclaim the resource.
    
    2. To prevent the BE from failing and exiting, `JniConnector` should call
the release methods only once `HudiJniScanner` has been initialized. Note that
`VectorTable` is created lazily in `JniScanner`, so we don't need to reclaim
the resource when `HudiJniScanner` fails to initialize.
    
    ## Remaining work
    Other JNI readers like `paimon` and `maxcompute` may encounter the same
problem. Each JNI reader needs to handle this abnormal situation on its own;
currently this fix only ensures that the BE will not exit.
---
 be/src/vec/exec/jni_connector.cpp                  | 61 ++++++++++++----------
 be/src/vec/exec/jni_connector.h                    |  1 +
 .../java/org/apache/doris/hudi/HudiJniScanner.java | 54 ++++++++++---------
 .../org/apache/doris/hudi/BaseSplitReader.scala    | 12 ++++-
 4 files changed, 74 insertions(+), 54 deletions(-)

diff --git a/be/src/vec/exec/jni_connector.cpp 
b/be/src/vec/exec/jni_connector.cpp
index edb195479a..ea7e9e82c4 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -156,37 +156,39 @@ Status JniConnector::close() {
     if (!_closed) {
         JNIEnv* env = nullptr;
         RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
-        // update scanner metrics
-        for (const auto& metric : get_statistics(env)) {
-            std::vector<std::string> type_and_name = split(metric.first, ":");
-            if (type_and_name.size() != 2) {
-                LOG(WARNING) << "Name of JNI Scanner metric should be pattern 
like "
-                             << "'metricType:metricName'";
-                continue;
+        if (_scanner_initialized) {
+            // update scanner metrics
+            for (const auto& metric : get_statistics(env)) {
+                std::vector<std::string> type_and_name = split(metric.first, 
":");
+                if (type_and_name.size() != 2) {
+                    LOG(WARNING) << "Name of JNI Scanner metric should be 
pattern like "
+                                 << "'metricType:metricName'";
+                    continue;
+                }
+                long metric_value = std::stol(metric.second);
+                RuntimeProfile::Counter* scanner_counter;
+                if (type_and_name[0] == "timer") {
+                    scanner_counter =
+                            ADD_CHILD_TIMER(_profile, type_and_name[1], 
_connector_name.c_str());
+                } else if (type_and_name[0] == "counter") {
+                    scanner_counter = ADD_CHILD_COUNTER(_profile, 
type_and_name[1], TUnit::UNIT,
+                                                        
_connector_name.c_str());
+                } else if (type_and_name[0] == "bytes") {
+                    scanner_counter = ADD_CHILD_COUNTER(_profile, 
type_and_name[1], TUnit::BYTES,
+                                                        
_connector_name.c_str());
+                } else {
+                    LOG(WARNING) << "Type of JNI Scanner metric should be 
timer, counter or bytes";
+                    continue;
+                }
+                COUNTER_UPDATE(scanner_counter, metric_value);
             }
-            long metric_value = std::stol(metric.second);
-            RuntimeProfile::Counter* scanner_counter;
-            if (type_and_name[0] == "timer") {
-                scanner_counter =
-                        ADD_CHILD_TIMER(_profile, type_and_name[1], 
_connector_name.c_str());
-            } else if (type_and_name[0] == "counter") {
-                scanner_counter = ADD_CHILD_COUNTER(_profile, 
type_and_name[1], TUnit::UNIT,
-                                                    _connector_name.c_str());
-            } else if (type_and_name[0] == "bytes") {
-                scanner_counter = ADD_CHILD_COUNTER(_profile, 
type_and_name[1], TUnit::BYTES,
-                                                    _connector_name.c_str());
-            } else {
-                LOG(WARNING) << "Type of JNI Scanner metric should be timer, 
counter or bytes";
-                continue;
-            }
-            COUNTER_UPDATE(scanner_counter, metric_value);
-        }
 
-        // _fill_block may be failed and returned, we should release table in 
close.
-        // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent
-        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
-        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
-        env->DeleteGlobalRef(_jni_scanner_obj);
+            // _fill_block may be failed and returned, we should release table 
in close.
+            // org.apache.doris.common.jni.JniScanner#releaseTable is 
idempotent
+            env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
+            env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
+            env->DeleteGlobalRef(_jni_scanner_obj);
+        }
         env->DeleteGlobalRef(_jni_scanner_cls);
         _closed = true;
         jthrowable exc = (env)->ExceptionOccurred();
@@ -222,6 +224,7 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int 
batch_size) {
     _jni_scanner_get_statistics =
             env->GetMethodID(_jni_scanner_cls, "getStatistics", 
"()Ljava/util/Map;");
     RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_scanner_obj, 
&_jni_scanner_obj));
+    _scanner_initialized = true;
     env->DeleteLocalRef(jni_scanner_obj);
     RETURN_ERROR_IF_EXC(env);
     return Status::OK();
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index 0f08fbe0f8..1cadc37a1b 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -257,6 +257,7 @@ private:
     size_t _has_read = 0;
 
     bool _closed = false;
+    bool _scanner_initialized = false;
     jclass _jni_scanner_cls;
     jobject _jni_scanner_obj;
     jmethodID _jni_scanner_open;
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
 
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index d067493a79..539ab8f7a8 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -47,6 +47,7 @@ public class HudiJniScanner extends JniScanner {
     private static final Logger LOG = Logger.getLogger(HudiJniScanner.class);
 
     private final int fetchSize;
+    private final String debugString;
     private final HoodieSplit split;
     private final ScanPredicate[] predicates;
     private final ClassLoader classLoader;
@@ -56,26 +57,29 @@ public class HudiJniScanner extends JniScanner {
     private Iterator<InternalRow> recordIterator;
 
     public HudiJniScanner(int fetchSize, Map<String, String> params) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Hudi JNI params:\n" + params.entrySet().stream().map(kv 
-> kv.getKey() + "=" + kv.getValue())
-                    .collect(Collectors.joining("\n")));
-        }
-        this.classLoader = this.getClass().getClassLoader();
-        String predicatesAddressString = params.remove("push_down_predicates");
-        this.fetchSize = fetchSize;
-        this.split = new HoodieSplit(params);
-        if (predicatesAddressString == null) {
-            predicates = new ScanPredicate[0];
-        } else {
-            long predicatesAddress = Long.parseLong(predicatesAddressString);
-            if (predicatesAddress != 0) {
-                predicates = 
ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes());
-                LOG.info("HudiJniScanner gets pushed-down predicates:  " + 
ScanPredicate.dump(predicates));
-            } else {
+        debugString = params.entrySet().stream().map(kv -> kv.getKey() + "=" + 
kv.getValue())
+                .collect(Collectors.joining("\n"));
+        try {
+            this.classLoader = this.getClass().getClassLoader();
+            String predicatesAddressString = 
params.remove("push_down_predicates");
+            this.fetchSize = fetchSize;
+            this.split = new HoodieSplit(params);
+            if (predicatesAddressString == null) {
                 predicates = new ScanPredicate[0];
+            } else {
+                long predicatesAddress = 
Long.parseLong(predicatesAddressString);
+                if (predicatesAddress != 0) {
+                    predicates = 
ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes());
+                    LOG.info("HudiJniScanner gets pushed-down predicates:  " + 
ScanPredicate.dump(predicates));
+                } else {
+                    predicates = new ScanPredicate[0];
+                }
             }
+            ugi = Utils.getUserGroupInformation(split.hadoopConf());
+        } catch (Exception e) {
+            LOG.error("Failed to initialize hudi scanner, split params:\n" + 
debugString, e);
+            throw e;
         }
-        ugi = Utils.getUserGroupInformation(split.hadoopConf());
     }
 
     @Override
@@ -104,17 +108,18 @@ public class HudiJniScanner extends JniScanner {
                 }
             }
         }, 100, 1000, TimeUnit.MILLISECONDS);
-        if (ugi != null) {
-            try {
+        try {
+            if (ugi != null) {
                 recordIterator = ugi.doAs(
                         (PrivilegedExceptionAction<Iterator<InternalRow>>) () 
-> new MORSnapshotSplitReader(
                                 
split).buildScanIterator(split.requiredFields(), new Filter[0]));
-            } catch (InterruptedException e) {
-                throw new IOException(e);
+            } else {
+                recordIterator = new MORSnapshotSplitReader(split)
+                        .buildScanIterator(split.requiredFields(), new 
Filter[0]);
             }
-        } else {
-            recordIterator = new MORSnapshotSplitReader(split)
-                    .buildScanIterator(split.requiredFields(), new Filter[0]);
+        } catch (Exception e) {
+            LOG.error("Failed to open hudi scanner, split params:\n" + 
debugString, e);
+            throw new IOException(e.getMessage(), e);
         }
         isKilled.set(true);
         executorService.shutdownNow();
@@ -146,6 +151,7 @@ public class HudiJniScanner extends JniScanner {
             return readRowNumbers;
         } catch (Exception e) {
             close();
+            LOG.error("Failed to get the next batch of hudi, split params:\n" 
+ debugString, e);
             throw new IOException("Failed to get the next batch of hudi.", e);
         }
     }
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index 5eceb1c9b3..cdae395534 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -84,7 +84,17 @@ class HoodieSplit(private val params: jutil.Map[String, 
String]) {
   val hudiColumnTypes: Map[String, String] = hudiColumnNames.zip(
     params.remove("hudi_column_types").split("#")).toMap
 
-  val requiredFields: Array[String] = 
params.remove("required_fields").split(",")
+  val requiredFields: Array[String] = {
+    val readFields = 
params.remove("required_fields").split(",").filter(_.nonEmpty)
+    if (readFields.isEmpty) {
+      // If only read the partition columns, the JniConnector will produce 
empty required fields.
+      // Read the "_hoodie_record_key" field at least to know how many rows in 
current hoodie split
+      // Even if the JniConnector doesn't read this field, the call of 
releaseTable will reclaim the resource
+      Array(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+    } else {
+      readFields
+    }
+  }
   val requiredTypes: Array[ColumnType] = requiredFields.map(
     field => ColumnType.parseType(field, hudiColumnTypes(field)))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to