[ https://issues.apache.org/jira/browse/HIVE-20627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sankar Hariappan reassigned HIVE-20627:
---------------------------------------

> Concurrent Async queries from same session intermittently fails with LockException.
> -------------------------------------------------------------------------------------
>
>                 Key: HIVE-20627
>                 URL: https://issues.apache.org/jira/browse/HIVE-20627
>             Project: Hive
>          Issue Type: Bug
>          Components: HiveServer2
>    Affects Versions: 4.0.0, 3.2.0
>            Reporter: Sankar Hariappan
>            Assignee: Sankar Hariappan
>            Priority: Major
>
> When multiple async queries are executed from the same session, the async query execution DAGs end up sharing the same Hive object, which is set by the caller for all threads. When dynamic partitions are loaded, the plan contains a MoveTask that re-creates the Hive object and closes the shared one, which causes metastore connection issues for the other async execution threads that still access it. The same problem is seen when ReplDumpTask or ReplLoadTask is part of the DAG.
>
> *Root cause:*
> For async query execution, SQLOperation.runInternal sets the thread-local Hive object of every child thread to parentHive (parentSession.getSessionHive()):
> {code}
> @Override
> public void run() {
>   PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
>     @Override
>     public Object run() throws HiveSQLException {
>       Hive.set(parentHive); // Setting parentHive for all async operations.
>       // TODO: can this result in cross-thread reuse of session state?
>       SessionState.setCurrentSessionState(parentSessionState);
>       PerfLogger.setPerfLogger(parentPerfLogger);
>       LogUtils.registerLoggingContext(queryState.getConf());
>       try {
>         if (asyncPrepare) {
>           prepare(queryState);
>         }
>         runQuery();
>       } catch (HiveSQLException e) {
>         // TODO: why do we invent our own error path op top of the one from Future.get?
>         setOperationException(e);
>         LOG.error("Error running hive query: ", e);
>       } finally {
>         LogUtils.unregisterLoggingContext();
>       }
>       return null;
>     }
>   };
> {code}
> Now, while async execution is in progress, if one of the threads re-creates the Hive object, it first closes the shared parentHive object. That breaks the other threads still using it: the conf object it refers to gets cleaned up, so they read null for the VALID_TXNS_KEY value.
> {code}
> private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns)
>     throws HiveException {
>   if (db != null) {
>     LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
>         ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
>     db.close();
>   }
>   closeCurrent();
>   if (c == null) {
>     c = createHiveConf();
>   }
>   c.set("fs.scheme.class", "dfs");
>   Hive newdb = new Hive(c, doRegisterAllFns);
>   hiveDB.set(newdb);
>   return newdb;
> }
> {code}
>
> *Fix:*
> We shouldn't close the old Hive object if it is shared by multiple threads. A flag on the Hive object can be used to track this (see the sketch below).
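> A minimal sketch of that flag-based approach (the allowClose field and its accessors are illustrative assumptions here, not necessarily the names used in the committed patch):
> {code}
> // In Hive.java (sketch): mark a Hive object that is shared across async threads as
> // not closable, and honour that flag before tearing down the old instance.
> private boolean allowClose = true;   // hypothetical flag, set to false for shared objects
>
> public void setAllowClose(boolean allowClose) {
>   this.allowClose = allowClose;
> }
>
> public boolean allowClose() {
>   return allowClose;
> }
>
> private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns)
>     throws HiveException {
>   if (db != null && db.allowClose()) {
>     // Only close the old instance when it is not shared with other threads.
>     db.close();
>   }
>   closeCurrent();
>   if (c == null) {
>     c = createHiveConf();
>   }
>   c.set("fs.scheme.class", "dfs");
>   Hive newdb = new Hive(c, doRegisterAllFns);
>   hiveDB.set(newdb);
>   return newdb;
> }
> {code}
> The session that owns parentHive would then call parentHive.setAllowClose(false) before handing it to the async execution threads, again purely as an illustration of the flag idea.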
>
> *Memory leak issue:*
> A memory leak occurs if one of the threads spawned by Hive.loadDynamicPartitions throws an exception. rawStoreMap stores the RawStore objects that have to be cleaned up later, but it is populated only on the success path; when a thread fails with an exception, nothing is added and its embedded RawStore leaks.
> {code}
> futures.add(pool.submit(new Callable<Void>() {
>   @Override
>   public Void call() throws Exception {
>     try {
>       // move file would require session details (needCopy() invokes SessionState.get)
>       SessionState.setCurrentSessionState(parentSession);
>       LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
>       // load the partition
>       Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
>           true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
>           isInsertOverwrite);
>       partitionsMap.put(fullPartSpec, newPartition);
>       if (inPlaceEligible) {
>         synchronized (ps) {
>           InPlaceUpdate.rePositionCursor(ps);
>           partitionsLoaded.incrementAndGet();
>           InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
>               + partsToLoad + " partitions.");
>         }
>       }
>       // Add embedded rawstore, so we can cleanup later to avoid memory leak
>       if (getMSC().isLocalMetaStore()) {
>         if (!rawStoreMap.containsKey(Thread.currentThread().getId())) {
>           rawStoreMap.put(Thread.currentThread().getId(), HiveMetaStore.HMSHandler.getRawStore());
>         }
>       }
>       return null;
>     } catch (Exception t) {
> }
> {code}
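> One possible remediation, shown only as a sketch of the idea rather than the committed fix, is to register the embedded RawStore in a finally block so the caller's cleanup loop over rawStoreMap sees it on both the success and the failure path:
> {code}
> // Sketch: the registration that used to run only after a successful loadPartition()
> // moves into a finally block, so a failing thread still gets its RawStore cleaned up.
> public Void call() throws Exception {
>   try {
>     SessionState.setCurrentSessionState(parentSession);
>     Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
>         true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
>         isInsertOverwrite);
>     partitionsMap.put(fullPartSpec, newPartition);
>     return null;
>   } finally {
>     // Runs whether loadPartition() succeeded or threw, avoiding the leak described above.
>     if (getMSC().isLocalMetaStore()
>         && !rawStoreMap.containsKey(Thread.currentThread().getId())) {
>       rawStoreMap.put(Thread.currentThread().getId(), HiveMetaStore.HMSHandler.getRawStore());
>     }
>   }
> }
> {code}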