[ https://issues.apache.org/jira/browse/HIVE-24380?focusedWorklogId=512794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-512794 ]
ASF GitHub Bot logged work on HIVE-24380:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Nov/20 09:26
            Start Date: 17/Nov/20 09:26
    Worklog Time Spent: 10m
      Work Description: rbalamohan commented on a change in pull request #1670:
URL: https://github.com/apache/hive/pull/1670#discussion_r525001043

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));

Review comment:
It would be good to reuse HIVE_COMPUTE_SPLITS_NUM_THREADS instead of MAXIMUM_ASYNC_STATUS_FETCHER_COUNT. If needed, it can be set to "1" to get back the old behaviour.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));
+    ForkJoinPool pool = null;

Review comment:
Can use a regular ExecutorService, since this is not doing something very similar to ForkJoin.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));
+    ForkJoinPool pool = null;
+    if (fetcherPoolParallelism > 1) { // dont create thread pool for a few partitions
+      pool = new ForkJoinPool(fetcherPoolParallelism);
+    }
+    List<ForkJoinTask<Void>> tasks = new ArrayList<>(numberOfUnmanagedPaths);
     for (Entry<Path, Collection<String>> entry : candidatePathsToAliases.entrySet()) {
       Path path = entry.getKey();
       Collection<String> allowed = entry.getValue();
       Boolean isEmpty = managedEmptyPathMap.get(path);
       // if isEmpty is null, either stats are not up to date or this is external table
       if (isEmpty == null) {
-        lookupAndProcessPath(work, path, allowed);
+        if (pool != null) {
+          ForkJoinTask<Void> task = pool.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {

Review comment:
Remove the "throws Exception" clause? lookupAndProcessPath doesn't throw Exception.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));
+    ForkJoinPool pool = null;
+    if (fetcherPoolParallelism > 1) { // dont create thread pool for a few partitions
+      pool = new ForkJoinPool(fetcherPoolParallelism);
+    }
+    List<ForkJoinTask<Void>> tasks = new ArrayList<>(numberOfUnmanagedPaths);
     for (Entry<Path, Collection<String>> entry : candidatePathsToAliases.entrySet()) {
       Path path = entry.getKey();
       Collection<String> allowed = entry.getValue();
       Boolean isEmpty = managedEmptyPathMap.get(path);
       // if isEmpty is null, either stats are not up to date or this is external table
       if (isEmpty == null) {
-        lookupAndProcessPath(work, path, allowed);
+        if (pool != null) {
+          ForkJoinTask<Void> task = pool.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              lookupAndProcessPath(work, path, allowed);
+              return null;
+            }
+          });
+          tasks.add(task);
+        } else {
+          lookupAndProcessPath(work, path, allowed);
+        }
       } else {
         processPath(work, path, allowed, isEmpty);
       }
     }
+    for (ForkJoinTask<Void> t : tasks) {

Review comment:
With a regular ExecutorService, this loop would call Future::get. In case of exceptions, the other futures have to be cancelled.
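
The pattern suggested in this comment could look roughly like the sketch below. It is not code from the pull request: the class name ParallelStatusFetchSketch, the method runAll, and the numThreads parameter are hypothetical, and the tasks stand in for the per-path lookupAndProcessPath calls. It only uses java.util.concurrent, submits every task to a fixed-size pool, waits with Future::get, and cancels the remaining futures if any task fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, for illustration only; not part of Hive.
final class ParallelStatusFetchSketch {

  /**
   * Runs all tasks on a fixed-size pool and waits for each of them.
   * If one task fails (or the caller is interrupted), the remaining
   * futures are cancelled and the failure is rethrown.
   */
  static void runAll(List<Callable<Void>> tasks, int numThreads)
      throws ExecutionException, InterruptedException {
    // numThreads is assumed to come from configuration; 1 gives one worker thread.
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, numThreads));
    List<Future<Void>> futures = new ArrayList<>(tasks.size());
    try {
      for (Callable<Void> task : tasks) {
        futures.add(pool.submit(task));
      }
      for (Future<Void> f : futures) {
        f.get(); // a task failure surfaces here as ExecutionException
      }
    } catch (ExecutionException | InterruptedException e) {
      for (Future<Void> f : futures) {
        f.cancel(true); // has no effect on futures that already completed
      }
      throw e;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

With numThreads set to 1 the tasks run one after another on a single worker thread, so a separate code path for the sequential case may not be needed under this approach.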

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));
+    ForkJoinPool pool = null;
+    if (fetcherPoolParallelism > 1) { // dont create thread pool for a few partitions

Review comment:
When HIVE_COMPUTE_SPLITS_NUM_THREADS is used, setting it to 1 would automatically give single-threaded behaviour. In that case, would this check still be necessary?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 512794)
    Time Spent: 20m  (was: 10m)

> NullScanTaskDispatcher should liststatus in parallel
> ----------------------------------------------------
>
>                 Key: HIVE-24380
>                 URL: https://issues.apache.org/jira/browse/HIVE-24380
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Mustafa İman
>            Assignee: Mustafa İman
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> NullScanTaskDispatcher does listStatus for hundreds of partition directories
> in case of external tables. This is a big problem in cloud installations where
> directory listings are in an object store like S3. We can do this in parallel.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)