I will try that. Thanks for your help, David.

Best regards,
Rainie
On Sat, Mar 20, 2021 at 9:46 AM David Anderson <dander...@apache.org> wrote:

> You should increase the Kafka transaction timeout --
> transaction.max.timeout.ms -- to something much larger than the default,
> which I believe is 15 minutes. Suitable values are more on the order of a
> few hours to a few days -- long enough to allow for any conceivable
> outage. This way, if a request does time out and causes the Flink job to
> fail, then as long as Kafka and Flink recover within the transaction
> timeout you won't lose any data.
>
> Regards,
> David
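David's setting is broker-side; the matching client-side knob is
transaction.timeout.ms, which must stay at or below the broker's
transaction.max.timeout.ms or the exactly-once producer fails at
initialization. A minimal sketch of raising it, assuming the Flink
1.9-era universal FlinkKafkaProducer; the broker address, topic name,
Event type, and EventSerializationSchema are hypothetical placeholders:

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");  // placeholder
    // Client-side transaction timeout; must not exceed the broker's
    // transaction.max.timeout.ms, or producer initialization fails.
    props.setProperty("transaction.timeout.ms", "3600000"); // 1 hour

    FlinkKafkaProducer<Event> sink = new FlinkKafkaProducer<>(
        "events",                        // placeholder topic
        new EventSerializationSchema(),  // placeholder KafkaSerializationSchema<Event>
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

With EXACTLY_ONCE semantics, Flink's producer already defaults
transaction.timeout.ms to one hour, which is why the broker's 15-minute
cap usually has to be raised first.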
> On Sat, Mar 20, 2021 at 12:02 AM Rainie Li <raini...@pinterest.com> wrote:
>
>> Hi Arvid,
>>
>> After increasing producer.kafka.request.timeout.ms from 90000 to 120000,
>> the job ran fine for almost 5 days, but one of the tasks recently failed
>> again with the same timeout error (stack trace attached below).
>> Should I keep increasing the producer.kafka.request.timeout.ms value?
>>
>> Thanks again for the help.
>> Best regards,
>> Rainie
>>
>> *Stacktrace:*
>> {job_name}/{job_id}/chk-43556/_metadata, reference=(default),
>> fileStateSizeThreshold=2048, writeBufferSize=4096}, synchronous part) in
>> thread Thread[Process-Event -> Filter-data08 (237/240),5,Flink Task
>> Threads] took 0 ms.
>> Canceler.run(Task.java:1434)
>>     ... 1 more
>> 2021-03-17 17:46:42,284 INFO
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator -
>> Could not complete snapshot 43556 for operator Sink: Sink-data08 (237/240).
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>> Failed to send data to Kafka: Expiring 53 record(s) for
>> frontend_event_core-46: 122269 ms has passed since last append
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 53
>> record(s) for frontend_event_core-46: 122269 ms has passed since last append
>> 2021-03-17 17:46:42,354 INFO org.apache.flink.runtime.taskmanager.Task -
>> Attempting to cancel task Source: {operator name}-Event (214/240)
>> (69ec48cfb074de8812eb622ffa097233).
>> 2021-03-17 17:46:42,354 INFO org.apache.flink.runtime.taskmanager.Task -
>> Source: {operator name}-Event (214/240)
>> (69ec48cfb074de8812eb622ffa097233) switched from RUNNING to CANCELING.
>> 2021-03-17 17:46:42,355 INFO org.apache.flink.runtime.taskmanager.Task -
>> Triggering cancellation of task code Source: {operator name}-Event
>> (214/240) (69ec48cfb074de8812eb622ffa097233).
>> 2021-03-17 17:46:42,358 WARN
>> org.apache.flink.streaming.runtime.tasks.StreamTask - Error while
>> canceling task.
>> java.lang.Exception:
>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>     at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
>>     at com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:436)
>>     at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
>>     at org.apache.flink.runtime.taskmanager.Task$Task
>> 2021-03-17 17:46:42,391 INFO org.apache.flink.runtime.taskmanager.Task -
>> Attempting to cancel task Process-Event ->
>> Filter-data08 (237/240) (34cf65b29f7153d6f0a82819eeaf218d).
>>
>> On Thu, Mar 11, 2021 at 7:12 AM Rainie Li <raini...@pinterest.com> wrote:
>>
>>> Thanks for the suggestion, Arvid.
>>> Currently my job is using producer.kafka.request.timeout.ms=90000.
>>> I will try increasing it to 120000.
>>>
>>> Best regards,
>>> Rainie
>>>
>>> On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Rainie,
>>>>
>>>> This looks like the record batching in the Kafka producer timed out.
>>>> At that point, the affected records are lost forever. You probably
>>>> want to tweak your Kafka settings [1].
>>>>
>>>> Usually, Flink should fail and restart at this point and recover
>>>> without data loss. However, if the transactions are also timing out,
>>>> that may explain the data loss. So you probably also want to increase
>>>> the transaction timeout.
>>>>
>>>> [1]
>>>> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
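The "Expiring N record(s) ... ms has passed since last append" error is
raised client-side when batches sit in the producer's buffer longer than
the producer allows, not by the broker. The
producer.kafka.request.timeout.ms key looks like a job-level wrapper,
presumably around the standard Kafka request.timeout.ms. A sketch of the
standard producer keys behind this class of timeout; the keys are real
Kafka producer configs, the values are illustrative only:

    import java.util.Properties;

    Properties props = new Properties();
    // Older producers expire buffered batches after request.timeout.ms,
    // which is where "x ms has passed since last append" comes from.
    props.setProperty("request.timeout.ms", "120000");
    // Clients 2.1+ bound the whole send()-to-ack path with this instead
    // (KIP-91); it must be at least linger.ms + request.timeout.ms.
    props.setProperty("delivery.timeout.ms", "300000");
    // How long and how large a batch may grow before it is sent.
    props.setProperty("linger.ms", "100");
    props.setProperty("batch.size", "16384");

On newer clients, raising delivery.timeout.ms is usually the cleaner fix
than repeatedly bumping request.timeout.ms, since it covers the full life
of a record inside the producer.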
>>>> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li <raini...@pinterest.com> wrote:
>>>>
>>>>> Thanks for the info, David.
>>>>> The job has checkpointing enabled.
>>>>> I saw some tasks fail due to
>>>>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>>>> Failed to send data to Kafka".
>>>>> Here is the stack trace from the JM log:
>>>>>
>>>>> container_e17_1611597945897_8007_01_000240 @ worker-node-host
>>>>> (dataPort=42321).
>>>>> 2021-02-10 01:19:27,206 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>>>> Discarding checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>>>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>>>> complete snapshot 21355 for operator Sink: Sink-data08 (208/240).
>>>>> Failure reason: Checkpoint was declined.
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>>>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>>>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>>>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by:
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>>>> Failed to send data to Kafka: Expiring 42 record(s) for
>>>>> topic-name-38: 116447 ms has passed since last append
>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>>>>     ... 18 more
>>>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring
>>>>> 42 record(s) for frontend_event_core-38: 116447 ms has passed since
>>>>> last append
>>>>> 2021-02-10 01:19:27,216 INFO
>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
>>>>> (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to
>>>>> FAILING.
>>>>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
>>>>> tolerable failure threshold.
>>>>>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>>>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
>>>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
>>>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
>>>>>     at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> Best regards,
>>>>> Rainie
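The "Exceeded checkpoint tolerable failure threshold" line above is the
checkpoint coordinator giving up after more checkpoint failures than it
tolerates, and by default it tolerates none. A minimal sketch of loosening
that threshold, assuming a Flink version that exposes
setTolerableCheckpointFailureNumber on CheckpointConfig (the interval and
count are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L);  // checkpoint every 60s
    // Tolerate a few declined/expired checkpoints before failing the
    // job, instead of failing on the first one.
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

This only changes when the job fails; it does not address the underlying
Kafka timeout that is declining the checkpoints.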
>>>>> On Mon, Mar 8, 2021 at 11:09 AM David Anderson <dander...@apache.org> wrote:
>>>>>
>>>>>> Rainie,
>>>>>>
>>>>>> A restart after a failure can cause data loss if you aren't using
>>>>>> checkpointing, or if you experience a transaction timeout.
>>>>>>
>>>>>> A manual restart can also lead to data loss, depending on how you
>>>>>> manage the offsets, transactions, and other state during the
>>>>>> restart. What happened in this case?
>>>>>>
>>>>>> David
>>>>>>
>>>>>> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com> wrote:
>>>>>>
>>>>>>> Thanks Yun and David.
>>>>>>> Some tasks did get restarted. We configured a restart policy, so
>>>>>>> the job itself didn't fail.
>>>>>>> Will a task restart cause data loss?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Rainie
>>>>>>>
>>>>>>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org> wrote:
>>>>>>>
>>>>>>>> Rainie,
>>>>>>>>
>>>>>>>> Were there any failures/restarts, or is this discrepancy observed
>>>>>>>> without any disruption to the processing?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> David
>>>>>>>>
>>>>>>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the quick response, Smile.
>>>>>>>>> I don't use window operators or flatMap.
>>>>>>>>> Here is the core logic of my filter; it only iterates over the
>>>>>>>>> filters list. Will *rebalance()* cause it?
>>>>>>>>>
>>>>>>>>> Thanks again.
>>>>>>>>> Best regards,
>>>>>>>>> Rainie
>>>>>>>>>
>>>>>>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>>>>>>     eventStream
>>>>>>>>>         .rebalance()
>>>>>>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>>>>>>           @Override
>>>>>>>>>           public void processElement(
>>>>>>>>>               T element,
>>>>>>>>>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>>>>>>               Collector<SplitterIntermediateRecord<T>> collector) {
>>>>>>>>>             for (StreamFilter filter : filters) {
>>>>>>>>>               if (filter.match(element)) {
>>>>>>>>>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>>>>>>                 SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>>>>>>                     substreamConfig.getKafkaCluster(),
>>>>>>>>>                     substreamConfig.getKafkaTopic(),
>>>>>>>>>                     substreamConfig.getCutoverKafkaTopic(),
>>>>>>>>>                     substreamConfig.getCutoverTimestampInMs(),
>>>>>>>>>                     element);
>>>>>>>>>                 collector.collect(result);
>>>>>>>>>               }
>>>>>>>>>             }
>>>>>>>>>           }
>>>>>>>>>         })
>>>>>>>>>         .name("Process-" + eventClass.getSimpleName());
>>>>>>>>>
>>>>>>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Rainie,
>>>>>>>>>>
>>>>>>>>>> Could you please provide more information about your processing
>>>>>>>>>> logic? Do you use window operators?
>>>>>>>>>> If there's no time-based operator in your logic, late-arriving
>>>>>>>>>> data won't be dropped by default, and there might be something
>>>>>>>>>> wrong with your flatMap or filter operator. Otherwise, you can
>>>>>>>>>> use sideOutputLateData() to get the late data of the window and
>>>>>>>>>> have a look at it. See [1] for more information about
>>>>>>>>>> sideOutputLateData().
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Smile
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Sent from:
>>>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
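For completeness, a minimal sketch of the sideOutputLateData() pattern
Smile describes. It only applies if a windowed stage exists (Rainie's job
has none, so this is illustrative only), and the Event type, key selector,
window size, and reduce function are hypothetical placeholders:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;

    // Anonymous subclass so Flink can capture the element type.
    final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

    SingleOutputStreamOperator<Event> windowed = events
        .keyBy(Event::getKey)                                  // hypothetical key
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .sideOutputLateData(lateTag)   // capture late records instead of dropping them
        .reduce((a, b) -> b);          // placeholder window function

    // Late records land here for inspection rather than disappearing silently.
    DataStream<Event> lateEvents = windowed.getSideOutput(lateTag);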