Hi, Ning
To be clear about what I was suggesting, I have created a patch for just
this file.
Please have a look.
Thanks,
MIS.
On Thu, Mar 3, 2011 at 5:50 PM, M IS <[email protected]> wrote:
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/460/
>
> trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java
> <https://reviews.apache.org/r/460/diff/1/?file=13550#file13550line82>
> (Diff revision 1)
>
> public void run(SessionState sess, Set<ReadEntity> inputs,
>
> Thread[] threads = new Thread[nThreads];
>
> How about going for a centralized thread pool and submitting the tasks
> to that pool? This has advantages: we would not have to create threads
> ourselves, and we could learn the status of a submitted task through its
> Future object and use that Future to wait until the task is finished. We
> could refactor the code to make UpdateWorker implement Runnable instead
> of extending Thread.
>
>
> - M
>
> On March 3rd, 2011, 12:53 a.m., Ning Zhang wrote:
> Review request for hive.
> By Ning Zhang.
>
> *Updated 2011-03-03 00:53:49*
> Description
>
> define hive.hooks.parallel.degree to control the max # of threads used to
> update the metastore in parallel.
>
> Diffs
>
> - trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (1076459)
> - trunk/conf/hive-default.xml (1076459)
> - trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (1076459)
> - trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java (1076459)
>
> View Diff <https://reviews.apache.org/r/460/diff/>
>
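To make the suggestion above concrete, here is a minimal, standalone sketch
of the pool-and-futures pattern (plain Java; PoolSketch and the task bodies
are illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> futures = new ArrayList<Future<?>>();
    try {
      for (int t = 0; t < 4; t++) {
        final int id = t;
        // submit() accepts a Runnable and returns a Future that tracks
        // the status of the submitted task
        futures.add(pool.submit(new Runnable() {
          public void run() {
            System.out.println("task " + id + " done");
          }
        }));
      }
      // get() blocks until the task finishes; a RuntimeException thrown
      // inside run() surfaces here as an ExecutionException
      for (Future<?> f : futures) {
        f.get();
      }
    } finally {
      pool.shutdown();
    }
  }
}

The patch below applies the same pattern inside the hook.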
### Eclipse Workspace Patch 1.0
#P hive
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java (revision 1076702)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java (working copy)
@@ -17,18 +17,26 @@
*/
package org.apache.hadoop.hive.ql.hooks;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Set;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+
/**
* Implementation of a pre execute hook that updates the access
* times for all the inputs.
@@ -39,7 +47,6 @@
public static class PreExec implements PreExecute {
Hive db;
-
public void run(SessionState sess, Set<ReadEntity> inputs,
Set<WriteEntity> outputs, UserGroupInformation ugi)
throws Exception {
@@ -54,35 +61,127 @@
}
}
+ if (inputs.size() == 0) {
+ return;
+ }
+
int lastAccessTime = (int) (System.currentTimeMillis()/1000);
- for(ReadEntity re: inputs) {
- // Set the last query time
+ int nThreads = HiveConf.getIntVar(sess.getConf(), HiveConf.ConfVars.HOOKS_PARALLEL_DEGREE);
+ int maxThreads = HiveConf.getIntVar(sess.getConf(), HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
+
+ if (nThreads < 1) {
+ nThreads = 1;
+ } else if (nThreads > maxThreads) {
+ nThreads = maxThreads;
+ }
+ if (nThreads > inputs.size()) {
+ nThreads = inputs.size();
+ }
+
+    // This could be replaced by a shared, centrally managed thread pool.
+ ExecutorService exeService = new ThreadPoolExecutor(nThreads, nThreads, 5000, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ List<Future<?>> futures = new ArrayList<Future<?>>(nThreads);
+
+ List<ReadEntity>[] threadInputs = new List[nThreads];
+
+ // assign ReadEntities to threads
+ int i = 0;
+ for (i = 0; i < nThreads; ++i) {
+ threadInputs[i] = new ArrayList<ReadEntity>();
+ }
+
+ i = 0;
+ for (ReadEntity re: inputs) {
+ threadInputs[i % nThreads].add(re);
+ ++i;
+ }
+
+ try {
+ // launch all threads
+ Runnable updateWorker;
+ Future<?> futureTask;
+ for (i = 0; i < nThreads; ++i) {
+ updateWorker = new UpdateWorker(sess.getConf(), threadInputs[i], lastAccessTime);
+ futureTask = exeService.submit(updateWorker);
+ futures.add(futureTask);
+ }
+
+ // wait for all the tasks to finish
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ } catch (Throwable e) {
+        // future.get() surfaces a worker's RuntimeException wrapped in an
+        // ExecutionException; rethrow as HiveException to fail the hook
+ throw new HiveException(e);
+ } finally {
+ exeService.shutdown();
+ }
+ }
+ }
+
+ public static class UpdateWorker implements Runnable {
+ List<ReadEntity> reList;
+ // each thread must have a different Hive object
+ Hive db;
+ int lastAccessTime;
+ // keep track of which table's lastAccessTime has been updated
+ static Set<String> updatedTables = Collections.synchronizedSet(new HashSet<String>());
+
+ public UpdateWorker(HiveConf conf, List<ReadEntity> ent, int time)
+ throws HiveException {
+ super();
+ this.reList = ent;
+ this.db = Hive.get(conf); // get a thread local object
+ this.lastAccessTime = time;
+ }
+
+ @Override
+ public void run() {
+ for (ReadEntity re: reList) {
ReadEntity.Type typ = re.getType();
+ String tblName = null;
+ try {
switch(typ) {
// It is possible that read and write entities contain a old version
// of the object, before it was modified by StatsTask.
// Get the latest versions of the object
- case TABLE: {
- Table t = db.getTable(re.getTable().getTableName());
- t.setLastAccessTime(lastAccessTime);
- db.alterTable(t.getTableName(), t);
- break;
- }
case PARTITION: {
Partition p = re.getPartition();
- Table t = db.getTable(p.getTable().getTableName());
- p = db.getPartition(t, p.getSpec(), false);
+          tblName = p.getTable().getTableName();
+          // re-fetch the latest version of the partition: the read entity
+          // may hold an old copy from before StatsTask modified it
+          p = db.getPartition(db.getTable(tblName), p.getSpec(), false);
p.setLastAccessTime(lastAccessTime);
- db.alterPartition(t.getTableName(), p);
- t.setLastAccessTime(lastAccessTime);
- db.alterTable(t.getTableName(), t);
+ db.alterPartition(tblName, p);
+ if (updatedTables.contains(tblName)) {
+ break;
+ }
+ // fall through to update table
+ }
+ case TABLE: {
+ if (tblName == null) {
+ tblName = re.getTable().getTableName();
+ }
+          // add() on the synchronized set is atomic and returns false if the
+          // table was already updated, so only one thread updates each table
+          if (updatedTables.add(tblName)) {
+ Table t = db.getTable(tblName);
+ t.setLastAccessTime(lastAccessTime);
+ db.alterTable(tblName, t);
+ }
break;
}
default:
// ignore dummy inputs
break;
}
+      } catch (Exception e) {
+ // fail the hook if any update failed.
+ throw new RuntimeException(e);
+ }
}
}
}
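
As a follow-up to the "shared, centrally managed thread pool" comment above:
one way to centralize the pool would be a lazily-initialized holder along
these lines (HookExecutors is a hypothetical name, not something that exists
in the patch or in Hive):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class HookExecutors {
  private static ExecutorService pool;

  private HookExecutors() {
  }

  // Return the shared pool, creating it on first use. In this simple
  // sketch the first caller's nThreads wins for the pool size.
  public static synchronized ExecutorService getPool(int nThreads) {
    if (pool == null) {
      pool = Executors.newFixedThreadPool(nThreads);
    }
    return pool;
  }
}

With a shared pool, the hook's finally block would no longer call shutdown();
each invocation would only wait on its own futures and leave the pool running
for later queries.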