Thanks for the suggestion, Arvid.
Currently my job is using producer.kafka.request.timeout.ms=90000.
I will try increasing it to 120000.
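
For reference, here is roughly what that change looks like on the producer
properties (a minimal sketch; I'm assuming our producer.kafka.* keys are
forwarded to the Kafka producer client config, and delivery.timeout.ms only
applies if the Kafka client is 2.1 or newer):

Properties producerProps = new Properties();
// was 90000; gives batches more time before they expire with
// "... ms has passed since last append"
producerProps.setProperty("request.timeout.ms", "120000");
// on newer clients batch expiry is governed by delivery.timeout.ms,
// which must be >= request.timeout.ms + linger.ms
producerProps.setProperty("delivery.timeout.ms", "180000");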

Best regards
Rainie

On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Rainie,
>
> This looks like record batches in the Kafka producer timed out. At that
> point, the respective records are lost forever. You probably want to tweak
> your Kafka producer settings [1].
>
> Usually, Flink should fail and restart at this point and recover without
> data loss. However, if the transactions are also timing out, that may
> explain the data loss. So you probably also want to increase the
> transaction timeout.
>
> [1]
> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
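>
> Concretely, raising the transaction timeout would look roughly like this (a
> minimal sketch for the universal FlinkKafkaProducer in EXACTLY_ONCE mode;
> the topic name and schema are placeholders, the exact constructor overload
> depends on the connector version, and the broker-side
> transaction.max.timeout.ms must be at least as large):
>
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
> // Flink's default is 1 hour; it must cover checkpoint interval plus recovery
> props.setProperty("transaction.timeout.ms", String.valueOf(2 * 60 * 60 * 1000));
>
> FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
>     "output-topic", // placeholder topic
>     new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
>     props,
>     FlinkKafkaProducer.Semantic.EXACTLY_ONCE);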
>
> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li <raini...@pinterest.com> wrote:
>
>> Thanks for the info, David.
>> The job has checkpointing.
>> I saw some tasks fail due to
>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>> send data to Kafka".
>> Here is the stack trace from the JM log:
>>
>> container_e17_1611597945897_8007_01_000240 @ worker-node-host (dataPort=42321).
>> 2021-02-10 01:19:27,206 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure reason: Checkpoint was declined.
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>> at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>> at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>> at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has passed since last append
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>> at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>> ... 18 more
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42 record(s) for frontend_event_core-38: 116447 ms has passed since last append
>> 2021-02-10 01:19:27,216 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
>> at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
>> at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
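>>
>> (For reference, that "tolerable failure threshold" in the last exception is
>> the CheckpointConfig setting below; a minimal sketch, assuming the default
>> of 0 tolerated failures:)
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // allow a few declined/failed checkpoints before the whole job is failed
>> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);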
>>
>> Best regards
>> Rainie
>>
>> On Mon, Mar 8, 2021 at 11:09 AM David Anderson <dander...@apache.org>
>> wrote:
>>
>>> Rainie,
>>>
>>> A restart after a failure can cause data loss if you aren't using
>>> checkpointing, or if you experience a transaction timeout.
>>>
>>> A manual restart can also lead to data loss, depending on how you manage
>>> the offsets, transactions, and other state during the restart. What
>>> happened in this case?
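>>>
>>> As a rough illustration (a minimal sketch, not your job's actual settings):
>>> enabling exactly-once checkpoints and retaining them on cancellation is
>>> what lets a manual restart resume from the last checkpoint instead of
>>> losing the offsets and transaction state:
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);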
>>>
>>> David
>>>
>>> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com> wrote:
>>>
>>>> Thanks Yun and David.
>>>> There were some tasks that got restarted. We configured a restart
>>>> policy, so the job itself didn't fail.
>>>> Can a task restart cause data loss?
>>>>
>>>> Thanks
>>>> Rainie
>>>>
>>>>
>>>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org>
>>>> wrote:
>>>>
>>>>> Rainie,
>>>>>
>>>>> Were there any failures/restarts, or is this discrepancy observed
>>>>> without any disruption to the processing?
>>>>>
>>>>> Regards,
>>>>> David
>>>>>
>>>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the quick response, Smile.
>>>>>> I don't use window operators or flatMap.
>>>>>> Here is the core logic of my filter; it only iterates over the filters
>>>>>> list. Could *rebalance()* cause it?
>>>>>>
>>>>>> Thanks again.
>>>>>> Best regards
>>>>>> Rainie
>>>>>>
>>>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>>>     eventStream
>>>>>>         .rebalance()
>>>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>>>           public void processElement(
>>>>>>               T element,
>>>>>>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>>>               Collector<SplitterIntermediateRecord<T>> collector) {
>>>>>>             for (StreamFilter filter : filters) {
>>>>>>               if (filter.match(element)) {
>>>>>>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>>>                 SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>>>                     substreamConfig.getKafkaCluster(),
>>>>>>                     substreamConfig.getKafkaTopic(),
>>>>>>                     substreamConfig.getCutoverKafkaTopic(),
>>>>>>                     substreamConfig.getCutoverTimestampInMs(),
>>>>>>                     element);
>>>>>>                 collector.collect(result);
>>>>>>               }
>>>>>>             }
>>>>>>           }
>>>>>>         })
>>>>>>         .name("Process-" + eventClass.getSimpleName());
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>>>>>>
>>>>>>> Hi Rainie,
>>>>>>>
>>>>>>> Could you please provide more information about your processing logic?
>>>>>>> Do you use window operators?
>>>>>>> If there's no time-based operator in your logic, late-arriving data won't
>>>>>>> be dropped by default, and there might be something wrong with your
>>>>>>> flatMap or filter operator. Otherwise, you can use sideOutputLateData()
>>>>>>> to get the late data of the window and have a look at it. See [1] for
>>>>>>> more information about sideOutputLateData().
>>>>>>>
>>>>>>> [1].
>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
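>>>>>>>
>>>>>>> For example, roughly like this (a minimal sketch with made-up Event,
>>>>>>> Result, and MyAggregate placeholders, only relevant if a window is
>>>>>>> involved):
>>>>>>>
>>>>>>> final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};
>>>>>>> SingleOutputStreamOperator<Result> windowed = events
>>>>>>>     .keyBy(e -> e.getKey())
>>>>>>>     .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>>>>>>     .sideOutputLateData(lateTag)   // late records go to the side output
>>>>>>>     .aggregate(new MyAggregate()); // placeholder aggregate function
>>>>>>> DataStream<Event> lateEvents = windowed.getSideOutput(lateTag);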
>>>>>>>
>>>>>>> Regards,
>>>>>>> Smile
>>>>>>>
>>>>>>
