[ https://issues.apache.org/jira/browse/HIVE-16951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vaibhav Gumashta updated HIVE-16951:
------------------------------------
    Summary: ACID Compactor, PartialScanTask, MergeFileTask, ColumnTruncateTask, HCatUtil don't close JobClient  (was: ACID: Compactor doesn't close JobClient)

> ACID Compactor, PartialScanTask, MergeFileTask, ColumnTruncateTask, HCatUtil don't close JobClient
> --------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-16951
>                 URL: https://issues.apache.org/jira/browse/HIVE-16951
>             Project: Hive
>          Issue Type: Bug
>          Components: Transactions
>    Affects Versions: 1.2.2, 2.1.1
>            Reporter: Vaibhav Gumashta
>            Assignee: Vaibhav Gumashta
>
> When a compaction job is launched, we create a new JobClient every time we run the MR job:
> {code}
>   private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
>                                    StringableList dirsToSearch,
>                                    List<AcidUtils.ParsedDelta> parsedDeltas,
>                                    int curDirNumber, int obsoleteDirNumber,
>                                    HiveConf hiveConf,
>                                    TxnStore txnHandler, long id, String jobName) throws IOException {
>     job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
>     if(dirsToSearch == null) {
>       dirsToSearch = new StringableList();
>     }
>     StringableList deltaDirs = new StringableList();
>     long minTxn = Long.MAX_VALUE;
>     long maxTxn = Long.MIN_VALUE;
>     for (AcidUtils.ParsedDelta delta : parsedDeltas) {
>       LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
>       dirsToSearch.add(delta.getPath());
>       deltaDirs.add(delta.getPath());
>       minTxn = Math.min(minTxn, delta.getMinTransaction());
>       maxTxn = Math.max(maxTxn, delta.getMaxTransaction());
>     }
>     if (baseDir != null) job.set(BASE_DIR, baseDir.toString());
>     job.set(DELTA_DIRS, deltaDirs.toString());
>     job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
>     job.setLong(MIN_TXN, minTxn);
>     job.setLong(MAX_TXN, maxTxn);
>     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
>       mrJob = job;
>     }
>     LOG.info("Submitting " + compactionType + " compaction job '" +
>       job.getJobName() + "' to " + job.getQueueName() + " queue.  " +
>       "(current delta dirs count=" + curDirNumber +
>       ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" +
>       minTxn + "," + maxTxn + "]");
>     RunningJob rj = new JobClient(job).submitJob(job);
>     LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" +
>       rj.getID() + " compaction ID=" + id);
>     txnHandler.setHadoopJobId(rj.getID().toString(), id);
>     rj.waitForCompletion();
>     if (!rj.isSuccessful()) {
>       throw new IOException(compactionType == CompactionType.MAJOR ? "Major" : "Minor" +
>         " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID() );
>     }
>   }
> {code}
> We should close the JobClient to release resources (cached FS objects etc).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
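
The change the report asks for can be sketched roughly as follows. This is a minimal illustration and not the committed patch: StubJobClient and RunningJobLike are hypothetical stand-ins for org.apache.hadoop.mapred.JobClient and RunningJob, used so the example runs without Hadoop on the classpath, and launchJob and closedAfterRun are names invented here. The only point is the try/finally shape, which closes the client whether the job succeeds, fails, or throws.

```java
import java.io.IOException;

public class JobClientCloseSketch {

  /** Hypothetical stand-in for org.apache.hadoop.mapred.RunningJob. */
  interface RunningJobLike {
    void waitForCompletion() throws IOException;
    boolean isSuccessful() throws IOException;
  }

  /** Hypothetical stand-in for org.apache.hadoop.mapred.JobClient. */
  static class StubJobClient {
    private final boolean jobSucceeds;
    private boolean closed = false;

    StubJobClient(boolean jobSucceeds) {
      this.jobSucceeds = jobSucceeds;
    }

    RunningJobLike submitJob() throws IOException {
      return new RunningJobLike() {
        @Override
        public void waitForCompletion() {
          // Nothing to wait for in the stub.
        }

        @Override
        public boolean isSuccessful() {
          return jobSucceeds;
        }
      };
    }

    /** The real JobClient also has a close() method; here it only records the call. */
    void close() throws IOException {
      closed = true;
    }

    boolean isClosed() {
      return closed;
    }
  }

  /** Submits the job and always closes the client, even when the job fails. */
  static void launchJob(StubJobClient jc, String jobName) throws IOException {
    try {
      RunningJobLike rj = jc.submitJob();
      rj.waitForCompletion();
      if (!rj.isSuccessful()) {
        throw new IOException("Compactor job failed for " + jobName);
      }
    } finally {
      // Runs on success, on failure, and on any exception thrown above.
      jc.close();
    }
  }

  /** Runs one job and reports whether the client ended up closed. */
  static boolean closedAfterRun(boolean jobSucceeds) {
    StubJobClient jc = new StubJobClient(jobSucceeds);
    try {
      launchJob(jc, "demo-compaction");
    } catch (IOException expectedOnFailure) {
      // A failed job surfaces as an IOException; the client must still be closed.
    }
    return jc.isClosed();
  }

  public static void main(String[] args) {
    System.out.println("closed after success: " + closedAfterRun(true));
    System.out.println("closed after failure: " + closedAfterRun(false));
  }
}
```

In the quoted method, the same shape would mean holding the JobClient in a local variable instead of calling new JobClient(job).submitJob(job) inline, so that there is a reference left to close.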