lirui-apache commented on a change in pull request #15653: URL: https://github.com/apache/flink/pull/15653#discussion_r634870074
########## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java ########## @@ -439,4 +439,8 @@ public void close() throws Exception { } } } + Review comment: Add `@VisibleForTesting` annotation ########## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java ########## @@ -66,6 +71,36 @@ public static void setup() { tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); } + @Test + public void testHiveTableSourceJobConfWithCredentials() throws Exception { + final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN"); + final Text hdfsDelegationTokenService = new Text("ha-hdfs:hadoop-namespace"); + Credentials credentials = new Credentials(); + credentials.addToken( + hdfsDelegationTokenService, + new Token<>( + new byte[4], + new byte[4], + hdfsDelegationTokenKind, + hdfsDelegationTokenService)); + UserGroupInformation.getCurrentUser().addCredentials(credentials); + + tableEnv.executeSql( + String.format( + "create table table2 (x int, y string, z int) partitioned by (" + + " pt_year int, pt_mon string, pt_day string)" + + " tblproperties ('%s' = 'true')", + STREAMING_SOURCE_ENABLE.key())); + DynamicTableSource tableSource1 = getTableSource("table2"); + assertFalse(tableSource1 instanceof HiveLookupTableSource); + HiveTableSource tableSource = (HiveTableSource) tableSource1; + + Token token = tableSource.getJobConf().getCredentials().getToken(hdfsDelegationTokenService); + assertNull(token); Review comment: Why do we expect the token to be null? ########## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java ########## @@ -66,6 +71,36 @@ public static void setup() { tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); } + @Test + public void testHiveTableSourceJobConfWithCredentials() throws Exception { + final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN"); + final Text hdfsDelegationTokenService = new Text("ha-hdfs:hadoop-namespace"); + Credentials credentials = new Credentials(); + credentials.addToken( + hdfsDelegationTokenService, + new Token<>( + new byte[4], + new byte[4], + hdfsDelegationTokenKind, + hdfsDelegationTokenService)); + UserGroupInformation.getCurrentUser().addCredentials(credentials); + + tableEnv.executeSql( + String.format( + "create table table2 (x int, y string, z int) partitioned by (" + + " pt_year int, pt_mon string, pt_day string)" + + " tblproperties ('%s' = 'true')", + STREAMING_SOURCE_ENABLE.key())); Review comment: Why do we need to enable streaming source? ########## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/JobConfUtils.java ########## @@ -57,4 +57,10 @@ public static void addCredentialsIntoJobConf(JobConf jobConf) { "Unable to add credentials into job conf when existing credentials in UGI.", e); } } + + public static JobConf createJobConfWithCredentials(HiveConf hiveConf) { Review comment: The parameter can be just `Configuration` instead of `HiveConf` ########## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java ########## @@ -66,6 +71,36 @@ public static void setup() { tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); } + @Test + public void testHiveTableSourceJobConfWithCredentials() throws Exception { + final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN"); + final Text hdfsDelegationTokenService = new Text("ha-hdfs:hadoop-namespace"); + Credentials credentials = new Credentials(); + credentials.addToken( + hdfsDelegationTokenService, + new Token<>( + new byte[4], + new byte[4], + hdfsDelegationTokenKind, + hdfsDelegationTokenService)); + UserGroupInformation.getCurrentUser().addCredentials(credentials); + + tableEnv.executeSql( + String.format( + "create table table2 (x int, y string, z int) partitioned by (" + + " pt_year int, pt_mon string, pt_day string)" + + " tblproperties ('%s' = 'true')", + STREAMING_SOURCE_ENABLE.key())); + DynamicTableSource tableSource1 = getTableSource("table2"); + assertFalse(tableSource1 instanceof HiveLookupTableSource); Review comment: This is irrelevant to this test ########## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java ########## @@ -66,6 +71,36 @@ public static void setup() { tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); } + @Test + public void testHiveTableSourceJobConfWithCredentials() throws Exception { Review comment: We inject the token for both source and sink. So let's test for both of them too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org