Hi, Shrihari. It seems related to https://issues.apache.org/jira/browse/FLINK-28758, which is still unresolved.
It should only occur in FlinkKafkaConsumer, so you could migrate to KafkaSource to avoid this issue.

On Sat, Jun 24, 2023 at 2:46 AM Shrihari R <shrihar...@rapido.bike> wrote:

> I am trying to stop the job by triggering the savepoint, but it's
> failing with the below error.
>
> ./bin/flink stop --savepointPath gs://staging-data-flink/flink-1-16-2/savepoints/ 3a912091b13c446c0d359074414db1db
>
> It's working if I just trigger the savepoint without stopping the job.
>
> ./bin/flink savepoint 3a912091b13c446c0d359074414db1db gs://staging-data-flink/flink-1-16-2/savepoints/
>
> Within the job I am consuming from Kafka and writing to Kafka and Cloud SQL,
> using FlinkKafkaConsumer for the Kafka sourcing function.
>
> Error Log
>
> 2023-06-23 23:29:53
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
>     at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1036)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: Custom Source -> Flat Map (1/2)#1 Failure reason: Task has failed.
>     at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1375)
>     at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1318)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:344)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>     ... 3 more
> Caused by: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>     at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
>     at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:306)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:286)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:750)
>
> Appreciate your help in this regard.
>
> Regards,
> Shrihari

--
Best,
Hangxiang.
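
For readers following the suggestion above, here is a minimal sketch of what replacing FlinkKafkaConsumer with KafkaSource might look like. It assumes String records and the flink-connector-kafka dependency matching Flink 1.16 on the classpath. The class name, broker address, topic, and group id are placeholders and are not taken from the original job.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Hypothetical class name; only the KafkaSource wiring is the point here.
public class KafkaSourceMigrationSketch {

    // Builds a KafkaSource that reads String values from one topic.
    static KafkaSource<String> buildSource(String brokers, String topic, String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(topic)
                .setGroupId(groupId)
                // Start from the group's committed offsets, falling back to
                // the earliest offset when none have been committed yet.
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection values (assumptions, not from the thread).
        KafkaSource<String> source = buildSource("localhost:9092", "input-topic", "my-group");

        // fromSource replaces the env.addSource(new FlinkKafkaConsumer<>(...)) call.
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        stream.print();
        env.execute("kafka-source-migration-sketch");
    }
}
```

Starting from committed group offsets is chosen here on the assumption that the old consumer's savepoint state is not restored by the new source; whether that fits a given job should be checked against the Kafka connector documentation before switching.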