This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new da661b9b293 [fix](multi-catalog)access HMS need ugiDoAs (#30595)
da661b9b293 is described below
commit da661b9b293faa7c8dac2ff25685c3998f5cedd0
Author: slothever <[email protected]>
AuthorDate: Thu Feb 1 19:06:45 2024 +0800
[fix](multi-catalog)access HMS need ugiDoAs (#30595)
---
.../datasource/hive/ThriftHMSCachedClient.java | 81 ++++++++++++----------
1 file changed, 46 insertions(+), 35 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index abb3fda24b3..1e5e2f1287a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -18,6 +18,7 @@
package org.apache.doris.datasource.hive;
import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.HMSClientException;
import
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.security.PrivilegedExceptionAction;
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedList;
@@ -93,7 +95,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient
{
public List<String> getAllDatabases() {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getAllDatabases();
+ return ugiDoAs(client.client::getAllDatabases);
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -107,7 +109,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public List<String> getAllTables(String dbName) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getAllTables(dbName);
+ return ugiDoAs(() -> client.client.getAllTables(dbName));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -121,7 +123,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public boolean tableExists(String dbName, String tblName) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.tableExists(dbName, tblName);
+ return ugiDoAs(() -> client.client.tableExists(dbName,
tblName));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -142,7 +144,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
short limited = maxListPartitionNum <= Short.MAX_VALUE ? (short)
maxListPartitionNum : MAX_LIST_PARTITION_NUM;
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.listPartitionNames(dbName, tblName,
limited);
+ return ugiDoAs(() -> client.client.listPartitionNames(dbName,
tblName, limited));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -156,7 +158,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public Partition getPartition(String dbName, String tblName, List<String>
partitionValues) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getPartition(dbName, tblName,
partitionValues);
+ return ugiDoAs(() -> client.client.getPartition(dbName,
tblName, partitionValues));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -171,7 +173,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public List<Partition> getPartitions(String dbName, String tblName,
List<String> partitionNames) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getPartitionsByNames(dbName, tblName,
partitionNames);
+ return ugiDoAs(() ->
client.client.getPartitionsByNames(dbName, tblName, partitionNames));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -186,7 +188,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public Database getDatabase(String dbName) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getDatabase(dbName);
+ return ugiDoAs(() -> client.client.getDatabase(dbName));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -200,7 +202,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public Table getTable(String dbName, String tblName) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getTable(dbName, tblName);
+ return ugiDoAs(() -> client.client.getTable(dbName, tblName));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -214,7 +216,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public List<FieldSchema> getSchema(String dbName, String tblName) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getSchema(dbName, tblName);
+ return ugiDoAs(() -> client.client.getSchema(dbName, tblName));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -228,7 +230,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName,
String tblName, List<String> columns) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getTableColumnStatistics(dbName, tblName,
columns);
+ return ugiDoAs(() ->
client.client.getTableColumnStatistics(dbName, tblName, columns));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -243,7 +245,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
String dbName, String tblName, List<String> partNames,
List<String> columns) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getPartitionColumnStatistics(dbName,
tblName, partNames, columns);
+ return ugiDoAs(() ->
client.client.getPartitionColumnStatistics(dbName, tblName, partNames,
columns));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -257,7 +259,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public CurrentNotificationEventId getCurrentNotificationEventId() {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getCurrentNotificationEventId();
+ return ugiDoAs(client.client::getCurrentNotificationEventId);
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -271,12 +273,12 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
@Override
public NotificationEventResponse getNextNotification(long lastEventId,
- int maxEvents,
- IMetaStoreClient.NotificationFilter filter)
+ int maxEvents,
+
IMetaStoreClient.NotificationFilter filter)
throws MetastoreNotificationFetchException {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.getNextNotification(lastEventId,
maxEvents, filter);
+ return ugiDoAs(() ->
client.client.getNextNotification(lastEventId, maxEvents, filter));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -293,7 +295,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public long openTxn(String user) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.openTxn(user);
+ return ugiDoAs(() -> client.client.openTxn(user));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -307,7 +309,10 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public void commitTxn(long txnId) {
try (ThriftHMSClient client = getClient()) {
try {
- client.client.commitTxn(txnId);
+ ugiDoAs(() -> {
+ client.client.commitTxn(txnId);
+ return null;
+ });
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -319,7 +324,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
@Override
public void acquireSharedLock(String queryId, long txnId, String user,
TableName tblName,
- List<String> partitionNames, long timeoutMs) {
+ List<String> partitionNames, long timeoutMs)
{
LockRequestBuilder request = new
LockRequestBuilder(queryId).setTransactionId(txnId).setUser(user);
List<LockComponent> lockComponents =
createLockComponentsForRead(tblName, partitionNames);
for (LockComponent component : lockComponents) {
@@ -328,7 +333,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
try (ThriftHMSClient client = getClient()) {
LockResponse response;
try {
- response = client.client.lock(request.build());
+ response = ugiDoAs(() -> client.client.lock(request.build()));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -356,20 +361,22 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
public ValidWriteIdList getValidWriteIds(String fullTableName, long
currentTransactionId) {
try (ThriftHMSClient client = getClient()) {
try {
- // Pass currentTxn as 0L to get the recent snapshot of valid
transactions in Hive
- // Do not pass currentTransactionId instead as
- // it will break Hive's listing of delta directories if major
compaction
- // deletes delta directories for valid transactions that
existed at the time transaction is opened
- ValidTxnList validTransactions = client.client.getValidTxns();
- List<TableValidWriteIds> tableValidWriteIdsList =
client.client.getValidWriteIds(
- Collections.singletonList(fullTableName),
validTransactions.toString());
- if (tableValidWriteIdsList.size() != 1) {
- throw new Exception("tableValidWriteIdsList's size should
be 1");
- }
- ValidTxnWriteIdList validTxnWriteIdList =
TxnUtils.createValidTxnWriteIdList(currentTransactionId,
- tableValidWriteIdsList);
- ValidWriteIdList writeIdList =
validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
- return writeIdList;
+ return ugiDoAs(() -> {
+ // Pass currentTxn as 0L to get the recent snapshot of
valid transactions in Hive
+ // Do not pass currentTransactionId instead as
+ // it will break Hive's listing of delta directories if
major compaction
+ // deletes delta directories for valid transactions that
existed at the time transaction is opened
+ ValidTxnList validTransactions =
client.client.getValidTxns();
+ List<TableValidWriteIds> tableValidWriteIdsList =
client.client.getValidWriteIds(
+ Collections.singletonList(fullTableName),
validTransactions.toString());
+ if (tableValidWriteIdsList.size() != 1) {
+ throw new Exception("tableValidWriteIdsList's size
should be 1");
+ }
+ ValidTxnWriteIdList validTxnWriteIdList =
TxnUtils.createValidTxnWriteIdList(currentTransactionId,
+ tableValidWriteIdsList);
+ ValidWriteIdList writeIdList =
validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
+ return writeIdList;
+ });
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -385,7 +392,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
private LockResponse checkLock(long lockId) {
try (ThriftHMSClient client = getClient()) {
try {
- return client.client.checkLock(lockId);
+ return ugiDoAs(() -> client.client.checkLock(lockId));
} catch (Exception e) {
client.setThrowable(e);
throw e;
@@ -460,7 +467,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
synchronized (clientPool) {
ThriftHMSClient client = clientPool.poll();
if (client == null) {
- return new ThriftHMSClient(hiveConf);
+ return ugiDoAs(() -> new ThriftHMSClient(hiveConf));
}
return client;
}
@@ -468,5 +475,9 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
Thread.currentThread().setContextClassLoader(classLoader);
}
}
+
+ private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) {
+ return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]