????????????????????Kafka
Connector??????Api??????????????????IDEA????????????????????????????????????????????Jira
https://issues.apache.org/jira/browse/FLINK-28758????????????????????????????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2022??8??23??(??????) ????11:09
??????: "user-zh"<[email protected]>;
????: Re: Re: flink1.15.1 stop ????????
1 ????????source?????????????? savepoint ?? trigger ??????
2 ???????? cancel ?? stop ????????????????
3 ??????????????kafka
source???????????????????????????????????????????????????????? kafka ??????
client????
yidan zhao <[email protected]> ??2022??8??23?????? 23:06??????
>
> ??????????????????
> ???? flink cancel -s ??????????flink stop ??????????????????????????????web
> ui??????????savepoint????????0/841????????????????????????????
> ????4????????
> ????1
> 2022-08-23 22:47:37,093 WARN  org.apache.flink.runtime.taskmanager.Task [] -
> Source: JobConfig -> Split(JobName_configType)
> (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to
> FAILED with failure cause:
> org.apache.flink.util.FlinkRuntimeException: Stop-with-savepoint failed.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> ???????????? free task??unregister????????
>
> ????2
> ...
> ???????????????? Attempt to cancel ????????????????????cancel????????
>
> Xuyang <[email protected]> ??2022??8??23?????? 22:25??????
> >
> > Hi, TM????????????????????????????????????????????????cp??????
> >
> > --
> >
> > Best??
> > Xuyang
> >
> > ?? 2022-08-23 20:41:59??"yidan zhao" <[email protected]> ??????
> > >??????????????
> > >?????????????? flink savepoint xxx
????????????????JM??????????????
> > >2022-08-23 20:33:22,307 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] -
> > >Triggering savepoint for job 8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:33:22,318 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
> > >Triggering checkpoint 5 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @
> > >1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:33:23,701 INFO  org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
> > >[] - Cannot create recoverable writer due to Recoverable writers on Hadoop are only supported for HDFS,
> > >will use the ordinary writer.
> > >
> > >2022-08-23 20:33:23,908 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
> > >Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
> > >(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
> > >
> > >
> > >?????? stop xxx ????????????????JM??????????????????
> > >
> > >2022-08-23 20:35:01,834 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] -
> > >Triggering stop-with-savepoint for job
> > >8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:35:01,842 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
> > >Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint',
> > >postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834
> > >for job 8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:35:02,083 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
> > >Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job
> > >8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @
> > >xxx.xxx.com (dataPort=13156).
> > >????????????????decline?????????? task failed????
> > >org.apache.flink.util.SerializedThrowable: Task name with subtask :
> > >Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure
> > >reason: Task has failed.
> > >    at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_251]
> > >    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_251]
> > >    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_251]
> > >    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_251]
> > >    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343) ~[flink-dist-1.15.1.jar:1.15.1]
> > >Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
> > >    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_251]
> > >    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_251]
> > >    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_251]
> > >    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_251]
> > >    ... 3 more
> > >Caused by: org.apache.flink.util.SerializedThrowable
> > >    at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) ~[?:?]
> > >    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) ~[?:?]
> > >    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:1002) ~[?:?]
> > >    at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.1.jar:1.15.1]
> > >    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> > >
> > >yidan zhao <[email protected]> ??2022??8??23?????? 20:31??????
> > >>
> > >> ??????stop????????????????????????
> > >> ?????? cancel??cancel -s ???????????? cancel -s
????????????????????????
> > >>
> > >> stop??????????????????
> > >> Could not stop with a savepoint job "1b87f308e2582f3cc0e3ccc812471201"
> > >> ...
> > >> Caused by: java.util.concurrent.ExecutionException:
> > >> java.util.concurrent.CompletionException:
> > >> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
> > >> ...
> > >> Caused by: org.apache.flink.util.SerializedThrowable:
> > >> org.apache.flink.runtime.checkpoint.CheckpointException: Task has
> > >> failed.
> > >> ...
> > >> Caused by: org.apache.flink.util.SerializedThrowable: Task has failed.
> > >> ...
> > >>
> > >> ______??????????