Thanks for the suggestion, Arvid. My job currently uses producer.kafka.request.timeout.ms=90000; I will try increasing it to 120000.
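To make the settings under discussion concrete, here is a minimal sketch of how the producer timeouts could be assembled in one place. It uses the plain Kafka producer keys and assumes the `producer.kafka.` prefix above is specific to this job's configuration layer. The class name `KafkaProducerTimeouts`, the method `build`, and the 15-minute cap (the broker default for transaction.max.timeout.ms) are illustrative assumptions, not values taken from the job. On Kafka clients 2.1 and later, batch expiry is bounded by delivery.timeout.ms, which must be at least linger.ms + request.timeout.ms; on older clients request.timeout.ms itself governs it.

```java
import java.util.Properties;

/** Hypothetical helper: gathers the Kafka producer timeouts discussed in this thread. */
final class KafkaProducerTimeouts {

    /** Assumption: the broker keeps the default transaction.max.timeout.ms of 15 minutes. */
    static final long ASSUMED_BROKER_MAX_TXN_TIMEOUT_MS = 15 * 60 * 1000L;

    static Properties build(long requestTimeoutMs, long lingerMs, long transactionTimeoutMs) {
        // A producer transaction timeout above the broker cap is rejected by the broker,
        // so fail early here instead of at job start-up.
        if (transactionTimeoutMs > ASSUMED_BROKER_MAX_TXN_TIMEOUT_MS) {
            throw new IllegalArgumentException(
                "transaction.timeout.ms " + transactionTimeoutMs
                    + " exceeds assumed broker cap " + ASSUMED_BROKER_MAX_TXN_TIMEOUT_MS);
        }
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", Long.toString(requestTimeoutMs));
        props.setProperty("linger.ms", Long.toString(lingerMs));
        // Smallest value Kafka accepts: linger.ms + request.timeout.ms (clients 2.1+ only).
        props.setProperty("delivery.timeout.ms", Long.toString(requestTimeoutMs + lingerMs));
        // Only relevant when the sink writes transactionally (exactly-once semantic).
        props.setProperty("transaction.timeout.ms", Long.toString(transactionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // 120000 ms is the request timeout proposed above; the other values are placeholders.
        Properties props = build(120_000L, 0L, 900_000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object is the kind of producer configuration that can be passed to a FlinkKafkaProducer constructor. Whether the transaction timeout matters depends on the sink's delivery semantic, which this thread does not state.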

Best regards
Rainie

On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Rainie,
>
> This looks like the record batching in the Kafka producer timed out. At this
> point, the respective records are lost forever. You probably want to tweak
> your Kafka settings [1].
>
> Usually, Flink should fail and restart at this point and recover without
> data loss. However, if the transactions are also timing out, that may
> explain the data loss. So you probably also want to increase the
> transaction timeout.
>
> [1]
> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
>
> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li <raini...@pinterest.com> wrote:
>
>> Thanks for the info, David.
>> The job has checkpointing.
>> I saw some tasks fail due to
>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>> send data to Kafka"
>> Here is the stack trace from the JM log:
>>
>> container_e17_1611597945897_8007_01_000240 @ worker-node-host (dataPort=42321).
>> 2021-02-10 01:19:27,206 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure reason: Checkpoint was declined.
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has passed since last append
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>     ... 18 more
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42 record(s) for frontend_event_core-38: 116447 ms has passed since last append
>> 2021-02-10 01:19:27,216 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
>>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
>>     at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> Best regards
>> Rainie
>>
>> On Mon, Mar 8, 2021 at 11:09 AM David Anderson <dander...@apache.org>
>> wrote:
>>
>>> Rainie,
>>>
>>> A restart after a failure can cause data loss if you aren't using
>>> checkpointing, or if you experience a transaction timeout.
>>>
>>> A manual restart can also lead to data loss, depending on how you manage
>>> the offsets, transactions, and other state during the restart. What
>>> happened in this case?
>>>
>>> David
>>>
>>> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com> wrote:
>>>
>>>> Thanks Yun and David.
>>>> There were some tasks that got restarted. We configured the restart
>>>> policy and the job didn't fail.
>>>> Will task restart cause data loss?
>>>>
>>>> Thanks
>>>> Rainie
>>>>
>>>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org>
>>>> wrote:
>>>>
>>>>> Rainie,
>>>>>
>>>>> Were there any failures/restarts, or is this discrepancy observed
>>>>> without any disruption to the processing?
>>>>>
>>>>> Regards,
>>>>> David
>>>>>
>>>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the quick response, Smile.
>>>>>> I don't use window operators or flatmap.
>>>>>> Here is the core logic of my filter; it only iterates over the filters
>>>>>> list. Will *rebalance()* cause it?
>>>>>>
>>>>>> Thanks again.
>>>>>> Best regards
>>>>>> Rainie
>>>>>>
>>>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>>>     eventStream
>>>>>>         .rebalance()
>>>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>>>           public void processElement(
>>>>>>               T element,
>>>>>>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>>>               Collector<SplitterIntermediateRecord<T>> collector) {
>>>>>>             for (StreamFilter filter : filters) {
>>>>>>               if (filter.match(element)) {
>>>>>>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>>>                 SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>>>                     substreamConfig.getKafkaCluster(),
>>>>>>                     substreamConfig.getKafkaTopic(),
>>>>>>                     substreamConfig.getCutoverKafkaTopic(),
>>>>>>                     substreamConfig.getCutoverTimestampInMs(),
>>>>>>                     element);
>>>>>>                 collector.collect(result);
>>>>>>               }
>>>>>>             }
>>>>>>           }
>>>>>>         })
>>>>>>         .name("Process-" + eventClass.getSimpleName());
>>>>>>
>>>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>>>>>>
>>>>>>> Hi Rainie,
>>>>>>>
>>>>>>> Could you please provide more information about your processing
>>>>>>> logic?
>>>>>>> Do you use window operators?
>>>>>>> If there's no time-based operator in your logic, late arrival data
>>>>>>> won't be
>>>>>>> dropped by default and there might be something wrong with your flat
>>>>>>> map or
>>>>>>> filter operator. Otherwise, you can use sideOutputLateData() to get
>>>>>>> the late
>>>>>>> data of the window and have a look at them. See [1] for more
>>>>>>> information
>>>>>>> about sideOutputLateData().
>>>>>>>
>>>>>>> [1].
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>
>>>>>>> Regards,
>>>>>>> Smile
>>>>>>>
>>>>>>> --
>>>>>>> Sent from:
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/