The ticket cache expires at the end of its lifetime. Since you already
provide a keytab, you can try setting
security.kerberos.login.use-ticket-cache to false so that Flink logs in
from the keytab instead of relying on the expiring ticket cache.
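
For example, a minimal sketch of the relevant flink-conf.yaml entries,
reusing the keytab path and principal from your mail:

  # Log in from the keytab instead of falling back to the ticket cache
  security.kerberos.login.use-ticket-cache: false
  security.kerberos.login.keytab: /var/kerberos/krb5/user/infa.keytab
  security.kerberos.login.principal: infa

With a keytab configured, the Kerberos login should no longer depend on
the 24-hour ticket lifetime. You can also verify the keytab itself with
kinit -kt /var/kerberos/krb5/user/infa.keytab infa before restarting
the job.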

Best,
Yangze Guo

On Tue, Jul 6, 2021 at 10:02 AM 谢扬成 <yangcheng1987...@126.com> wrote:
>
> Hi,
>
> I process data with Flink 1.12.2. The data source reads from Kafka and,
> after the processing logic, the job writes into HDFS in Parquet format.
> The Hadoop cluster has the Kerberos authentication mechanism enabled.
> flink-conf.yaml is configured as below:
> security.kerberos.login.use-ticket-cache: true
> security.kerberos.login.keytab: /var/kerberos/krb5/user/infa.keytab
> security.kerberos.login.principal: infa
> The ticket expires after 24 hours and the renew config is false.
>
> The Flink job runs normally and correctly, but after 24 hours it throws
> an exception about the token expiring. The exception stack is below:
> 2021-06-21 07:57:29,124 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> 2021-06-21 07:57:29,125 INFO  com.citics.flink.parquet.sink.ParquetBulkSink [] - activeNameNode->null
> 2021-06-21 07:57:29,127 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> 2021-06-21 07:57:29,128 INFO  com.citics.flink.parquet.sink.ParquetBulkSink [] - realDataPath->hdfs://nameservice1/warehouse/tablespace/external/hive/edm_realtime.db/rtdw_dwd_hma_records_part
> 2021-06-21 07:57:29,138 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> 2021-06-21 07:57:29,139 INFO  com.citics.flink.parquet.sink.ParquetBulkSink [] - activeNameNode: null
> 2021-06-21 07:57:29,140 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> 2021-06-21 07:57:29,141 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> 2021-06-21 07:57:29,142 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Map -> Sink: Parquet Sink (1/1)#155020 (42bffc050e7ed72ca555c7cd92505404) switched from RUNNING to FAILED.
> java.io.IOException: Committing during recovery failed: Could not access status of source file.
>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commitAfterRecovery(HadoopRecoverableFsDataOutputStream.java:281) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:218) ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:472) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at com.citics.flink.parquet.sink.ParquetBulkSink.initializeState(ParquetBulkSink.java:125) ~[tracking-etl-hma-1.0.0.jar:?]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.2.jar:1.12.2]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
>
> Is there any resolution for this exception?
