Dear all:
      
????flink??????????????6????????????????????????????????????????
      ??????????kafka??hdfs??????????????100G
      ??????kafka????kerberos??????????????kerberos????
      ??????
       1.??????checkpoint??????

      
 2.??????????flink??????????????????kafka??????????????????/etc/krb5.conf??????????????hosts??????KDC????????????????????????????????????6????????????????????????????????????????

       3.??????????????????????????????????????????
2020-03-06 01:11:27,027 INFO  org.apache.hadoop.hdfs.DFSClient   
                      
     - Could not complete 
/user/shtermuser/holmes_hdfs/checkpoint/ab806ce6e5b0dda5eae3139eb784ddb8/chk-12673/b1992c81-7821-4d68-9c0b-1302b24c42d0
 retrying...
2020-03-06 01:11:33,457 INFO  org.apache.hadoop.hdfs.DFSClient   
                      
     - Could not complete 
/user/shtermuser/holmes_hdfs/checkpoint/ab806ce6e5b0dda5eae3139eb784ddb8/chk-12673/b1992c81-7821-4d68-9c0b-1302b24c42d0
 retrying...
2020-03-06 01:11:56,869 INFO  org.apache.hadoop.hdfs.DFSClient   
                      
     - Could not complete 
/user/shtermuser/holmes_hdfs/checkpoint/ab806ce6e5b0dda5eae3139eb784ddb8/chk-12674/58c158bf-0583-4c14-a6d4-2873d1b0d9ac
 retrying...
2020-03-06 01:12:03,270 INFO  org.apache.hadoop.hdfs.DFSClient   
                      
     - Could not complete 
/user/shtermuser/holmes_hdfs/checkpoint/ab806ce6e5b0dda5eae3139eb784ddb8/chk-12674/58c158bf-0583-4c14-a6d4-2873d1b0d9ac
 retrying...
2020-03-06 01:12:18,283 INFO  org.apache.flink.runtime.taskmanager.Task 
                    - 
Attempting to cancel task Source: Custom Source -> Sink: Unnamed (2/6) 
(90484955ec6ef7ff8d705142bbf6b5e0).
2020-03-06 01:12:18,283 INFO  org.apache.flink.runtime.taskmanager.Task 
                    - Source: 
Custom Source -> Sink: Unnamed (2/6) (90484955ec6ef7ff8d705142bbf6b5e0) 
switched from RUNNING to CANCELING.
2020-03-06 01:12:18,283 INFO  org.apache.flink.runtime.taskmanager.Task 
                    - 
Triggering cancellation of task code Source: Custom Source -> Sink: Unnamed 
(2/6) (90484955ec6ef7ff8d705142bbf6b5e0).
2020-03-06 01:12:18,442 WARN 
 org.apache.kafka.common.security.kerberos.KerberosLogin     
  - [[email protected]]: TGT renewal thread has been 
interrupted and will exit.
2020-03-06 01:12:18,443 INFO  org.apache.flink.runtime.taskmanager.Task 
                    - Source: 
Custom Source -> Sink: Unnamed (2/6) (90484955ec6ef7ff8d705142bbf6b5e0) 
switched from CANCELING to CANCELED.
2020-03-06 01:12:18,443 INFO  org.apache.flink.runtime.taskmanager.Task 
                    - Freeing 
task resources for Source: Custom Source -> Sink: Unnamed (2/6) 
(90484955ec6ef7ff8d705142bbf6b5e0).
2020-03-06 01:12:18,445 INFO  org.apache.flink.runtime.taskmanager.Task 
                    - 
Ensuring all FileSystem streams are closed for task Source: Custom Source -> 
Sink: Unnamed (2/6) (90484955ec6ef7ff8d705142bbf6b5e0) [CANCELED]
2020-03-06 01:12:18,447 INFO 
 org.apache.flink.runtime.taskexecutor.TaskExecutor       
     - Un-registering task and sending final execution state 
CANCELED to JobManager for task Source: Custom Source -> Sink: Unnamed 
90484955ec6ef7ff8d705142bbf6b5e0.
2020-03-06 01:12:30,966 INFO 
 org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable     
 - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: 
ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, 
directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, 
networkMemoryInMB=2147483647, managedMemoryInMB=1935}, allocationId: 
13e1e5aaa46415eb80b67fda822519f5, jobId: ab806ce6e5b0dda5eae3139eb784ddb8).
2020-03-06 01:12:30,967 INFO 
 org.apache.flink.runtime.taskexecutor.JobLeaderService     
   - Remove job ab806ce6e5b0dda5eae3139eb784ddb8 from job leader 
monitoring.
2020-03-06 01:12:30,967 INFO 
 org.apache.flink.runtime.taskexecutor.TaskExecutor       
     - Close JobManager connection for job 
ab806ce6e5b0dda5eae3139eb784ddb8.
2020-03-06 01:12:31,931 INFO 
 org.apache.flink.runtime.taskexecutor.TaskExecutor       
     - Close JobManager connection for job 
ab806ce6e5b0dda5eae3139eb784ddb8.
2020-03-06 01:12:31,931 INFO 
 org.apache.flink.runtime.taskexecutor.JobLeaderService     
   - Cannot reconnect to job ab806ce6e5b0dda5eae3139eb784ddb8 because 
it is not registered.
2020-03-06 01:13:07,706 INFO 
 org.apache.flink.runtime.taskexecutor.TaskExecutor       
     - Close ResourceManager connection 
1aae239700043eb7d1bf819836cea65a.
2020-03-06 01:13:07,706 INFO 
 org.apache.flink.runtime.taskexecutor.TaskExecutor       
     - Connecting to ResourceManager 
akka.tcp://flink@bitpt417r01:36666/user/resourcemanager(00000000000000000000000000000000).
2020-03-06 01:13:07,722 INFO 
 org.apache.flink.runtime.taskexecutor.TaskExecutor       
     - Resolved ResourceManager address, beginning registration
2020-03-06 01:13:07,722 INFO 
 org.apache.flink.runtime.taskexecutor.TaskExecutor       
     - Registration at ResourceManager attempt 1 (timeout=100ms)
2020-03-06 01:13:07,752 INFO 
 org.apache.flink.runtime.taskexecutor.TaskExecutor       
     - Registration at ResourceManager was declined: 
unrecognized TaskExecutor
2020-03-06 01:13:07,753 INFO 
 org.apache.flink.runtime.taskexecutor.TaskExecutor       
     - Pausing and re-attempting registration in 30000 ms
2020-03-06 01:13:07,993 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner 
                 - RECEIVED SIGNAL 
15: SIGTERM. Shutting down as requested.
2020-03-06 01:13:07,994 INFO 
 org.apache.flink.runtime.blob.TransientBlobCache       
       - Shutting down BLOB cache




        ??????????????????,??????
java.io.InterruptedIOException: Interrupted while waiting for data to be 
acknowledged by pipeline
        at 
org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2618)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2499)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:2380)
        at 
org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:138)
        at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
        at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:597)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:445)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
2020-03-06 01:12:30,766 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask         
  - Error during disposal of stream operator.
java.io.IOException: Unable to close file because the last block 
BP-183502975-172.17.2.37-1530761932470:blk_1386930362_313202578 does not have 
enough number of replicas.
        at 
org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2850)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2795)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2779)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2722)
        at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at 
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:100)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:597)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:445)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
        
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
2020-03-06 01:12:05,767 WARN  org.apache.hadoop.hdfs.DFSClient   
                      
     - NotReplicatedYetException sleeping 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
 retries left 4
2020-03-06 01:12:06,170 INFO  org.apache.hadoop.hdfs.DFSClient   
                      
     - Exception while adding a block
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
2020-03-06 01:12:06,170 WARN  org.apache.hadoop.hdfs.DFSClient   
                      
     - NotReplicatedYetException sleeping 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
 retries left 3
2020-03-06 01:12:06,973 INFO  org.apache.hadoop.hdfs.DFSClient   
                      
     - Exception while adding a block
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
2020-03-06 01:12:06,974 WARN  org.apache.hadoop.hdfs.DFSClient   
                      
     - NotReplicatedYetException sleeping 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
 retries left 2
2020-03-06 01:12:08,576 INFO  org.apache.hadoop.hdfs.DFSClient   
                      
     - Exception while adding a block
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
2020-03-06 01:12:08,576 WARN  org.apache.hadoop.hdfs.DFSClient   
                      
     - NotReplicatedYetException sleeping 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
 retries left 1
2020-03-06 01:12:11,779 INFO  org.apache.hadoop.hdfs.DFSClient   
                      
     - Exception while adding a block
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
2020-03-06 01:12:11,779 INFO  org.apache.hadoop.hdfs.DFSClient   
                      
     - Waiting for replication for 6 seconds
2020-03-06 01:12:11,780 WARN  org.apache.hadoop.hdfs.DFSClient   
                      
     - NotReplicatedYetException sleeping 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
 retries left 0
2020-03-06 01:12:18,182 WARN  org.apache.hadoop.hdfs.DFSClient   
                      
     - DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
2020-03-06 01:12:18,207 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask         
  - Error during disposal of stream operator.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
2020-03-06 01:12:18,229 INFO  org.apache.flink.runtime.taskmanager.Task 
                    - Source: 
Custom Source -> Sink: Unnamed (1/1) (ac0bafbba9259b2e7bf11b8d50d07e75) 
switched from RUNNING to FAILED.
java.lang.Exception: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
java.lang.Exception: Could not materialize checkpoint 12673 for operator 
Source: Custom Source -> Sink: Unnamed (2/6).
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1082)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1024)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not flush and close the file system output stream to 
hdfs:/user/shtermuser/holmes_hdfs/checkpoint/ab806ce6e5b0dda5eae3139eb784ddb8/chk-12673/b1992c81-7821-4d68-9c0b-1302b24c42d0
 in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
        at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init&gt;(OperatorSnapshotFinalizer.java:53)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:993)
        ... 3 more
Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
hdfs:/user/shtermuser/holmes_hdfs/checkpoint/ab806ce6e5b0dda5eae3139eb784ddb8/chk-12673/b1992c81-7821-4d68-9c0b-1302b24c42d0
 in order to obtain the stream state handle
        at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:334)
        at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
        at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
        at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
        ... 5 more
Caused by: java.io.IOException: Unable to close file because the last block 
BP-183502975-172.17.2.37-1530761932470:blk_1386929984_313202200 does not have 
enough number of replicas.
        at 
org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2850)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2795)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2779)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2722)
        at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at 
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at 
org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
        at 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
        at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:320)
        ... 10 more
2020-03-06 01:11:50,335 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp; - 
Triggering checkpoint 12674 @ 1583428310325 for job 
ab806ce6e5b0dda5eae3139eb784ddb8.
2020-03-06 01:12:04,966 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp; - 
Decline checkpoint 12674 by task 159f49478811931a1d2ac3baa506c7d3 of job 
ab806ce6e5b0dda5eae3139eb784ddb8 at 
container_e24_1571438024126_1313417_01_000014 @ bitpt420r01 (dataPort=44032).
2020-03-06 01:12:04,966 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp; - 
Discarding checkpoint 12674 of job ab806ce6e5b0dda5eae3139eb784ddb8.
java.lang.Exception: Could not materialize checkpoint 12674 for operator 
Source: Custom Source -&gt; Sink: Unnamed (1/1).
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1082)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1024)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not flush and close the file system output stream to 
hdfs:/user/shtermuser/holmes_hdfs/checkpoint/ab806ce6e5b0dda5eae3139eb784ddb8/chk-12674/2d16e301-9378-46b6-bcf5-00ebfd2bcd40
 in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
        at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init&gt;(OperatorSnapshotFinalizer.java:53)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:993)
        ... 3 more
Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
hdfs:/user/shtermuser/holmes_hdfs/checkpoint/ab806ce6e5b0dda5eae3139eb784ddb8/chk-12674/2d16e301-9378-46b6-bcf5-00ebfd2bcd40
 in order to obtain the stream state handle
        at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:334)
        at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
        at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
        at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
        ... 5 more
Caused by: java.io.IOException: Unable to close file because the last block 
BP-183502975-172.17.2.37-1530761932470:blk_1386930341_313202557 does not have 
enough number of replicas.
        at 
org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2850)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2795)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2779)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2722)
        at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at 
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at 
org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
        at 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
        at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:320)
        ... 10 more
2020-03-06 01:12:18,269 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; &nbsp;- Source: Custom Source -&gt; Sink: Unnamed (1/1) 
(ac0bafbba9259b2e7bf11b8d50d07e75) switched from RUNNING to FAILED.
java.lang.Exception: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
2020-03-06 01:12:18,271 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG
 &nbsp;- Fail to pass the restart strategy validation in region failover. 
Fallback to fail global.
2020-03-06 01:12:18,272 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; &nbsp;- Job FLink-User-holmes-test (ab806ce6e5b0dda5eae3139eb784ddb8) 
switched from state RUNNING to FAILING.
java.lang.Exception: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException):
 Not replicated yet: 
/user/shtermuser/holmes/WebServerStatisticsCallLog/2020-03-06/01/_WebServerStatisticsCallLog-0-0.txt.in-progress
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3685)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3474)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:219)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:507)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)


        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:425)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1860)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1656)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)



????

回复