Hi Arvid,
After I increased producer.kafka.request.timeout.ms from 90000 to 120000,
the job ran fine for almost 5 days, but one of the tasks recently failed
again with the same timeout error (stack trace attached below).
Should I keep increasing producer.kafka.request.timeout.ms?
Thanks again for the help.
Best regards
Rainie
*Stacktrace:*
{job_name}/{job_id}/chk-43556/_metadata, reference=(default),
fileStateSizeThreshold=2048, writeBufferSize=4096}, synchronous part) in
thread Thread[Process-Event -> Filter-data08 (237/240),5,Flink Task
Threads] took 0 ms.
Canceler.run(Task.java:1434)
... 1 more
2021-03-17 17:46:42,284 INFO
org.apache.flink.streaming.api.operators.AbstractStreamOperator - Could
not complete snapshot 43556 for operator Sink: Sink-data08 (237/240).
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Expiring 53 record(s) for frontend_event_core-46:
122269 ms has passed since last append
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 53
record(s) for frontend_event_core-46: 122269 ms has passed since last append
2021-03-17 17:46:42,354 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to cancel task Source: {operator name}-Event
(214/240) (69ec48cfb074de8812eb622ffa097233).
2021-03-17 17:46:42,354 INFO org.apache.flink.runtime.taskmanager.Task
- Source: {operator name}-Event (214/240)
(69ec48cfb074de8812eb622ffa097233) switched from RUNNING to CANCELING.
2021-03-17 17:46:42,355 INFO org.apache.flink.runtime.taskmanager.Task
- Triggering cancellation of task code Source: {operator
name}-Event (214/240) (69ec48cfb074de8812eb622ffa097233).
2021-03-17 17:46:42,358 WARN
org.apache.flink.streaming.runtime.tasks.StreamTask - Error
while canceling task.
java.lang.Exception: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
at com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:436)
at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task$Task
2021-03-17 17:46:42,391 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to cancel task Process-Event ->
Filter-data08 (237/240) (34cf65b29f7153d6f0a82819eeaf218d).
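To judge whether raising the timeout again would help, one option is to compare the elapsed time in each expiry message with the timeout that was in effect when it fired. A minimal sketch, assuming the exact "ms has passed since last append" wording seen in these logs (newer Kafka clients word this message differently); the ExpiryMessage class is hypothetical.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls the fields out of a Kafka producer batch-expiry message. */
final class ExpiryMessage {
    // e.g. "Expiring 53 record(s) for frontend_event_core-46: 122269 ms has passed since last append"
    // The greedy \S+ backtracks to the LAST "-<digits>:", so hyphenated topic names parse correctly.
    private static final Pattern PATTERN = Pattern.compile(
            "Expiring (\\d+) record\\(s\\) for (\\S+)-(\\d+): (\\d+) ms has passed since last append");

    final int records;
    final String topic;
    final int partition;
    final long elapsedMs;

    private ExpiryMessage(int records, String topic, int partition, long elapsedMs) {
        this.records = records;
        this.topic = topic;
        this.partition = partition;
        this.elapsedMs = elapsedMs;
    }

    /** Finds the expiry message anywhere in a log line; empty if the line has none. */
    static Optional<ExpiryMessage> parse(String line) {
        Matcher m = PATTERN.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new ExpiryMessage(
                Integer.parseInt(m.group(1)),
                m.group(2),
                Integer.parseInt(m.group(3)),
                Long.parseLong(m.group(4))));
    }

    /** How far past the configured timeout the batch was when it was expired. */
    long overshootMs(long configuredTimeoutMs) {
        return elapsedMs - configuredTimeoutMs;
    }
}
```

For the failure above, 122269 ms elapsed against the 120000 ms setting. If the elapsed time keeps tracking whatever timeout is configured, raising it further may only postpone the failure, and a stall on the affected partition could be the thing to investigate instead.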
On Thu, Mar 11, 2021 at 7:12 AM Rainie Li wrote:
> Thanks for the suggestion, Arvid.
> My job currently uses producer.kafka.request.timeout.ms=90000.
> I will try increasing it to 120000.
>
> Best regards
> Rainie
>
> On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise wrote:
>
>> Hi Rainie,
>>
>> This looks like the record batching in the Kafka producer timed out. At this
>> point, the respective records are lost forever. You probably want to tweak
>> your Kafka settings [1].
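A minimal sketch of the kind of producer settings meant here, using only java.util.Properties so it runs without Flink or Kafka on the classpath. The keys are standard Kafka producer configs, but the values are placeholders, and which key governs batch expiry depends on the client version: clients older than 2.1 expire batches based on request.timeout.ms (the "since last append" wording in these logs suggests such a client, though that is an inference), while 2.1 and later bound it with delivery.timeout.ms, which must be at least linger.ms + request.timeout.ms. The ProducerTimeouts class is hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper that assembles timeout-related Kafka producer settings. */
final class ProducerTimeouts {
    static Properties build(String bootstrapServers, long requestTimeoutMs,
                            long lingerMs, long deliveryTimeoutMs) {
        // Kafka clients >= 2.1 reject delivery.timeout.ms < linger.ms + request.timeout.ms,
        // so fail early here with the same rule.
        if (deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
            throw new IllegalArgumentException(
                    "delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Pre-2.1 clients also use this value to expire batches waiting to be sent.
        props.setProperty("request.timeout.ms", Long.toString(requestTimeoutMs));
        props.setProperty("linger.ms", Long.toString(lingerMs));
        // Only understood by clients >= 2.1; older clients report it as an unknown config.
        props.setProperty("delivery.timeout.ms", Long.toString(deliveryTimeoutMs));
        return props;
    }
}
```

The resulting Properties would then be passed to a FlinkKafkaProducer constructor that accepts producer Properties, in place of whatever the job builds today.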
>>
>> Usually, Flink should fail and restart at this point and recover without
>> data loss. However, if the transactions are also timing out, that may
>> explain the data loss. So you probably also want to increase the
>> transaction timeout.
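To make the transaction-timeout advice concrete: with an exactly-once FlinkKafkaProducer, a Kafka transaction stays open from roughly one checkpoint until the next checkpoint completes and, if the job fails in between, until recovery commits it. If the broker aborts the transaction first, its records are lost. A rough sketch of that budget, assuming the Kafka broker default transaction.max.timeout.ms of 15 minutes (the broker rejects producers that ask for more) and noting that the Flink documentation gives FlinkKafkaProducer a default transaction.timeout.ms of 1 hour. The class, method names and the simple additive bound are illustrative assumptions, not Flink's own check.

```java
/** Hypothetical sanity check for an exactly-once Kafka sink's transaction timeout. */
final class TransactionTimeoutCheck {
    // Kafka broker default for transaction.max.timeout.ms: 15 minutes.
    static final long DEFAULT_BROKER_MAX_MS = 15L * 60 * 1000;

    /**
     * Rough lower bound: a transaction opened at one checkpoint is committed only after the
     * next checkpoint completes, plus recovery time if the job fails in between.
     */
    static long minimumTransactionTimeoutMs(long checkpointIntervalMs,
                                            long checkpointDurationMs,
                                            long maxRecoveryMs) {
        return checkpointIntervalMs + checkpointDurationMs + maxRecoveryMs;
    }

    static String check(long transactionTimeoutMs, long brokerMaxMs, long checkpointIntervalMs,
                        long checkpointDurationMs, long maxRecoveryMs) {
        if (transactionTimeoutMs > brokerMaxMs) {
            return "rejected: exceeds broker transaction.max.timeout.ms";
        }
        long needed = minimumTransactionTimeoutMs(checkpointIntervalMs, checkpointDurationMs, maxRecoveryMs);
        if (transactionTimeoutMs < needed) {
            return "risky: broker may abort before commit (needs >= " + needed + " ms)";
        }
        return "ok";
    }
}
```

With an example 60 s checkpoint interval, 10 s checkpoints and up to 5 minutes of recovery, this bound asks for at least 370000 ms, which fits under the 15-minute broker default but is exceeded by the 1-hour producer default unless the broker limit is raised.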
>>
>> [1]
>> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
>>
>> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li wrote:
>>
>>> Thanks for the info, David.
>>> The job has checkpointing enabled.
>>> I saw some tasks fail due to
>>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>>> send data to Kafka".
>>> Here is the stack trace from the JM log:
>>>
>>> container_e17_1611597945897_8007_01_000240 @ worker-node-host
>>> (dataPort=42321).
>>> 2021-02-10 01:19:27,206 INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
>>> checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>> complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure
>>> reason: Checkpoint was declined.
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>> at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>> at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>> at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has passed since last append
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>> at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>> ... 18 more
>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42
>>> record(s) for frontend_event_core-38: 116447 ms has passed since last append
>>> 2021-02-10 01:19:27,216 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
>>> (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
>>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
>>> tolerable failure threshold.
>>> at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
>>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
>>> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
>>> at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
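The job-level failure above is "Exceeded checkpoint tolerable failure threshold": one sink subtask declined checkpoint 21355, and that decline counted against the number of checkpoint failures the job tolerates (configurable in Flink via CheckpointConfig#setTolerableCheckpointFailureNumber). A simplified model of that counter, assuming, as in Flink's CheckpointFailureManager, that only consecutive failures count and a completed checkpoint resets them; the real manager also ignores some failure reasons, which this sketch does not model, and the class name is hypothetical.

```java
/** Hypothetical, simplified model of a consecutive-checkpoint-failure budget. */
final class CheckpointFailureBudget {
    private final int tolerableFailures;
    private int consecutiveFailures;

    CheckpointFailureBudget(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Records a declined checkpoint; returns true if the job should now be failed. */
    boolean onCheckpointDeclined() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }

    /** A completed checkpoint clears the current streak of failures. */
    void onCheckpointCompleted() {
        consecutiveFailures = 0;
    }
}
```

With a threshold of 0, a single declined checkpoint fails the job, which would be consistent with the RUNNING to FAILING transition in this log. A higher threshold keeps the job running through an occasional expiry, but it does not re-send records Kafka has already expired, so whether that is safe for this sink is worth verifying before relying on it.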
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Mon, Mar 8, 2021 at 11:09 AM David Anderson wrote:
>>>
>>>> Rainie,
>>>>
>>>> A restart after a failure can cause data loss if you aren't using
>>>> checkpointing, or if you experience a transaction timeout.
>>>>
>>>> A manual restart can also lead to data loss, depending on how you
>>>> manage the offsets, transactions, and other state during the restart. What
>>>> happened in this case?
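To illustrate why the restart procedure matters: a Flink Kafka source resumes from the offsets in a restored checkpoint or savepoint when there is one, and only falls back to its configured startup mode (by default, the consumer group's committed offsets) when there is not. A simplified model of that precedence; the class and enum are hypothetical, the fallback for a group with no committed offset is fixed to the earliest offset here (in Kafka that choice is governed by auto.offset.reset), and Flink's handling of partitions discovered after the snapshot is ignored.

```java
import java.util.Map;

/** Hypothetical, simplified model of how a Kafka source chooses its start offset. */
final class StartOffsetResolver {
    enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST }

    static long resolve(String partition,
                        Map<String, Long> restoredFromSnapshot,
                        StartupMode mode,
                        Map<String, Long> committedGroupOffsets,
                        long earliestOffset,
                        long latestOffset) {
        // Offsets restored from a checkpoint or savepoint take precedence over the startup mode.
        Long restored = restoredFromSnapshot.get(partition);
        if (restored != null) {
            return restored;
        }
        switch (mode) {
            case GROUP_OFFSETS: {
                // Fresh start: resume from the offset last committed to Kafka for this group;
                // with none committed, this sketch falls back to the earliest offset.
                Long committed = committedGroupOffsets.get(partition);
                return committed != null ? committed : earliestOffset;
            }
            case EARLIEST:
                return earliestOffset;
            case LATEST:
                return latestOffset;
            default:
                throw new IllegalStateException("Unknown startup mode: " + mode);
        }
    }
}
```

Under this model, a manual restart without a savepoint starts from the committed group offsets, so records between those offsets and what the sink actually committed are either skipped (data loss) or replayed (duplicates).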
>>>>
>>>> David
>>>>
>>>> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li wrote:
>>>>
>>>>> Thanks Yun and David.
>>>>> There were some tasks that got restarted. We configured the restart
>>>>> policy and the job didn't fail.
>>>>> Will task restart cause data loss?
>>>>>
>>>>> Thanks
>>>>> Rainie
>>>>>
>>>>>
>>>>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson wrote:
>>>>>
>>>>>> Rainie,
>>>>>>
>>>>>> Were there any failures/restarts, or is this discrepancy observed
>>>>>> without any disruption to the processing?
>>>>>>
>>>>>> Regards,
>>>>>> David
>>>>>>
>>>>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li wrote:
>>>>>>
>>>>>>> Thanks for the quick response, Smile.
>>>>>>> I don't use window operators or flatmap.
>>>>>>> Here is the core logic of my filter; it only iterates over the filters
>>>>>>> list. Could *rebalance()* cause it?
>>>>>>>
>>>>>>> Thanks again.
>>>>>>> Best regards
>>>>>>> Rainie
>>>>>>>
>>>>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>>>>     eventStream
>>>>>>>         .rebalance()
>>>>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>>>>             @Override
>>>>>>>             public void processElement(
>>>>>>>                     T element,
>>>>>>>                     ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>>>>                     Collector<SplitterIntermediateRecord<T>> collector) {
>>>>>>>                 // Emits one record per matching filter: an element that matches
>>>>>>>                 // several filters is emitted several times, and one that matches
>>>>>>>                 // none is not emitted at all.
>>>>>>>                 for (StreamFilter filter : filters) {
>>>>>>>                     if (filter.match(element)) {
>>>>>>>                         SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>>>>                         SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>>>>                                 substreamConfig.getKafkaCluster(),
>>>>>>>                                 substreamConfig.getKafkaTopic(),
>>>>>>>                                 substreamConfig.getCutoverKafkaTopic(),
>>>>>>>                                 substreamConfig.getCutoverTimestampInMs(),
>>>>>>>                                 element);
>>>>>>>                         collector.collect(result);
>>>>>>>                     }
>>>>>>>                 }
>>>>>>>             }
>>>>>>>         })
>>>>>>>         .name("Process-" + eventClass.getSimpleName());
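On the rebalance() question: rebalance() redistributes records round-robin across the downstream subtasks, so it changes which subtask sees a record, not whether the record arrives. A stdlib-only sketch with hypothetical names that models round-robin distribution and the filter loop above, where one element matching several filters is emitted once per match. The round-robin model is a simplification (Flink's rebalance partitioner starts from a randomly chosen channel), and Predicate stands in for the StreamFilter type, which is not shown in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of rebalance() followed by the multi-filter ProcessFunction. */
final class RebalanceFanOut {
    /** Round-robin: record i goes to channel (start + i) % parallelism. */
    static <T> List<List<T>> rebalance(List<T> records, int parallelism, int start) {
        List<List<T>> channels = new ArrayList<>();
        for (int c = 0; c < parallelism; c++) {
            channels.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            channels.get((start + i) % parallelism).add(records.get(i));
        }
        return channels;
    }

    /** Mirrors the filter loop: one output per matching filter, zero if nothing matches. */
    static <T> int emittedFor(T element, List<Predicate<T>> filters) {
        int emitted = 0;
        for (Predicate<T> filter : filters) {
            if (filter.test(element)) {
                emitted++;
            }
        }
        return emitted;
    }
}
```

In this model rebalance() only moves records between subtasks and leaves totals unchanged, whereas the filter loop can change them: downstream counts equal the number of (element, matching filter) pairs, not the number of input elements.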
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile wrote:
>>>>>>>
>>>>>>>> Hi Rainie,
>>>>>>>>
>>>>>>>> Could you please provide more information about your processing logic?
>>>>>>>> Do you use window operators?
>>>>>>>> If there's no time-based operator in your logic, late-arriving data won't
>>>>>>>> be dropped by default, and there might be something wrong with your
>>>>>>>> flatMap or filter operator. Otherwise, you can use sideOutputLateData() to
>>>>>>>> collect the window's late data and inspect it. See [1] for more
>>>>>>>> information about sideOutputLateData().
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
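For reference, a simplified model of when an event-time tumbling window treats an element as late, which is when sideOutputLateData() would receive it instead of the window: the element's window counts as expired once its maximum timestamp plus the allowed lateness is at or below the current watermark. This follows how Flink's window operator decides lateness for tumbling windows with zero offset, assuming non-negative timestamps; the names are hypothetical, and since Rainie's job uses no window operators, it only applies to windowed pipelines.

```java
/** Hypothetical, simplified model of event-time lateness for tumbling windows. */
final class TumblingLateness {
    /** Start of the window containing ts, for windows aligned to epoch 0 (ts >= 0 assumed). */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** Largest timestamp that still belongs to the window (the window end is exclusive). */
    static long windowMaxTimestamp(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs - 1;
    }

    /** True if the element would skip the window and go to the late-data side output. */
    static boolean isLate(long ts, long sizeMs, long allowedLatenessMs, long currentWatermark) {
        return windowMaxTimestamp(ts, sizeMs) + allowedLatenessMs <= currentWatermark;
    }
}
```

For example, with 10 s windows and no allowed lateness, an element stamped 12345 ms belongs to the window ending at 19999 ms and becomes late once the watermark reaches 19999 ms; allowing 5000 ms of lateness pushes that point to 24999 ms.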
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Smile
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Sent from:
>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>>>
>>>>>>>