kaka11chen commented on code in PR #23745: URL: https://github.com/apache/doris/pull/23745#discussion_r1312638010
########## fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java: ########## @@ -84,53 +144,70 @@ public HudiJniScanner(int fetchSize, Map<String, String> params) { @Override public void open() throws IOException { - Thread.currentThread().setContextClassLoader(classLoader); - initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize); - long startTime = System.nanoTime(); - // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck, - // so use another process to kill this stuck process. - // TODO(gaoxin): better way to solve the stuck process? - AtomicBoolean isKilled = new AtomicBoolean(false); - ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); - executorService.scheduleAtFixedRate(() -> { - if (!isKilled.get()) { - synchronized (HudiJniScanner.class) { - List<Long> pids = Utils.getChildProcessIds( - Utils.getCurrentProcId()); - for (long pid : pids) { - String cmd = Utils.getCommandLine(pid); - if (cmd != null && cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) { - Utils.killProcess(pid); - isKilled.set(true); - LOG.info("Kill hotspot debugger process " + pid); + Future<?> avroFuture = avroReadPool.submit(() -> { + Thread.currentThread().setContextClassLoader(classLoader); + initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize); + long startTime = System.nanoTime(); + // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck, + // so use another process to kill this stuck process. + // TODO(gaoxin): better way to solve the stuck process? + AtomicBoolean isKilled = new AtomicBoolean(false); + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + executorService.scheduleAtFixedRate(() -> { + if (!isKilled.get()) { + synchronized (HudiJniScanner.class) { + List<Long> pids = Utils.getChildProcessIds( + Utils.getCurrentProcId()); + for (long pid : pids) { + String cmd = Utils.getCommandLine(pid); + if (cmd != null && cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) { + Utils.killProcess(pid); + isKilled.set(true); + LOG.info("Kill hotspot debugger process " + pid); + } } } } + }, 100, 1000, TimeUnit.MILLISECONDS); + + cleanResolverLock.readLock().lock(); + try { + lastUpdateTime.set(System.currentTimeMillis()); + if (ugi != null) { + recordIterator = ugi.doAs( + (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader( + split).buildScanIterator(split.requiredFields(), new Filter[0])); + } else { + recordIterator = new MORSnapshotSplitReader(split) + .buildScanIterator(split.requiredFields(), new Filter[0]); + } + } catch (Exception e) { + LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e); + throw new RuntimeException(e.getMessage(), e); + } finally { + cleanResolverLock.readLock().unlock(); } - }, 100, 1000, TimeUnit.MILLISECONDS); - try { - if (ugi != null) { - recordIterator = ugi.doAs( - (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader( - split).buildScanIterator(split.requiredFields(), new Filter[0])); - } else { - recordIterator = new MORSnapshotSplitReader(split) - .buildScanIterator(split.requiredFields(), new Filter[0]); + isKilled.set(true); + executorService.shutdownNow(); Review Comment: use style? ``` shutdown() waitTermination(timeout) if timed out, shutdownNow() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org