You should increase the Kafka transaction timeout -- the broker's transaction.max.timeout.ms, and the producer's transaction.timeout.ms along with it -- to something much larger than the default, which I believe is 15 minutes. Suitable values are more on the order of a few hours to a few days -- long enough to allow for any conceivable outage. This way, if a request does time out and causes the Flink job to fail, then so long as Kafka and Flink recover within the transaction timeout, you won't lose any data.
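
Concretely, something like this (just a sketch, untested -- the broker host, topic name, and the one-hour value are placeholders to adapt; the producer setting must not exceed the broker setting):

    # on each broker, in server.properties (requires a rolling restart)
    transaction.max.timeout.ms=3600000

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker-1:9092");  // placeholder
    // Must stay <= the broker's transaction.max.timeout.ms, or the
    // producer fails at startup with an invalid-transaction-timeout error.
    props.setProperty("transaction.timeout.ms", "3600000");   // 1 hour

    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
            "frontend_event_core",
            new KafkaSerializationSchema<String>() {
                @Override
                public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                    return new ProducerRecord<>("frontend_event_core",
                            element.getBytes(StandardCharsets.UTF_8));
                }
            },
            props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // transactional writes
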
Regards,
David

On Sat, Mar 20, 2021 at 12:02 AM Rainie Li <raini...@pinterest.com> wrote:

> Hi Arvid,
>
> After increasing producer.kafka.request.timeout.ms from 90000 to 120000,
> the job ran fine for almost 5 days, but one of the tasks failed again
> recently with the same timeout error (stack trace attached below).
> Should I keep increasing the producer.kafka.request.timeout.ms value?
>
> Thanks again for the help.
> Best regards
> Rainie
>
> *Stacktrace:*
> {job_name}/{job_id}/chk-43556/_metadata, reference=(default),
> fileStateSizeThreshold=2048, writeBufferSize=4096}, synchronous part) in
> thread Thread[Process-Event -> Filter-data08 (237/240),5,Flink Task
> Threads] took 0 ms.
> Canceler.run(Task.java:1434)
> ... 1 more
> 2021-03-17 17:46:42,284 INFO
> org.apache.flink.streaming.api.operators.AbstractStreamOperator - Could
> not complete snapshot 43556 for operator Sink: Sink-data08 (237/240).
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
> to send data to Kafka: Expiring 53 record(s) for frontend_event_core-46:
> 122269 ms has passed since last append
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 53
> record(s) for frontend_event_core-46: 122269 ms has passed since last append
> 2021-03-17 17:46:42,354 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to cancel task Source: {operator name}-Event (214/240)
> (69ec48cfb074de8812eb622ffa097233).
> 2021-03-17 17:46:42,354 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: {operator name}-Event (214/240)
> (69ec48cfb074de8812eb622ffa097233) switched from RUNNING to CANCELING.
> 2021-03-17 17:46:42,355 INFO org.apache.flink.runtime.taskmanager.Task -
> Triggering cancellation of task code Source: {operator name}-Event
> (214/240) (69ec48cfb074de8812eb622ffa097233).
> 2021-03-17 17:46:42,358 WARN
> org.apache.flink.streaming.runtime.tasks.StreamTask - Error while
> canceling task.
> java.lang.Exception:
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>     at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
>     at com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:436)
>     at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
>     at org.apache.flink.runtime.taskmanager.Task$Task
> 2021-03-17 17:46:42,391 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to cancel task Process-Event -> Filter-data08 (237/240)
> (34cf65b29f7153d6f0a82819eeaf218d).
>
>
> On Thu, Mar 11, 2021 at 7:12 AM Rainie Li <raini...@pinterest.com> wrote:
>
>> Thanks for the suggestion, Arvid.
>> Currently my job is using producer.kafka.request.timeout.ms=90000.
>> I will try increasing it to 120000.
>>
>> Best regards
>> Rainie
>>
>> On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Rainie,
>>>
>>> This looks like the record batching in the Kafka producer timed out. At
>>> this point, the respective records are lost forever. You probably want
>>> to tweak your Kafka settings [1].
>>>
>>> Usually, Flink should fail and restart at this point and recover without
>>> data loss. However, if the transactions are also timing out, that may
>>> explain the data loss. So you probably also want to increase the
>>> transaction timeout.
>>>
>>> [1]
>>> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
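>>>
>>> For instance, the producer settings usually involved in this error look
>>> like the sketch below. I'm assuming your producer.kafka.* prefix is an
>>> internal wrapper around the standard client properties; also note that
>>> on kafka-clients >= 2.1 batch expiry is governed by delivery.timeout.ms
>>> rather than request.timeout.ms, so check your client version:
>>>
>>>     import java.util.Properties;
>>>
>>>     Properties props = new Properties();
>>>     // How long to wait for a broker response before a request times out.
>>>     props.setProperty("request.timeout.ms", "120000");
>>>     // Upper bound on the whole send, including retries; must be at
>>>     // least linger.ms + request.timeout.ms (kafka-clients >= 2.1 only).
>>>     props.setProperty("delivery.timeout.ms", "300000");
>>>     // Retry instead of expiring records when brokers are briefly slow.
>>>     props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
>>>     // Smaller batches drain faster from the accumulator under backpressure.
>>>     props.setProperty("batch.size", "16384");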
>>>
>>> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li <raini...@pinterest.com> wrote:
>>>
>>>> Thanks for the info, David.
>>>> The job has checkpointing.
>>>> I saw some tasks fail due to
>>>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>>> Failed to send data to Kafka".
>>>> Here is the stacktrace from the JM log:
>>>>
>>>> container_e17_1611597945897_8007_01_000240 @ worker-node-host
>>>> (dataPort=42321).
>>>> 2021-02-10 01:19:27,206 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
>>>> checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>>> complete snapshot 21355 for operator Sink: Sink-data08 (208/240).
>>>> Failure reason: Checkpoint was declined.
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by:
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
>>>> to send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447
>>>> ms has passed since last append
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>>>     ... 18 more
>>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42
>>>> record(s) for frontend_event_core-38: 116447 ms has passed since last
>>>> append
>>>> 2021-02-10 01:19:27,216 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
>>>> (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
>>>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
>>>> tolerable failure threshold.
>>>>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
>>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
>>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
>>>>     at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> Best regards
>>>> Rainie
>>>>
>>>> On Mon, Mar 8, 2021 at 11:09 AM David Anderson <dander...@apache.org> wrote:
>>>>
>>>>> Rainie,
>>>>>
>>>>> A restart after a failure can cause data loss if you aren't using
>>>>> checkpointing, or if you experience a transaction timeout.
>>>>>
>>>>> A manual restart can also lead to data loss, depending on how you
>>>>> manage the offsets, transactions, and other state during the restart.
>>>>> What happened in this case?
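>>>>>
>>>>> For reference, these are the pieces that have to line up for a
>>>>> loss-free restart (a sketch only; the interval, savepoint path, and
>>>>> jar name are placeholders, and the CLI flags may differ by version):
>>>>>
>>>>>     import org.apache.flink.streaming.api.CheckpointingMode;
>>>>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>
>>>>>     StreamExecutionEnvironment env =
>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     // Without checkpoints there is nothing consistent to restore from.
>>>>>     env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
>>>>>
>>>>>     // For a manual restart, stop with a savepoint and resume from it:
>>>>>     //   flink stop --savepointPath s3://bucket/savepoints <jobId>
>>>>>     //   flink run -s <savepointPath> my-job.jar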
>>>>>
>>>>> David
>>>>>
>>>>> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com> wrote:
>>>>>
>>>>>> Thanks Yun and David.
>>>>>> There were some tasks that got restarted. We configured the restart
>>>>>> policy, and the job didn't fail.
>>>>>> Will a task restart cause data loss?
>>>>>>
>>>>>> Thanks
>>>>>> Rainie
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org> wrote:
>>>>>>
>>>>>>> Rainie,
>>>>>>>
>>>>>>> Were there any failures/restarts, or is this discrepancy observed
>>>>>>> without any disruption to the processing?
>>>>>>>
>>>>>>> Regards,
>>>>>>> David
>>>>>>>
>>>>>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the quick response, Smile.
>>>>>>>> I don't use window operators or flatMap.
>>>>>>>> Here is the core logic of my filter; it only iterates over the
>>>>>>>> filters list. Will *rebalance()* cause it?
>>>>>>>>
>>>>>>>> Thanks again.
>>>>>>>> Best regards
>>>>>>>> Rainie
>>>>>>>>
>>>>>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>>>>>     eventStream
>>>>>>>>         .rebalance()
>>>>>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>>>>>             public void processElement(
>>>>>>>>                     T element,
>>>>>>>>                     ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>>>>>                     Collector<SplitterIntermediateRecord<T>> collector) {
>>>>>>>>                 for (StreamFilter filter : filters) {
>>>>>>>>                     if (filter.match(element)) {
>>>>>>>>                         SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>>>>>                         SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>>>>>                             substreamConfig.getKafkaCluster(),
>>>>>>>>                             substreamConfig.getKafkaTopic(),
>>>>>>>>                             substreamConfig.getCutoverKafkaTopic(),
>>>>>>>>                             substreamConfig.getCutoverTimestampInMs(),
>>>>>>>>                             element);
>>>>>>>>                         collector.collect(result);
>>>>>>>>                     }
>>>>>>>>                 }
>>>>>>>>             }
>>>>>>>>         })
>>>>>>>>         .name("Process-" + eventClass.getSimpleName());
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Rainie,
>>>>>>>>>
>>>>>>>>> Could you please provide more information about your processing logic?
>>>>>>>>> Do you use window operators?
>>>>>>>>> If there's no time-based operator in your logic, late-arriving data
>>>>>>>>> won't be dropped by default, and there might be something wrong with
>>>>>>>>> your flatMap or filter operator. Otherwise, you can use
>>>>>>>>> sideOutputLateData() to get the late data of the window and have a
>>>>>>>>> look at them. See [1] for more information about sideOutputLateData().
>>>>>>>>>
>>>>>>>>> [1].
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Smile
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Sent from:
>>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/