kaka11chen commented on code in PR #23745:
URL: https://github.com/apache/doris/pull/23745#discussion_r1312638010


##########
fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java:
##########
@@ -84,53 +144,70 @@ public HudiJniScanner(int fetchSize, Map<String, String> 
params) {
 
     @Override
     public void open() throws IOException {
-        Thread.currentThread().setContextClassLoader(classLoader);
-        initTableInfo(split.requiredTypes(), split.requiredFields(), 
predicates, fetchSize);
-        long startTime = System.nanoTime();
-        // RecordReader will use ProcessBuilder to start a hotspot process, 
which may be stuck,
-        // so use another process to kill this stuck process.
-        // TODO(gaoxin): better way to solve the stuck process?
-        AtomicBoolean isKilled = new AtomicBoolean(false);
-        ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(1);
-        executorService.scheduleAtFixedRate(() -> {
-            if (!isKilled.get()) {
-                synchronized (HudiJniScanner.class) {
-                    List<Long> pids = Utils.getChildProcessIds(
-                            Utils.getCurrentProcId());
-                    for (long pid : pids) {
-                        String cmd = Utils.getCommandLine(pid);
-                        if (cmd != null && 
cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
-                            Utils.killProcess(pid);
-                            isKilled.set(true);
-                            LOG.info("Kill hotspot debugger process " + pid);
+        Future<?> avroFuture = avroReadPool.submit(() -> {
+            Thread.currentThread().setContextClassLoader(classLoader);
+            initTableInfo(split.requiredTypes(), split.requiredFields(), 
predicates, fetchSize);
+            long startTime = System.nanoTime();
+            // RecordReader will use ProcessBuilder to start a hotspot 
process, which may be stuck,
+            // so use another process to kill this stuck process.
+            // TODO(gaoxin): better way to solve the stuck process?
+            AtomicBoolean isKilled = new AtomicBoolean(false);
+            ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(1);
+            executorService.scheduleAtFixedRate(() -> {
+                if (!isKilled.get()) {
+                    synchronized (HudiJniScanner.class) {
+                        List<Long> pids = Utils.getChildProcessIds(
+                                Utils.getCurrentProcId());
+                        for (long pid : pids) {
+                            String cmd = Utils.getCommandLine(pid);
+                            if (cmd != null && 
cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
+                                Utils.killProcess(pid);
+                                isKilled.set(true);
+                                LOG.info("Kill hotspot debugger process " + 
pid);
+                            }
                         }
                     }
                 }
+            }, 100, 1000, TimeUnit.MILLISECONDS);
+
+            cleanResolverLock.readLock().lock();
+            try {
+                lastUpdateTime.set(System.currentTimeMillis());
+                if (ugi != null) {
+                    recordIterator = ugi.doAs(
+                            (PrivilegedExceptionAction<Iterator<InternalRow>>) 
() -> new MORSnapshotSplitReader(
+                                    
split).buildScanIterator(split.requiredFields(), new Filter[0]));
+                } else {
+                    recordIterator = new MORSnapshotSplitReader(split)
+                            .buildScanIterator(split.requiredFields(), new 
Filter[0]);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to open hudi scanner, split params:\n" + 
debugString, e);
+                throw new RuntimeException(e.getMessage(), e);
+            } finally {
+                cleanResolverLock.readLock().unlock();
             }
-        }, 100, 1000, TimeUnit.MILLISECONDS);
-        try {
-            if (ugi != null) {
-                recordIterator = ugi.doAs(
-                        (PrivilegedExceptionAction<Iterator<InternalRow>>) () 
-> new MORSnapshotSplitReader(
-                                
split).buildScanIterator(split.requiredFields(), new Filter[0]));
-            } else {
-                recordIterator = new MORSnapshotSplitReader(split)
-                        .buildScanIterator(split.requiredFields(), new 
Filter[0]);
+            isKilled.set(true);
+            executorService.shutdownNow();

Review Comment:
   use style?
   ```
   shutdown()
   waitTermination(timeout)
   if timed out, shutdownNow()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to