Hi Arvid,

After increasing producer.kafka.request.timeout.ms from 90000 to 120000, the job ran fine for almost 5 days, but one of the tasks recently failed again with the same timeout error (stack trace attached below). Should I keep increasing the producer.kafka.request.timeout.ms value?
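In case it helps, this is roughly how the producer settings end up on the Kafka sink. It is only a minimal sketch: the topic, schema, and numbers are placeholders rather than our real job code, and it assumes our producer.kafka.* prefix maps straight onto the standard Kafka ProducerConfig keys:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SinkSettingsSketch {

    // Placeholder sink builder; the real job builds its producer from
    // producer.kafka.* config and uses the EXACTLY_ONCE constructor variant.
    static FlinkKafkaProducer<String> buildSink(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // How long a single produce request may wait for a broker response.
        props.setProperty("request.timeout.ms", "120000");
        // On kafka-clients 2.1+ this caps the total time a record may spend
        // in the producer (batching plus retries) before it is expired.
        props.setProperty("delivery.timeout.ms", "300000");
        // For EXACTLY_ONCE sinks, the transaction timeout should outlive the
        // worst-case gap between checkpoints so pre-committed data is not lost.
        props.setProperty("transaction.timeout.ms", "900000");

        return new FlinkKafkaProducer<>("frontend_event_core", new SimpleStringSchema(), props);
    }
}

(The numbers above are illustrative, not our production values.)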
Thanks again for the help.

Best regards
Rainie

*Stacktrace:*

{job_name}/{job_id}/chk-43556/_metadata, reference=(default), fileStateSizeThreshold=2048, writeBufferSize=4096}, synchronous part) in thread Thread[Process-Event -> Filter-data08 (237/240),5,Flink Task Threads] took 0 ms.
Canceler.run(Task.java:1434)
    ... 1 more
2021-03-17 17:46:42,284 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator - Could not complete snapshot 43556 for operator Sink: Sink-data08 (237/240).
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 53 record(s) for frontend_event_core-46: 122269 ms has passed since last append
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 53 record(s) for frontend_event_core-46: 122269 ms has passed since last append
2021-03-17 17:46:42,354 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: {operator name}-Event (214/240) (69ec48cfb074de8812eb622ffa097233).
2021-03-17 17:46:42,354 INFO org.apache.flink.runtime.taskmanager.Task - Source: {operator name}-Event (214/240) (69ec48cfb074de8812eb622ffa097233) switched from RUNNING to CANCELING.
2021-03-17 17:46:42,355 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: {operator name}-Event (214/240) (69ec48cfb074de8812eb622ffa097233).
2021-03-17 17:46:42,358 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - Error while canceling task.
java.lang.Exception: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
    at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
    at com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:436)
    at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task$Task
2021-03-17 17:46:42,391 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Process-Event -> Filter-data08 (237/240) (34cf65b29f7153d6f0a82819eeaf218d).

On Thu, Mar 11, 2021 at 7:12 AM Rainie Li <raini...@pinterest.com> wrote:

> Thanks for the suggestion, Arvid.
> Currently my job is using producer.kafka.request.timeout.ms=90000
> I will try to increase to 120000.
>
> Best regards
> Rainie
>
> On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Rainie,
>>
>> This looks like the record batching in Kafka producer timed out. At this
>> point, the respective records are lost forever. You probably want to tweak
>> your Kafka settings [1].
>>
>> Usually, Flink should fail and restart at this point and recover without
>> data loss. However, if the transactions are also timing out, that may
>> explain the data loss. So you probably also want to increase the
>> transaction timeout.
>>
>> [1]
>> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
>>
>> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li <raini...@pinterest.com> wrote:
>>
>>> Thanks for the info, David.
>>> The job has checkpointing.
>>> I saw some tasks failed due to
>>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka"
>>> Here is the stack trace from the JM log:
>>>
>>> container_e17_1611597945897_8007_01_000240 @ worker-node-host (dataPort=42321).
>>> 2021-02-10 01:19:27,206 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure reason: Checkpoint was declined.
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has passed since last append
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>>     ... 18 more
>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42 record(s) for frontend_event_core-38: 116447 ms has passed since last append
>>> 2021-02-10 01:19:27,216 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
>>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
>>>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
>>>     at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Mon, Mar 8, 2021 at 11:09 AM David Anderson <dander...@apache.org> wrote:
>>>
>>>> Rainie,
>>>>
>>>> A restart after a failure can cause data loss if you aren't using
>>>> checkpointing, or if you experience a transaction timeout.
>>>>
>>>> A manual restart can also lead to data loss, depending on how you
>>>> manage the offsets, transactions, and other state during the restart. What
>>>> happened in this case?
>>>>
>>>> David
>>>>
>>>> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com> wrote:
>>>>
>>>>> Thanks Yun and David.
>>>>> There were some tasks that got restarted. We configured the restart
>>>>> policy and the job didn't fail.
>>>>> Will task restart cause data loss?
>>>>>
>>>>> Thanks
>>>>> Rainie
>>>>>
>>>>>
>>>>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org> wrote:
>>>>>
>>>>>> Rainie,
>>>>>>
>>>>>> Were there any failures/restarts, or is this discrepancy observed
>>>>>> without any disruption to the processing?
>>>>>>
>>>>>> Regards,
>>>>>> David
>>>>>>
>>>>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com> wrote:
>>>>>>
>>>>>>> Thanks for the quick response, Smile.
>>>>>>> I don't use window operators or flatmap.
>>>>>>> Here is the core logic of my filter; it only iterates over the
>>>>>>> filters list. Will *rebalance()* cause it?
>>>>>>>
>>>>>>> Thanks again.
>>>>>>> Best regards
>>>>>>> Rainie
>>>>>>>
>>>>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>>>>     eventStream
>>>>>>>         .rebalance()
>>>>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>>>>           public void processElement(
>>>>>>>               T element,
>>>>>>>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>>>>               Collector<SplitterIntermediateRecord<T>> collector) {
>>>>>>>             for (StreamFilter filter : filters) {
>>>>>>>               if (filter.match(element)) {
>>>>>>>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>>>>                 SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>>>>                     substreamConfig.getKafkaCluster(),
>>>>>>>                     substreamConfig.getKafkaTopic(),
>>>>>>>                     substreamConfig.getCutoverKafkaTopic(),
>>>>>>>                     substreamConfig.getCutoverTimestampInMs(),
>>>>>>>                     element);
>>>>>>>                 collector.collect(result);
>>>>>>>               }
>>>>>>>             }
>>>>>>>           }
>>>>>>>         })
>>>>>>>         .name("Process-" + eventClass.getSimpleName());
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>>>>>>>
>>>>>>>> Hi Rainie,
>>>>>>>>
>>>>>>>> Could you please provide more information about your processing logic?
>>>>>>>> Do you use window operators?
>>>>>>>> If there's no time-based operator in your logic, late arrival data won't be
>>>>>>>> dropped by default and there might be something wrong with your flat map or
>>>>>>>> filter operator. Otherwise, you can use sideOutputLateData() to get the late
>>>>>>>> data of the window and have a look at them. See [1] for more information
>>>>>>>> about sideOutputLateData().
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Smile
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Sent from:
>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
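
For anyone following the sideOutputLateData() suggestion above, here is a minimal, self-contained sketch of that pattern. The source, key, window sizes, and tag name are placeholders rather than this job's code, and it assumes the WatermarkStrategy API available from Flink 1.11 onwards:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSideOutputSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source with synthetic event-time timestamps; a real job
        // would read from Kafka and extract timestamps from the events.
        DataStream<String> events = env
                .fromElements("a", "b", "a", "b")
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<String>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> System.currentTimeMillis()));

        // Records arriving after the allowed lateness are routed to this side
        // output instead of being silently dropped by the window operator.
        final OutputTag<String> lateTag = new OutputTag<String>("late-events") {};

        SingleOutputStreamOperator<String> windowed = events
                .keyBy(value -> value)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(5))
                .sideOutputLateData(lateTag)
                .reduce((a, b) -> a + "," + b);   // any window function works here

        windowed.print("WINDOWED");
        // Inspect the late records (print them, or write them to a debug topic).
        windowed.getSideOutput(lateTag).print("LATE");

        env.execute("late-data-side-output-sketch");
    }
}

Records that arrive after the allowed lateness show up in the "LATE" stream instead of being dropped, which makes it easy to check whether late data explains a count discrepancy.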