[ https://issues.apache.org/jira/browse/FLINK-28758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
hjw updated FLINK-28758:
------------------------
    Description: 
I posted a stop-with-savepoint request to a Flink job through the REST API.
An error occurred while the Kafka connector was closing.
The job then entered the restarting state.
Triggering a savepoint on its own succeeds.
{code:java}
13:33:42.857 [Kafka Fetcher for Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-hjw-3, groupId=hjw] Kafka consumer has been closed
13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean (1/1)#0] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-hjw-4 unregistered
13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-hjw-4, groupId=hjw] Kafka consumer has been closed
13:33:42.860 [Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Cleanup StreamTask (operators closed: false, cancelled: false)
13:33:42.860 [jobmanager-io-thread-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 5 by task eeefcb27475446241861ad8db3f33144 of job d6fed247feab1c0bcc1b0dcc2cfb4736 at 79edfa88-ccc3-4140-b0e1-7ce8a7f8669f @ 127.0.0.1 (dataPort=-1).
org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: nlp-kafka-source -> nlp-clean (1/1)#0 Failure reason: Task has failed.
    at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
    at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
    at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
13:34:00.925 [flink-akka.actor.default-dispatcher-21] DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request.
13:34:00.926 [flink-akka.actor.default-dispatcher-22] DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
13:34:00.926 [flink-akka.actor.default-dispatcher-21] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from 85d44174e17281984d28699c42e3eed6.
{code}

  was:
I posted a stop-with-savepoint request to a Flink job through the REST API.
An error occurred while the Kafka connector was closing.
The job then entered the restarting state.
{code:java}
13:33:42.857 [Kafka Fetcher for Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-hjw-3, groupId=hjw] Kafka consumer has been closed
13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean (1/1)#0] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-hjw-4 unregistered
13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-hjw-4, groupId=hjw] Kafka consumer has been closed
13:33:42.860 [Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Cleanup StreamTask (operators closed: false, cancelled: false)
13:33:42.860 [jobmanager-io-thread-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 5 by task eeefcb27475446241861ad8db3f33144 of job d6fed247feab1c0bcc1b0dcc2cfb4736 at 79edfa88-ccc3-4140-b0e1-7ce8a7f8669f @ 127.0.0.1 (dataPort=-1).
org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: nlp-kafka-source -> nlp-clean (1/1)#0 Failure reason: Task has failed.
    at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
    at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
    at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
13:34:00.925 [flink-akka.actor.default-dispatcher-21] DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request.
13:34:00.926 [flink-akka.actor.default-dispatcher-22] DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
13:34:00.926 [flink-akka.actor.default-dispatcher-21] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from 85d44174e17281984d28699c42e3eed6.
{code}


> Failed to stop with savepoint
> ------------------------------
>
>                 Key: FLINK-28758
>                 URL: https://issues.apache.org/jira/browse/FLINK-28758
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0
>         Environment: Flink version: 1.15.0
> Deploy mode: K8s application mode. A local mini cluster also has this problem.
> Kafka connector: uses the Kafka SourceFunction, not the new API.
>            Reporter: hjw
>            Priority: Major
>
> I posted a stop-with-savepoint request to a Flink job through the REST API.
> An error occurred while the Kafka connector was closing.
> The job then entered the restarting state.
> Triggering a savepoint on its own succeeds.
> {code:java}
> 13:33:42.857 [Kafka Fetcher for Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-hjw-3, groupId=hjw] Kafka consumer has been closed
> 13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean (1/1)#0] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-hjw-4 unregistered
> 13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-hjw-4, groupId=hjw] Kafka consumer has been closed
> 13:33:42.860 [Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Cleanup StreamTask (operators closed: false, cancelled: false)
> 13:33:42.860 [jobmanager-io-thread-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 5 by task eeefcb27475446241861ad8db3f33144 of job d6fed247feab1c0bcc1b0dcc2cfb4736 at 79edfa88-ccc3-4140-b0e1-7ce8a7f8669f @ 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: nlp-kafka-source -> nlp-clean (1/1)#0 Failure reason: Task has failed.
>     at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
>     at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
> Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>     ... 3 common frames omitted
> Caused by: org.apache.flink.util.SerializedThrowable: null
>     at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
>     at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> 13:34:00.925 [flink-akka.actor.default-dispatcher-21] DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request.
> 13:34:00.926 [flink-akka.actor.default-dispatcher-22] DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
> 13:34:00.926 [flink-akka.actor.default-dispatcher-21] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from 85d44174e17281984d28699c42e3eed6.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
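
For anyone trying to reproduce the report, the two requests it contrasts (stop-with-savepoint, which fails, and a plain savepoint, which succeeds) might be issued roughly as below. This is only a sketch: it assumes the standard Flink REST endpoints `POST /jobs/:jobid/stop` and `POST /jobs/:jobid/savepoints`, and the base URL `http://localhost:8081`, the savepoint directory and the class name `StopWithSavepointRepro` are placeholders that do not come from the report. Only the job ID is taken from the log. Nothing is sent unless `--send` is passed.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical reproduction helper; requires Java 11+ for java.net.http.
final class StopWithSavepointRepro {

    // Endpoint of the request that fails in this report.
    static String stopPath(String jobId) {
        return "/jobs/" + jobId + "/stop";
    }

    // Endpoint of the plain savepoint request, reported to succeed.
    static String savepointPath(String jobId) {
        return "/jobs/" + jobId + "/savepoints";
    }

    // JSON body for stop-with-savepoint. No escaping is done,
    // so the directory must not contain quotes or backslashes.
    static String stopBody(String targetDirectory, boolean drain) {
        return String.format("{\"targetDirectory\":\"%s\",\"drain\":%b}", targetDirectory, drain);
    }

    // JSON body for a plain savepoint (same escaping caveat).
    static String savepointBody(String targetDirectory, boolean cancelJob) {
        return String.format("{\"target-directory\":\"%s\",\"cancel-job\":%b}", targetDirectory, cancelJob);
    }

    // Builds a JSON POST request without sending it.
    static HttpRequest post(String baseUrl, String path, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + path))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Sends the request and returns "<status> <body>".
    static String send(HttpRequest request) throws Exception {
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) throws Exception {
        String baseUrl = "http://localhost:8081";           // assumed REST address
        String jobId = "d6fed247feab1c0bcc1b0dcc2cfb4736";  // job ID from the log
        String dir = "file:///tmp/savepoints";              // placeholder directory

        HttpRequest stop = post(baseUrl, stopPath(jobId), stopBody(dir, false));
        HttpRequest savepoint = post(baseUrl, savepointPath(jobId), savepointBody(dir, false));

        System.out.println(stop.method() + " " + stop.uri() + " " + stopBody(dir, false));
        System.out.println(savepoint.method() + " " + savepoint.uri() + " " + savepointBody(dir, false));

        // Only contact a cluster when explicitly asked to.
        if (args.length > 0 && args[0].equals("--send")) {
            System.out.println(send(stop));
        }
    }
}
```

Both endpoints are asynchronous and return a trigger ID rather than the savepoint result, so the failure itself would show up in the job's logs and state, as in the trace quoted in the issue.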