Hi,

I am running a Flink 1.12.2 job that reads from Kafka, applies some processing logic, and writes the results to HDFS in Parquet format. The Hadoop cluster has Kerberos authentication enabled.
My flink-conf.yaml contains:
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /var/kerberos/krb5/user/infa.keytab
security.kerberos.login.principal: infa
The Kerberos ticket expires after 24 hours, and ticket renewal is disabled.
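(A side note on the configuration above: from my reading of the Flink security documentation, when a keytab and principal are configured, `security.kerberos.login.use-ticket-cache` is usually set to `false`, so that credentials are obtained directly from the keytab and re-acquired automatically, instead of being taken from a ticket cache that expires after 24 hours. I have not verified this on my cluster; a sketch of that variant, keeping my paths, would be:)

```yaml
# Unverified sketch: keytab-based login instead of the ticket cache.
# With these settings Flink should log in from the keytab, so the
# 24-hour ticket-cache expiry would no longer apply.
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /var/kerberos/krb5/user/infa.keytab
security.kerberos.login.principal: infa
```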


The job runs normally and produces correct results, but after 24 hours it fails with an exception that looks like a token/ticket expiry problem. The log output and stack trace are below:
2021-06-21 07:57:29,124 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2021-06-21 07:57:29,125 INFO  com.citics.flink.parquet.sink.ParquetBulkSink [] - activeNameNode->null
2021-06-21 07:57:29,127 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2021-06-21 07:57:29,128 INFO  com.citics.flink.parquet.sink.ParquetBulkSink [] - realDataPath->hdfs://nameservice1/warehouse/tablespace/external/hive/edm_realtime.db/rtdw_dwd_hma_records_part
2021-06-21 07:57:29,138 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2021-06-21 07:57:29,139 INFO  com.citics.flink.parquet.sink.ParquetBulkSink [] - activeNameNode: null
2021-06-21 07:57:29,140 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2021-06-21 07:57:29,141 WARN  org.apache.hadoop.ipc.Client [] - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2021-06-21 07:57:29,142 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Map -> Sink: Parquet Sink (1/1)#155020 (42bffc050e7ed72ca555c7cd92505404) switched from RUNNING to FAILED.
java.io.IOException: Committing during recovery failed: Could not access status of source file.
	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commitAfterRecovery(HadoopRecoverableFsDataOutputStream.java:281) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:218) ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:472) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at com.citics.flink.parquet.sink.ParquetBulkSink.initializeState(ParquetBulkSink.java:125) ~[tracking-etl-hma-1.0.0.jar:?]
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.2.jar:1.12.2]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]


Is there any resolution for this exception?
