As Yangze stated, the ticket cache expires after its lifespan. Please be aware that when a keytab is used, Flink obtains delegation tokens which are then never actually used. This broken delegation token handling is a known issue and a fix is in progress. Without delegation tokens, clusters of 200+ nodes simply collapse because the KDC can't serve that volume of requests. I'm working on a FLIP to make this work properly...
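Until that lands, the workaround Yangze suggests below is to rely purely on the keytab. A minimal sketch of the adjusted flink-conf.yaml (principal and keytab path copied from your mail; adjust to your environment):

    security.kerberos.login.use-ticket-cache: false
    security.kerberos.login.keytab: /var/kerberos/krb5/user/infa.keytab
    security.kerberos.login.principal: infa

With a keytab configured, the Hadoop security layer can re-acquire a fresh TGT when the current one expires instead of failing after the 24h ticket lifespan. Conceptually it boils down to Hadoop's UserGroupInformation API; here is an illustrative Java sketch only (not Flink's exact code path, and KeytabReloginSketch is a made-up class name):

    // Illustrative sketch of keytab-based login and relogin via Hadoop's
    // UserGroupInformation. This is NOT Flink's actual implementation; it
    // just shows why a keytab avoids the 24h ticket-cache expiry.
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KeytabReloginSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);

            // Log in from the keytab once at startup (principal and path
            // as in the config above).
            UserGroupInformation.loginUserFromKeytab(
                    "infa", "/var/kerberos/krb5/user/infa.keytab");

            // Before secured HDFS calls, check the TGT and re-login from
            // the keytab if it is close to expiry. A pure ticket cache has
            // no such renewal source, hence the GSSException after 24 hours.
            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
        }
    }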
BR,
G

On Tue, Jul 6, 2021 at 5:33 AM Yangze Guo <karma...@gmail.com> wrote:
> The ticket cache will expire after its lifespan. You can try setting
> security.kerberos.login.use-ticket-cache to false, since you provide
> the keytab.
>
> Best,
> Yangze Guo
>
> On Tue, Jul 6, 2021 at 10:02 AM 谢扬成 <yangcheng1987...@126.com> wrote:
> >
> > Hi,
> >
> > I process data with Flink 1.12.2: the source reads from Kafka and, after
> > the processing logic, writes to HDFS in Parquet format. The Hadoop cluster
> > has Kerberos authentication enabled.
> > flink-conf.yml looks like this:
> > security.kerberos.login.use-ticket-cache: true
> > security.kerberos.login.keytab: /var/kerberos/krb5/user/infa.keytab
> > security.kerberos.login.principal: infa
> > The ticket expires after 24 hours and the renew config is false.
> >
> > The Flink job runs normally and correctly, but after 24 hours it throws
> > an exception about the token expiring; the exception stack is below:
> >
> > 2021-06-21 07:57:29,124 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> > 2021-06-21 07:57:29,125 INFO  com.citics.flink.parquet.sink.ParquetBulkSink [] - activeNameNode->null
> > 2021-06-21 07:57:29,127 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> > 2021-06-21 07:57:29,128 INFO  com.citics.flink.parquet.sink.ParquetBulkSink [] - realDataPath->hdfs://nameservice1/warehouse/tablespace/external/hive/edm_realtime.db/rtdw_dwd_hma_records_part
> > 2021-06-21 07:57:29,138 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> > 2021-06-21 07:57:29,139 INFO  com.citics.flink.parquet.sink.ParquetBulkSink [] - activeNameNode: null
> > 2021-06-21 07:57:29,140 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> > 2021-06-21 07:57:29,141 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> > 2021-06-21 07:57:29,142 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Map -> Sink: Parquet Sink (1/1)#155020 (42bffc050e7ed72ca555c7cd92505404) switched from RUNNING to FAILED.
> > java.io.IOException: Committing during recovery failed: Could not access status of source file.
> >     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commitAfterRecovery(HadoopRecoverableFsDataOutputStream.java:281) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:218) ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:472) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at com.citics.flink.parquet.sink.ParquetBulkSink.initializeState(ParquetBulkSink.java:125) ~[tracking-etl-hma-1.0.0.jar:?]
> >     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.2.jar:1.12.2]
> >     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
> >
> > Is there any resolution for this exception?