Hi, I have the same issue.
BR,
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl <ou...@139.com> wrote:

> Hi all,
> When I use Flink 1.9.1 and produce data to Kafka 1.1.1, the error in *Log-1* below happens. The code is:
>
>     input.addSink(
>             new FlinkKafkaProducer<KafkaEvent>(
>                     parameterTool.getRequired("bootstrap.servers"),
>                     parameterTool.getRequired("output-topic"),
>                     new KafkaEventDeSchema()));
>
> *Log-1:*
> 2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
> 2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
> 2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0: 120018 ms has passed since batch creation
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>     ... 17 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58 record(s) for k8s-test-data-0: 120018 ms has passed since batch creation
> 2020-01-09 09:15:33,074 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job producer data frequece (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1443)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1353)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:722)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> Best,
> Ouywl
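PS: From the quoted log, the checkpoint is declined because the Kafka producer expires a batch after roughly 120 s ("Expiring 58 record(s) for k8s-test-data-0: 120018 ms has passed since batch creation") while FlinkKafkaProducer flushes pending records in preCommit, and the job then fails once the tolerable checkpoint failure threshold is exceeded. One thing that may be worth trying (only a sketch, not a verified fix) is to hand the sink an explicit Properties object with larger producer timeouts, via the FlinkKafkaProducer constructor that takes (topic, schema, properties). The timeout values below are guesses, and input, parameterTool, KafkaEvent, and KafkaEventDeSchema are the ones from the quoted code:

    // Sketch: same sink as in the quoted code, but with explicit producer properties.
    Properties producerProps = new Properties();   // java.util.Properties
    producerProps.setProperty("bootstrap.servers",
            parameterTool.getRequired("bootstrap.servers"));
    // The Kafka client default delivery.timeout.ms is 120000 ms, which matches the
    // ~120018 ms expiry in the log; raising it (and request.timeout.ms) gives a slow
    // or briefly unreachable broker more time before batches are expired.
    producerProps.setProperty("delivery.timeout.ms", "300000");
    producerProps.setProperty("request.timeout.ms", "120000");

    input.addSink(
            new FlinkKafkaProducer<KafkaEvent>(
                    parameterTool.getRequired("output-topic"),
                    new KafkaEventDeSchema(),
                    producerProps));

Whether the expiry is caused by a slow broker, network problems between the TaskManager pod and Kafka, or simply too-small timeouts is something only the broker-side metrics and logs can tell, so please treat the values above as a starting point rather than a recommendation.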