I am trying to stop the job with a savepoint, but it fails with the error below.

    ./bin/flink stop --savepointPath gs://staging-data-flink/flink-1-16-2/savepoints/ 3a912091b13c446c0d359074414db1db

It works if I just trigger the savepoint without stopping the job.

    ./bin/flink savepoint 3a912091b13c446c0d359074414db1db gs://staging-data-flink/flink-1-16-2/savepoints/

Within the job I am consuming from Kafka and writing to Kafka and Cloud SQL. I am using FlinkKafkaConsumer as the Kafka source function.

Error Log

2023-06-23 23:29:53
org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
    at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1036)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: Custom Source -> Flat Map (1/2)#1 Failure reason: Task has failed.
    at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1375)
    at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1318)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:344)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 3 more
Caused by: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
    at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
    at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:306)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:286)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)

I would appreciate your help with this.

Regards,
Shrihari
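
P.S. For anyone skimming the error log above, the nested "Caused by" levels can be reduced to a short chain of exception classes. Below is a minimal sketch of that, not part of the job itself: the helper name cause_chain and the abbreviated TRACE_EXCERPT (one frame kept per level) are illustrative assumptions, while the exception header lines are copied from the log.

```python
import re
from typing import List

# Abbreviated excerpt of the log above: each exception header plus one frame.
TRACE_EXCERPT = r"""
org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1036)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: Custom Source -> Flat Map (1/2)#1 Failure reason: Task has failed.
    at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1375)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    ... 3 more
Caused by: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
    at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
"""

# An exception header is an optional "Caused by:" prefix followed by a class name.
HEADER_RE = re.compile(r"^(?:Caused by:\s*)?([\w.$]+)")


def cause_chain(trace: str) -> List[str]:
    """Return the exception class names from outermost to innermost cause."""
    chain = []
    for raw in trace.splitlines():
        line = raw.strip()
        # Skip blank lines, stack frames ("at ...") and elisions ("... 3 more").
        if not line or line.startswith("at ") or line.startswith("..."):
            continue
        match = HEADER_RE.match(line)
        if match:
            chain.append(match.group(1))
    return chain


if __name__ == "__main__":
    for depth, name in enumerate(cause_chain(TRACE_EXCERPT)):
        print("  " * depth + name)
```

Run on the excerpt, the last entry of the chain is org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException, which in the log is thrown from Handover.close via FlinkKafkaConsumerBase.cancel during stopOperatorForStopWithSavepoint.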