[ 
https://issues.apache.org/jira/browse/HIVE-24380?focusedWorklogId=512794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-512794
 ]

ASF GitHub Bot logged work on HIVE-24380:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Nov/20 09:26
            Start Date: 17/Nov/20 09:26
    Worklog Time Spent: 10m 
      Work Description: rbalamohan commented on a change in pull request #1670:
URL: https://github.com/apache/hive/pull/1670#discussion_r525001043



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
 
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));

Review comment:
       It would be good to reuse HIVE_COMPUTE_SPLITS_NUM_THREADS instead of 
MAXIMUM_ASYNC_STATUS_FETCHER_COUNT. If needed, it can be set to "1" to get 
back the old behaviour. 
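
A minimal sketch of the suggestion above, assuming the thread count is read from configuration rather than a hard-coded constant. A plain Properties object stands in for the Hive configuration here; the key string and the fallback value of 10 are assumptions for illustration and should be checked against HiveConf.

```java
import java.util.Properties;

/** Sketch only: a plain Properties object stands in for the Hive configuration. */
final class FetcherPoolSizing {
  // Assumed key for HIVE_COMPUTE_SPLITS_NUM_THREADS; verify against HiveConf.
  static final String NUM_THREADS_KEY = "hive.compute.splits.num.threads";
  // Hypothetical fallback used when the key is absent.
  static final int DEFAULT_NUM_THREADS = 10;

  /** Caps the pool at the configured thread count; never returns less than 1. */
  static int fetcherPoolParallelism(Properties conf, int numberOfUnmanagedPaths) {
    int configured = Integer.parseInt(
        conf.getProperty(NUM_THREADS_KEY, String.valueOf(DEFAULT_NUM_THREADS)));
    int wanted = (int) Math.sqrt(numberOfUnmanagedPaths);
    return Math.max(1, Math.min(configured, wanted));
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    conf.setProperty(NUM_THREADS_KEY, "1"); // "1" restores the old sequential behaviour
    System.out.println(fetcherPoolParallelism(conf, 400));
  }
}
```

Clamping to at least 1 means a pool can always be created, even for a handful of paths.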

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
 
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));
+    ForkJoinPool pool = null;

Review comment:
       A regular ExecutorService can be used here, since this code is not 
doing anything resembling fork/join work. 

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
 
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));
+    ForkJoinPool pool = null;
+    if (fetcherPoolParallelism > 1) { // dont create thread pool for a few partitions
+      pool = new ForkJoinPool(fetcherPoolParallelism);
+    }
+    List<ForkJoinTask<Void>> tasks = new ArrayList<>(numberOfUnmanagedPaths);
     for (Entry<Path, Collection<String>> entry : candidatePathsToAliases.entrySet()) {
       Path path = entry.getKey();
       Collection<String> allowed = entry.getValue();
       Boolean isEmpty = managedEmptyPathMap.get(path);
       // if isEmpty is null, either stats are not up to date or this is external table
       if (isEmpty == null) {
-        lookupAndProcessPath(work, path, allowed);
+        if (pool != null) {
+          ForkJoinTask<Void> task = pool.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {

Review comment:
       Remove the exception? lookupAndProcessPath doesn't throw Exception. 
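
For context, Java allows an overriding method to declare fewer checked exceptions than the interface method, so call() can drop the throws clause. The sketch below uses a hypothetical stand-in for lookupAndProcessPath (it only appends to a log); it is not the method from the pull request.

```java
import java.util.concurrent.Callable;

final class NarrowedCallable {
  // Hypothetical stand-in for lookupAndProcessPath: declares no checked exception.
  static void lookupAndProcessPath(StringBuilder log, String path) {
    log.append(path).append(';');
  }

  /** Wraps one path lookup in a Callable whose call() has no throws clause. */
  static Callable<Void> taskFor(StringBuilder log, String path) {
    return new Callable<Void>() {
      @Override
      public Void call() { // no "throws Exception": an override may omit it
        lookupAndProcessPath(log, path);
        return null;
      }
    };
  }
}
```

Callers that hold the task through the Callable type still have to handle Exception, because the interface method declares it.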

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
 
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));
+    ForkJoinPool pool = null;
+    if (fetcherPoolParallelism > 1) { // dont create thread pool for a few partitions
+      pool = new ForkJoinPool(fetcherPoolParallelism);
+    }
+    List<ForkJoinTask<Void>> tasks = new ArrayList<>(numberOfUnmanagedPaths);
     for (Entry<Path, Collection<String>> entry : candidatePathsToAliases.entrySet()) {
       Path path = entry.getKey();
       Collection<String> allowed = entry.getValue();
       Boolean isEmpty = managedEmptyPathMap.get(path);
       // if isEmpty is null, either stats are not up to date or this is external table
       if (isEmpty == null) {
-        lookupAndProcessPath(work, path, allowed);
+        if (pool != null) {
+          ForkJoinTask<Void> task = pool.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              lookupAndProcessPath(work, path, allowed);
+              return null;
+            }
+          });
+          tasks.add(task);
+        } else {
+          lookupAndProcessPath(work, path, allowed);
+        }
       } else {
         processPath(work, path, allowed, isEmpty);
       }
     }
+    for (ForkJoinTask<Void> t : tasks) {

Review comment:
       With a regular ExecutorService, this loop would call Future::get. In 
case of exceptions, the other futures have to be cancelled.
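
One way this could look, as a sketch under assumptions rather than the change that was eventually merged: tasks are submitted to a fixed-size pool, each Future is awaited with get(), and on the first failure the remaining futures are cancelled before the exception is rethrown. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CancelOnFailure {
  /** Runs all tasks on a fixed pool; on the first failure cancels the rest and rethrows. */
  static void runAll(List<Callable<Void>> work, int parallelism)
      throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    List<Future<Void>> futures = new ArrayList<>(work.size());
    try {
      for (Callable<Void> c : work) {
        futures.add(pool.submit(c));
      }
      for (Future<Void> f : futures) {
        f.get(); // rethrows a task failure wrapped in ExecutionException
      }
    } catch (ExecutionException | InterruptedException e) {
      for (Future<Void> f : futures) {
        f.cancel(true); // has no effect on tasks that already completed
      }
      throw e;
    } finally {
      pool.shutdownNow(); // release the worker threads in every case
    }
  }
}
```

Futures are awaited in submission order, so a failure in a later task is only noticed once the earlier ones have finished.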

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
##########
@@ -159,21 +165,46 @@ private void processTableScans(MapWork work, Set<TableScanOperator> tableScans)
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
 
+    int fetcherPoolParallelism = Math.min(MAXIMUM_ASYNC_STATUS_FETCHER_COUNT, (int)Math.sqrt(numberOfUnmanagedPaths));
+    ForkJoinPool pool = null;
+    if (fetcherPoolParallelism > 1) { // dont create thread pool for a few partitions

Review comment:
       When HIVE_COMPUTE_SPLITS_NUM_THREADS is used, setting it to 1 would 
automatically give single-threaded behaviour. In that case, would this check 
still be necessary?
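
To illustrate the point, a small sketch (class and method names are hypothetical): a fixed thread pool of size 1 runs submitted tasks one at a time in submission order, so the same submit path can serve both the parallel and the sequential case without a separate branch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SingleThreadPoolDemo {
  /** Submits n tasks to a pool of the given size and returns the order they ran in. */
  static List<Integer> runOrder(int n, int poolSize) throws InterruptedException {
    List<Integer> order = Collections.synchronizedList(new ArrayList<>());
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    for (int i = 0; i < n; i++) {
      final int id = i;
      pool.execute(() -> order.add(id)); // each task records its own id when it runs
    }
    pool.shutdown(); // stop accepting work, let queued tasks finish
    pool.awaitTermination(1, TimeUnit.MINUTES);
    return order;
  }
}
```

With poolSize set to 1 the recorded order matches the submission order; with larger pools the order is not guaranteed. The trade-off is that even the single-threaded case pays for creating one worker thread.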






Issue Time Tracking
-------------------

    Worklog Id:     (was: 512794)
    Time Spent: 20m  (was: 10m)

> NullScanTaskDispatcher should liststatus in parallel
> ----------------------------------------------------
>
>                 Key: HIVE-24380
>                 URL: https://issues.apache.org/jira/browse/HIVE-24380
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Mustafa İman
>            Assignee: Mustafa İman
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> NullScanTaskDispatcher calls listStatus on hundreds of partition directories 
> for external tables. This is a big problem in cloud installations where 
> directory listings go to an object store such as S3. We can do this in parallel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
