Hi, Shrihari.

It seems related to https://issues.apache.org/jira/browse/FLINK-28758 which
is unresolved now.

It should only occur in FlinkKafkaConsumer, So you could migrate to
KafkaSource to avoid this issue.

On Sat, Jun 24, 2023 at 2:46 AM Shrihari R <shrihar...@rapido.bike> wrote:

> I am trying to stop the job by triggering the savepoint, but it's
> failing with the below error.
> *./bin/flink stop --savepointPath
> gs://staging-data-flink/flink-1-16-2/savepoints/
> 3a912091b13c446c0d359074414db1db*
>
> it's working if I just trigger the save point without stopping the job.
> *./bin/flink savepoint **3a912091b13c446c0d359074414db1db*
> * gs://staging-data-flink/flink-1-16-2/savepoints/*
> within the job I am consuming from kafka and writing to kafka and cloud
> sql.
> using FlinkKafkaConsumer  for the kafka sourcing function.
>
>
>
> *Error Log*
> 2023-06-23 23:29:53
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
> at org.apache.flink.runtime.messages.checkpoint.
> SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51
> )
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> .receiveDeclineMessage(CheckpointCoordinator.java:1036)
> at org.apache.flink.runtime.scheduler.ExecutionGraphHandler
> .lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
> at org.apache.flink.runtime.scheduler.ExecutionGraphHandler
> .lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:
> 119)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
> .java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
> .java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException:
> org.apache.flink.runtime.checkpoint.CheckpointException: Task name with
> subtask : Source: Custom Source -> Flat Map (1/2)#1 Failure reason: Task
> has failed.
> at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:
> 1375)
> at org.apache.flink.runtime.taskmanager.Task
> .lambda$triggerCheckpointBarrier$3(Task.java:1318)
> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture
> .java:836)
> at java.util.concurrent.CompletableFuture$UniHandle.tryFire(
> CompletableFuture.java:811)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture
> .java:488)
> at java.util.concurrent.CompletableFuture.completeExceptionally(
> CompletableFuture.java:1990)
> at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:344)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.
> CompletionException:
> org.apache.flink.streaming.connectors.kafka.internals.
> Handover$ClosedException
> at java.util.concurrent.CompletableFuture.encodeThrowable(
> CompletableFuture.java:292)
> at java.util.concurrent.CompletableFuture.completeThrowable(
> CompletableFuture.java:308)
> at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture
> .java:957)
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(
> CompletableFuture.java:940)
> ... 3 more
> Caused by: org.apache.flink.streaming.connectors.kafka.internals.
> Handover$ClosedException:
> org.apache.flink.streaming.connectors.kafka.internals.
> Handover$ClosedException
> at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(
> Handover.java:177)
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher
> .cancel(KafkaFetcher.java:164)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
> .cancel(FlinkKafkaConsumerBase.java:945)
> at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource
> .java:128)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
> .stopOperatorForStopWithSavepoint(SourceStreamTask.java:306)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
> .lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:286)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(
> StreamTaskActionExecutor.java:93)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMail(MailboxProcessor.java:398)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .processMail(MailboxProcessor.java:352)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:229)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:836)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> .java:785)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
> Task.java:935)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:
> 914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:750) Appreciate Your help in this
> regard. *Regards,*
> Shrihari
>
> THIS EMAIL COMMUNICATION IS PRIVILEGED AND MAY CONTAIN CONFIDENTIAL
> INFORMATION OF RAPIDO. IF YOU ARE NOT THE INTENDED RECIPIENT, YOU ARE
> HEREBY NOTIFIED THAT YOU HAVE RECEIVED THIS MESSAGE IN ERROR AND ANY
> REVIEW, DISSEMINATION, DISTRIBUTION OR COPYING OF THIS MESSAGE IS STRICTLY
> PROHIBITED. PLEASE NOTIFY US IMMEDIATELY BY EMAIL AND DELETE THE MESSAGE
> FROM YOUR SYSTEM.
>
> NOTHING CONTAINED IN THIS DISCLAIMER SHALL BE CONSTRUED IN ANY WAY TO
> GRANT PERMISSION TO TRANSMIT CONFIDENTIAL INFORMATION OR AS A WAIVER OF ANY
> CONFIDENTIALITY OR PRIVILEGE.
>
> RAPIDO DOES NOT ACCEPT ANY RESPONSIBILITY OR LIABILITY ARISING FROM THE
> USE OF THIS COMMUNICATION. NO REPRESENTATION IS BEING MADE THAT THE
> INFORMATION PRESENTED IS ACCURATE, CURRENT OR COMPLETE AND SUCH INFORMATION
> IS AT ALL TIMES SUBJECT TO CHANGE WITHOUT NOTICE
>


-- 
Best,
Hangxiang.

Reply via email to