Hi Arvid,

After increasing producer.kafka.request.timeout.ms from 90000 to 120000,
the job ran fine for almost 5 days, but one of the tasks recently failed
again with the same timeout error (stack trace attached below).
Should I keep increasing the producer.kafka.request.timeout.ms value?

Thanks again for the help.
Best regards
Rainie

*Stack trace:*
{job_name}/{job_id}/chk-43556/_metadata, reference=(default),
fileStateSizeThreshold=2048, writeBufferSize=4096}, synchronous part) in
thread Thread[Process-Event -> Filter-data08 (237/240),5,Flink Task
Threads] took 0 ms.
Canceler.run(Task.java:1434)
... 1 more
2021-03-17 17:46:42,284 INFO
 org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Could
not complete snapshot 43556 for operator Sink: Sink-data08 (237/240).
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Expiring 53 record(s) for frontend_event_core-46:
122269 ms has passed since last append
  at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
  at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
  at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
  at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
  at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
  at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
  at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
  at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
  at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
  at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
  at
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
  at
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
  at
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
  at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
  at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
  at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 53
record(s) for frontend_event_core-46: 122269 ms has passed since last append
2021-03-17 17:46:42,354 INFO  org.apache.flink.runtime.taskmanager.Task
                - Attempting to cancel task Source: {operator name}-Event
(214/240) (69ec48cfb074de8812eb622ffa097233).
2021-03-17 17:46:42,354 INFO  org.apache.flink.runtime.taskmanager.Task
                - Source: {operator name}-Event (214/240)
(69ec48cfb074de8812eb622ffa097233) switched from RUNNING to CANCELING.
2021-03-17 17:46:42,355 INFO  org.apache.flink.runtime.taskmanager.Task
                - Triggering cancellation of task code Source: {operator
name}-Event (214/240) (69ec48cfb074de8812eb622ffa097233).
2021-03-17 17:46:42,358 WARN
 org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
while canceling task.
java.lang.Exception:
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
  at
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
  at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
  at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
  at
com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:436)
  at
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
  at org.apache.flink.runtime.taskmanager.Task$Task
2021-03-17 17:46:42,391 INFO  org.apache.flink.runtime.taskmanager.Task
                - Attempting to cancel task Process-Event ->
Filter-data08 (237/240) (34cf65b29f7153d6f0a82819eeaf218d).
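
For reference, the producer timeouts discussed in this thread can be
sketched as plain Kafka producer properties. This is only a sketch under
stated assumptions, not the job's actual configuration: the class name
KafkaSinkTimeouts and all numeric values are hypothetical, and it assumes
the job's "producer.kafka." prefix simply maps onto the standard Kafka
keys. As far as I know, the "ms has passed since last append" wording
comes from older Kafka clients, where batch expiry follows
request.timeout.ms; clients from 2.1 on expire batches by
delivery.timeout.ms instead, which must be at least linger.ms +
request.timeout.ms. transaction.timeout.ms should only matter if the sink
runs with exactly-once semantics, and the broker-side
transaction.max.timeout.ms caps it.

```java
import java.util.Properties;

public class KafkaSinkTimeouts {

    /**
     * Builds producer properties for the timeouts discussed in this thread.
     * All values passed in are illustrative assumptions, not recommendations.
     */
    static Properties producerProperties(long requestTimeoutMs,
                                         long lingerMs,
                                         long deliveryTimeoutMs,
                                         long transactionTimeoutMs) {
        // Kafka clients 2.1+ reject configurations where
        // delivery.timeout.ms < linger.ms + request.timeout.ms,
        // so check the same constraint up front.
        if (deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
            throw new IllegalArgumentException(
                "delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        Properties props = new Properties();
        // Standard Kafka producer keys (without the job-specific prefix).
        props.setProperty("request.timeout.ms", Long.toString(requestTimeoutMs));
        props.setProperty("linger.ms", Long.toString(lingerMs));
        props.setProperty("delivery.timeout.ms", Long.toString(deliveryTimeoutMs));
        // Only relevant for transactional (exactly-once) producers.
        props.setProperty("transaction.timeout.ms", Long.toString(transactionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values: 120 s request timeout, 100 ms linger,
        // 5 min delivery timeout, 15 min transaction timeout.
        Properties props = producerProperties(120_000L, 100L, 300_000L, 900_000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object could then be passed to the Kafka sink's
constructor. Whether a longer timeout actually helps depends on why the
batches sit in the producer for two minutes in the first place.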


On Thu, Mar 11, 2021 at 7:12 AM Rainie Li <raini...@pinterest.com> wrote:

> Thanks for the suggestion, Arvid.
> Currently my job is using producer.kafka.request.timeout.ms=90000.
> I will try increasing it to 120000.
>
> Best regards
> Rainie
>
> On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Rainie,
>>
>> This looks like the record batching in the Kafka producer timed out. At
>> this point, the respective records are lost forever. You probably want
>> to tweak your Kafka settings [1].
>>
>> Usually, Flink should fail and restart at this point and recover without
>> data loss. However, if the transactions are also timing out, that may
>> explain the data loss. So you probably also want to increase the
>> transaction timeout.
>>
>> [1]
>> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
>>
>> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li <raini...@pinterest.com> wrote:
>>
>>> Thanks for the info, David.
>>> The job has checkpointing.
>>> I saw that some tasks failed due to
>>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>>> send data to Kafka".
>>> Here is the stack trace from the JM log:
>>>
>>> container_e17_1611597945897_8007_01_000240 @ worker-node-host
>>> (dataPort=42321).
>>> 2021-02-10 01:19:27,206 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding
>>> checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>> complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure
>>> reason: Checkpoint was declined.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>> at
>>> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>> at
>>> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>> at
>>> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by:
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>>> send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has
>>> passed since last append
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>> at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>> at
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>> at
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>> ... 18 more
>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42
>>> record(s) for frontend_event_core-38: 116447 ms has passed since last append
>>> 2021-02-10 01:19:27,216 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>> (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
>>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
>>> tolerable failure threshold.
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
>>> at
>>> org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Mon, Mar 8, 2021 at 11:09 AM David Anderson <dander...@apache.org>
>>> wrote:
>>>
>>>> Rainie,
>>>>
>>>> A restart after a failure can cause data loss if you aren't using
>>>> checkpointing, or if you experience a transaction timeout.
>>>>
>>>> A manual restart can also lead to data loss, depending on how you
>>>> manage the offsets, transactions, and other state during the restart. What
>>>> happened in this case?
>>>>
>>>> David
>>>>
>>>> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com>
>>>> wrote:
>>>>
>>>>> Thanks Yun and David.
>>>>> There were some tasks that got restarted. We configured the restart
>>>>> policy and the job didn't fail.
>>>>> Will a task restart cause data loss?
>>>>>
>>>>> Thanks
>>>>> Rainie
>>>>>
>>>>>
>>>>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Rainie,
>>>>>>
>>>>>> Were there any failures/restarts, or is this discrepancy observed
>>>>>> without any disruption to the processing?
>>>>>>
>>>>>> Regards,
>>>>>> David
>>>>>>
>>>>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the quick response, Smile.
>>>>>>> I don't use window operators or flatmap.
>>>>>>> Here is the core logic of my filter; it only iterates over the
>>>>>>> filters list. Could *rebalance()* cause it?
>>>>>>>
>>>>>>> Thanks again.
>>>>>>> Best regards
>>>>>>> Rainie
>>>>>>>
>>>>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>>>>     eventStream
>>>>>>>         .rebalance()
>>>>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>>>>           @Override
>>>>>>>           public void processElement(
>>>>>>>               T element,
>>>>>>>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>>>>               Collector<SplitterIntermediateRecord<T>> collector) {
>>>>>>>             // Emit one record per matching filter; an element may match several.
>>>>>>>             for (StreamFilter filter : filters) {
>>>>>>>               if (filter.match(element)) {
>>>>>>>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>>>>                 SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>>>>                     substreamConfig.getKafkaCluster(),
>>>>>>>                     substreamConfig.getKafkaTopic(),
>>>>>>>                     substreamConfig.getCutoverKafkaTopic(),
>>>>>>>                     substreamConfig.getCutoverTimestampInMs(),
>>>>>>>                     element);
>>>>>>>                 collector.collect(result);
>>>>>>>               }
>>>>>>>             }
>>>>>>>           }
>>>>>>>         })
>>>>>>>         .name("Process-" + eventClass.getSimpleName());
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>>>>>>>
>>>>>>>> Hi Rainie,
>>>>>>>>
>>>>>>>> Could you please provide more information about your processing
>>>>>>>> logic? Do you use window operators?
>>>>>>>> If there's no time-based operator in your logic, late-arriving
>>>>>>>> data won't be dropped by default, and there might be something
>>>>>>>> wrong with your flat map or filter operator. Otherwise, you can
>>>>>>>> use sideOutputLateData() to get the late data of the window and
>>>>>>>> have a look at it. See [1] for more information about
>>>>>>>> sideOutputLateData().
>>>>>>>>
>>>>>>>> [1].
>>>>>>>>
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Smile
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Sent from:
>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>>>
>>>>>>>
