zuston commented on a change in pull request #15653: URL: https://github.com/apache/flink/pull/15653#discussion_r641307142
########## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java ########## @@ -66,6 +71,36 @@ public static void setup() { tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); } + @Test + public void testHiveTableSourceJobConfWithCredentials() throws Exception { + final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN"); + final Text hdfsDelegationTokenService = new Text("ha-hdfs:hadoop-namespace"); + Credentials credentials = new Credentials(); + credentials.addToken( + hdfsDelegationTokenService, + new Token<>( + new byte[4], + new byte[4], + hdfsDelegationTokenKind, + hdfsDelegationTokenService)); + UserGroupInformation.getCurrentUser().addCredentials(credentials); + + tableEnv.executeSql( + String.format( + "create table table2 (x int, y string, z int) partitioned by (" + + " pt_year int, pt_mon string, pt_day string)" + + " tblproperties ('%s' = 'true')", + STREAMING_SOURCE_ENABLE.key())); + DynamicTableSource tableSource1 = getTableSource("table2"); + assertFalse(tableSource1 instanceof HiveLookupTableSource); Review comment: Removed. ########## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java ########## @@ -66,6 +71,36 @@ public static void setup() { tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); } + @Test + public void testHiveTableSourceJobConfWithCredentials() throws Exception { + final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN"); + final Text hdfsDelegationTokenService = new Text("ha-hdfs:hadoop-namespace"); + Credentials credentials = new Credentials(); + credentials.addToken( + hdfsDelegationTokenService, + new Token<>( + new byte[4], + new byte[4], + hdfsDelegationTokenKind, + hdfsDelegationTokenService)); + UserGroupInformation.getCurrentUser().addCredentials(credentials); + + tableEnv.executeSql( + String.format( + "create table table2 (x int, y string, z int) partitioned by (" + + " pt_year int, pt_mon string, pt_day string)" + + " tblproperties ('%s' = 'true')", + STREAMING_SOURCE_ENABLE.key())); Review comment: Removed. ########## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/JobConfUtils.java ########## @@ -57,4 +57,10 @@ public static void addCredentialsIntoJobConf(JobConf jobConf) { "Unable to add credentials into job conf when existing credentials in UGI.", e); } } + + public static JobConf createJobConfWithCredentials(HiveConf hiveConf) { Review comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org