I will try that.
Thanks for your help, David.

Best regards
Rainie

On Sat, Mar 20, 2021 at 9:46 AM David Anderson <dander...@apache.org> wrote:

> You should increase the kafka transaction timeout --
> transaction.max.timeout.ms -- to something much larger than the default,
> which I believe is 15 minutes. Suitable values are more on the order of a
> few hours to a few days -- long enough to allow for any conceivable outage.
> This way, if a request does time out and causes the Flink job to fail, you
> won't lose any data so long as Kafka and Flink recover within the
> transaction timeout.
>
> Regards,
> David
>
>
>
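A minimal sketch of the constraint behind this advice, assuming the producer-side setting is the standard Kafka key transaction.timeout.ms: the broker rejects a producer whose requested transaction timeout exceeds the broker's transaction.max.timeout.ms, so both values need to be raised together. The class and method names below are hypothetical, and the 6-hour and 1-day values are only illustrative.

```java
import java.time.Duration;
import java.util.Properties;

public class TransactionTimeoutSketch {

    // Kafka's default broker-side cap (transaction.max.timeout.ms) is 15 minutes.
    public static final long BROKER_DEFAULT_MAX_TIMEOUT_MS = Duration.ofMinutes(15).toMillis();

    /** Builds producer properties carrying the requested transaction timeout. */
    public static Properties producerProps(Duration transactionTimeout) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", Long.toString(transactionTimeout.toMillis()));
        return props;
    }

    /** Returns true if the producer's timeout does not exceed the broker's cap. */
    public static boolean acceptedByBroker(Properties producerProps, long brokerMaxTimeoutMs) {
        long requested = Long.parseLong(producerProps.getProperty("transaction.timeout.ms"));
        return requested <= brokerMaxTimeoutMs;
    }

    public static void main(String[] args) {
        // Illustrative value: a 6-hour producer transaction timeout.
        Properties props = producerProps(Duration.ofHours(6));

        // With the broker default of 15 minutes, the 6-hour request is too large.
        System.out.println(acceptedByBroker(props, BROKER_DEFAULT_MAX_TIMEOUT_MS)); // false

        // After raising the broker cap to 1 day, the same request fits.
        System.out.println(acceptedByBroker(props, Duration.ofDays(1).toMillis())); // true
    }
}
```

In a real job the resulting Properties would be passed to the Kafka sink; the check here only mirrors the broker-side comparison so the two settings can be reasoned about together.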
> On Sat, Mar 20, 2021 at 12:02 AM Rainie Li <raini...@pinterest.com> wrote:
>
>> Hi Arvid,
>>
>> After increasing producer.kafka.request.timeout.ms from 90000 to 120000,
>> the job ran fine for almost 5 days, but one of the tasks recently failed
>> again with the same timeout error (stack trace attached below).
>> Should I keep increasing the producer.kafka.request.timeout.ms value?
>>
>> Thanks again for the help.
>> Best regards
>> Rainie
>>
>> *Stacktrace:*
>> {job_name}/{job_id}/chk-43556/_metadata, reference=(default),
>> fileStateSizeThreshold=2048, writeBufferSize=4096}, synchronous part) in
>> thread Thread[Process-Event -> Filter-data08 (237/240),5,Flink Task
>> Threads] took 0 ms.
>> Canceler.run(Task.java:1434)
>> ... 1 more
>> 2021-03-17 17:46:42,284 INFO
>>  org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Could
>> not complete snapshot 43556 for operator Sink: Sink-data08 (237/240).
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
>> to send data to Kafka: Expiring 53 record(s) for frontend_event_core-46:
>> 122269 ms has passed since last append
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>   at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>   at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>   at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>   at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>   at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>   at
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>   at
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>   at
>> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 53
>> record(s) for frontend_event_core-46: 122269 ms has passed since last append
>> 2021-03-17 17:46:42,354 INFO  org.apache.flink.runtime.taskmanager.Task
>>                   - Attempting to cancel task Source: {operator name}-Event
>> (214/240) (69ec48cfb074de8812eb622ffa097233).
>> 2021-03-17 17:46:42,354 INFO  org.apache.flink.runtime.taskmanager.Task
>>                   - Source: {operator name}-Event (214/240)
>> (69ec48cfb074de8812eb622ffa097233) switched from RUNNING to CANCELING.
>> 2021-03-17 17:46:42,355 INFO  org.apache.flink.runtime.taskmanager.Task
>>                   - Triggering cancellation of task code Source: {operator
>> name}-Event (214/240) (69ec48cfb074de8812eb622ffa097233).
>> 2021-03-17 17:46:42,358 WARN
>>  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
>> while canceling task.
>> java.lang.Exception:
>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>   at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
>>   at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>   at
>> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>>   at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
>>   at
>> com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:436)
>>   at
>> org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
>>   at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
>>   at org.apache.flink.runtime.taskmanager.Task$Task
>> 2021-03-17 17:46:42,391 INFO  org.apache.flink.runtime.taskmanager.Task
>>                   - Attempting to cancel task Process-Event ->
>> Filter-data08 (237/240) (34cf65b29f7153d6f0a82819eeaf218d).
>>
>>
>> On Thu, Mar 11, 2021 at 7:12 AM Rainie Li <raini...@pinterest.com> wrote:
>>
>>> Thanks for the suggestion, Arvid.
>>> Currently my job is using producer.kafka.request.timeout.ms=90000.
>>> I will try increasing it to 120000.
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Rainie,
>>>>
>>>> This looks like the record batching in the Kafka producer timed out. At
>>>> this point, the respective records are lost forever. You probably want to
>>>> tweak your Kafka settings [1].
>>>>
>>>> Usually, Flink should fail and restart at this point and recover
>>>> without data loss. However, if the transactions are also timing out, that
>>>> may explain the data loss. So you probably also want to increase the
>>>> transaction timeout.
>>>>
>>>> [1]
>>>> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
>>>>
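A hedged sketch of how the producer timeouts relate, assuming a Kafka client of version 2.1 or later (the thread does not state the client version). In those clients, delivery.timeout.ms bounds the total time a record may spend batched and in flight, and it must be at least linger.ms + request.timeout.ms. The keys below are the plain Kafka names; the producer.kafka. prefix used in the thread appears to be specific to the job's own configuration. Class and method names are hypothetical, and the linger and delivery values are illustrative.

```java
import java.util.Properties;

public class ProducerTimeoutSketch {

    /** Builds producer properties from the three timeout values, in milliseconds. */
    public static Properties producerProps(long requestTimeoutMs, long lingerMs, long deliveryTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", Long.toString(requestTimeoutMs));
        props.setProperty("linger.ms", Long.toString(lingerMs));
        props.setProperty("delivery.timeout.ms", Long.toString(deliveryTimeoutMs));
        return props;
    }

    /** Checks the rule delivery.timeout.ms >= linger.ms + request.timeout.ms. */
    public static boolean isConsistent(Properties props) {
        long request = Long.parseLong(props.getProperty("request.timeout.ms"));
        long linger = Long.parseLong(props.getProperty("linger.ms"));
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms"));
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        // 120000 ms is the request timeout from the thread; the other values are illustrative.
        System.out.println(isConsistent(producerProps(120_000, 100, 120_000))); // false
        System.out.println(isConsistent(producerProps(120_000, 100, 300_000))); // true
    }
}
```

Raising request.timeout.ms alone therefore may not be enough on newer clients; the delivery timeout has to grow with it.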
>>>> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li <raini...@pinterest.com>
>>>> wrote:
>>>>
>>>>> Thanks for the info, David.
>>>>> The job has checkpointing.
>>>>> I saw that some tasks failed due to
>>>>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>>>>> send data to Kafka".
>>>>> Here is the stack trace from the JM log:
>>>>>
>>>>> container_e17_1611597945897_8007_01_000240 @ worker-node-host
>>>>> (dataPort=42321).
>>>>> 2021-02-10 01:19:27,206 INFO
>>>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding
>>>>> checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>>>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>>>> complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure
>>>>> reason: Checkpoint was declined.
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by:
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>>>>> send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has
>>>>> passed since last append
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>>>> at
>>>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>>>> at
>>>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>>>> at
>>>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>>>> ... 18 more
>>>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring
>>>>> 42 record(s) for frontend_event_core-38: 116447 ms has passed since last
>>>>> append
>>>>> 2021-02-10 01:19:27,216 INFO
>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>>> (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
>>>>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
>>>>> tolerable failure threshold.
>>>>> at
>>>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>>>>> at
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
>>>>> at
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
>>>>> at
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
>>>>> at
>>>>> org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>>>>> at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> Best regards
>>>>> Rainie
>>>>>
>>>>> On Mon, Mar 8, 2021 at 11:09 AM David Anderson <dander...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Rainie,
>>>>>>
>>>>>> A restart after a failure can cause data loss if you aren't using
>>>>>> checkpointing, or if you experience a transaction timeout.
>>>>>>
>>>>>> A manual restart can also lead to data loss, depending on how you
>>>>>> manage the offsets, transactions, and other state during the restart.
>>>>>> What happened in this case?
>>>>>>
>>>>>> David
>>>>>>
>>>>>> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Yun and David.
>>>>>>> Some tasks were restarted. We configured the restart policy, and the
>>>>>>> job didn't fail.
>>>>>>> Will a task restart cause data loss?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Rainie
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Rainie,
>>>>>>>>
>>>>>>>> Were there any failures/restarts, or is this discrepancy observed
>>>>>>>> without any disruption to the processing?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> David
>>>>>>>>
>>>>>>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the quick response, Smile.
>>>>>>>>> I don't use window operators or flatmap.
>>>>>>>>> Here is the core logic of my filter; it only iterates over the list
>>>>>>>>> of filters. Will *rebalance()* cause it?
>>>>>>>>>
>>>>>>>>> Thanks again.
>>>>>>>>> Best regards
>>>>>>>>> Rainie
>>>>>>>>>
>>>>>>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>>>>>>     eventStream
>>>>>>>>>         .rebalance()
>>>>>>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>>>>>>           public void processElement(
>>>>>>>>>               T element,
>>>>>>>>>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>>>>>>               Collector<SplitterIntermediateRecord<T>> collector) {
>>>>>>>>>             for (StreamFilter filter : filters) {
>>>>>>>>>               if (filter.match(element)) {
>>>>>>>>>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>>>>>>                 SplitterIntermediateRecord<T> result =
>>>>>>>>>                     new SplitterIntermediateRecord<>(
>>>>>>>>>                         substreamConfig.getKafkaCluster(),
>>>>>>>>>                         substreamConfig.getKafkaTopic(),
>>>>>>>>>                         substreamConfig.getCutoverKafkaTopic(),
>>>>>>>>>                         substreamConfig.getCutoverTimestampInMs(),
>>>>>>>>>                         element);
>>>>>>>>>                 collector.collect(result);
>>>>>>>>>               }
>>>>>>>>>             }
>>>>>>>>>           }
>>>>>>>>>         })
>>>>>>>>>         .name("Process-" + eventClass.getSimpleName());
>>>>>>>>>
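When comparing input and output counts for the snippet above, it may help to note that the loop emits one record per matching filter, so an element can produce zero, one, or several outputs by design. The following plain-Java sketch mirrors only that loop; it uses java.util.function.Predicate as a stand-in for the job's StreamFilter, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterFanOutSketch {

    /** Emits the element once per matching filter, like the loop in processElement. */
    public static <T> List<T> process(T element, List<Predicate<T>> filters) {
        List<T> out = new ArrayList<>();
        for (Predicate<T> filter : filters) {
            if (filter.test(element)) {
                out.add(element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Predicate<String>> filters = new ArrayList<>();
        filters.add(s -> s.startsWith("a"));
        filters.add(s -> s.length() > 3);

        System.out.println(process("apple", filters).size()); // matches both filters: 2
        System.out.println(process("ab", filters).size());    // matches the first only: 1
        System.out.println(process("xy", filters).size());    // matches none: 0
    }
}
```

A count mismatch between source and sink is therefore expected whenever elements match no filter or more than one, independent of any failure or restart.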
>>>>>>>>>
>>>>>>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Rainie,
>>>>>>>>>>
>>>>>>>>>> Could you please provide more information about your processing
>>>>>>>>>> logic?
>>>>>>>>>> Do you use window operators?
>>>>>>>>>> If there's no time-based operator in your logic, late arrival
>>>>>>>>>> data won't be
>>>>>>>>>> dropped by default and there might be something wrong with your
>>>>>>>>>> flat map or
>>>>>>>>>> filter operator. Otherwise, you can use sideOutputLateData() to
>>>>>>>>>> get the late
>>>>>>>>>> data of the window and have a look at them. See [1] for more
>>>>>>>>>> information
>>>>>>>>>> about sideOutputLateData().
>>>>>>>>>>
>>>>>>>>>> [1].
>>>>>>>>>>
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Smile
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
