[ https://issues.apache.org/jira/browse/FLINK-22329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Junfan Zhang updated FLINK-22329:
---------------------------------
    Summary: Missing credentials in jobconf causes repeated authentication in Hive datasource  (was: Missing crendentials in jobconf causes repeated authentication in Hive datasource)

> Missing credentials in jobconf causes repeated authentication in Hive datasource
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-22329
>                 URL: https://issues.apache.org/jira/browse/FLINK-22329
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>            Reporter: Junfan Zhang
>            Priority: Major
>              Labels: pull-request-available
>
> Related Flink code:
> [https://github.com/apache/flink/blob/577113f0c339df844f2cc32b1d4a09d3da28085a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java#L107]
>
> This {{getSplits}} method calls Hadoop's {{FileInputFormat.getSplits}} method. The related Hadoop code is
> [here|https://github.com/apache/hadoop/blob/03cfc852791c14fad39db4e5b14104a276c08e59/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L426].
> A simplified excerpt follows:
> {code:java}
> // Hadoop FileInputFormat
> public InputSplit[] getSplits(JobConf job, int numSplits)
>     throws IOException {
>   StopWatch sw = new StopWatch().start();
>   FileStatus[] stats = listStatus(job);
>
>   ......
> }
>
> protected FileStatus[] listStatus(JobConf job) throws IOException {
>   Path[] dirs = getInputPaths(job);
>   if (dirs.length == 0) {
>     throw new IOException("No input paths specified in job");
>   }
>   // get tokens for all the required FileSystems..
>   TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
>
>   // Whether we need to recursive look into the directory structure
>   ......
> }
> {code}
>
> The {{listStatus}} method obtains delegation tokens by calling
> {{TokenCache.obtainTokensForNamenodes}}. However, this method skips fetching
> delegation tokens when the credentials in the jobconf already contain them.
> If the jobconf carries no credentials, every call authenticates again, so it
> is necessary to inject the current UGI credentials into the jobconf.
>
> In addition, once Flink supports delegation tokens directly without a keytab ([refer
> to this PR|https://issues.apache.org/jira/browse/FLINK-21700]),
> {{TokenCache.obtainTokensForNamenodes}} will fail without this patch
> because the jobconf holds no corresponding credentials.
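
The effect described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: the classes `Credentials` and `NameNode` and the methods `obtainTokens` and `requestsAfter` below are hypothetical stand-ins written for this example, not Hadoop's or Flink's API, and the model assumes that each split enumeration starts from a jobconf whose credentials are empty. In real code the injection step would presumably look something like `jobConf.getCredentials().addAll(UserGroupInformation.getCurrentUser().getCredentials())`, but that line is not taken from the actual patch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: every class here is a hypothetical stand-in, not Hadoop's API. */
class TokenCacheSketch {

    /** Stand-in for a credentials store: service name -> delegation token. */
    static final class Credentials {
        private final Map<String, String> tokens = new HashMap<>();

        boolean hasToken(String service) { return tokens.containsKey(service); }
        void addToken(String service, String token) { tokens.put(service, token); }
        void addAll(Credentials other) { tokens.putAll(other.tokens); }
    }

    /** Stand-in for a NameNode that counts how often it is asked for a token. */
    static final class NameNode {
        final String service;
        int tokenRequests = 0;

        NameNode(String service) { this.service = service; }

        String issueToken() {
            tokenRequests++;
            return service + "-token-" + tokenRequests;
        }
    }

    /** Fetches a token per NameNode, but only if the credentials lack one. */
    static void obtainTokens(Credentials credentials, List<NameNode> nameNodes) {
        for (NameNode nn : nameNodes) {
            if (!credentials.hasToken(nn.service)) {
                credentials.addToken(nn.service, nn.issueToken());
            }
        }
    }

    /**
     * Simulates `calls` split enumerations and returns the total number of
     * token requests the NameNode received, including the user's initial one.
     */
    static int requestsAfter(int calls, boolean injectUserCredentials) {
        NameNode nn = new NameNode("hdfs://nn1");
        List<NameNode> nameNodes = Collections.singletonList(nn);

        // The current user already holds a token (this costs one request).
        Credentials userCredentials = new Credentials();
        obtainTokens(userCredentials, nameNodes);

        for (int i = 0; i < calls; i++) {
            // Assumption: each enumeration sees a job config with empty credentials.
            Credentials jobCredentials = new Credentials();
            if (injectUserCredentials) {
                jobCredentials.addAll(userCredentials);
            }
            obtainTokens(jobCredentials, nameNodes);
        }
        return nn.tokenRequests;
    }

    public static void main(String[] args) {
        System.out.println("without injection: " + requestsAfter(3, false));
        System.out.println("with injection:    " + requestsAfter(3, true));
    }
}
```

In this model, three enumerations without injection cost four token requests in total (one for the user plus one per enumeration), while with injection the count stays at one, because every lookup finds the token already present.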