Thanks for the info, David.
The job has checkpointing enabled.
I saw that some tasks failed with
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka".
Here is the stack trace from the JM log:

container_e17_1611597945897_8007_01_000240 @ worker-node-host
(dataPort=42321).
2021-02-10 01:19:27,206 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding
checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure reason:
Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
at
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
Failed to send data to Kafka: Expiring 42 record(s) for topic-name-38:
116447 ms has passed since last append
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
... 18 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42
record(s) for frontend_event_core-38: 116447 ms has passed since last append
2021-02-10 01:19:27,216 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
(7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
at
org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Best regards
Rainie
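
On David's point (quoted below) that a transaction timeout can cause data
loss after a restart: here is a minimal sketch, not a definitive check, of
two timeout relationships that seem worth verifying if the Kafka sink runs
with exactly-once semantics. `transaction.timeout.ms` (producer) and
`transaction.max.timeout.ms` (broker) are real Kafka settings. The class
name, the method name, the example values, and the "checkpoint interval plus
expected recovery time" rule of thumb are assumptions for illustration, and
the thread does not say which delivery semantic this job uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class TransactionTimeoutCheck {

    /**
     * Hypothetical helper: returns warnings about transaction timeout settings.
     * An empty list means neither of the two checks below fired.
     */
    static List<String> check(long producerTxnTimeoutMs,
                              long brokerMaxTxnTimeoutMs,
                              long checkpointIntervalMs,
                              long expectedRecoveryMs) {
        List<String> warnings = new ArrayList<>();

        // The broker rejects a transactional producer whose requested timeout
        // is larger than the broker-side maximum.
        if (producerTxnTimeoutMs > brokerMaxTxnTimeoutMs) {
            warnings.add("transaction.timeout.ms exceeds the broker's "
                    + "transaction.max.timeout.ms");
        }

        // Rule of thumb (an assumption, not an official formula): a transaction
        // has to stay open for one checkpoint interval and still be committable
        // after the job has recovered from a failure.
        if (producerTxnTimeoutMs < checkpointIntervalMs + expectedRecoveryMs) {
            warnings.add("transaction.timeout.ms may be too short; a transaction "
                    + "could expire before it is committed after a restart");
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Example values only; none of them come from the job in this thread.
        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", "900000");

        long producerTimeout =
                Long.parseLong(producerProps.getProperty("transaction.timeout.ms"));
        List<String> warnings = check(producerTimeout, 900_000L, 60_000L, 600_000L);

        if (warnings.isEmpty()) {
            System.out.println("No timeout problems detected");
        } else {
            warnings.forEach(System.out::println);
        }
    }
}
```

In a real job the producer value would go into the Properties object passed
to the FlinkKafkaProducer constructor, and the broker maximum would have to
be read from the broker configuration.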

On Mon, Mar 8, 2021 at 11:09 AM David Anderson <dander...@apache.org> wrote:

> Rainie,
>
> A restart after a failure can cause data loss if you aren't using
> checkpointing, or if you experience a transaction timeout.
>
> A manual restart can also lead to data loss, depending on how you manage
> the offsets, transactions, and other state during the restart. What
> happened in this case?
>
> David
>
> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com> wrote:
>
>> Thanks Yun and David.
>> There were some tasks that got restarted. We configured the restart
>> policy and the job didn't fail.
>> Will a task restart cause data loss?
>>
>> Thanks
>> Rainie
>>
>>
>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org>
>> wrote:
>>
>>> Rainie,
>>>
>>> Were there any failures/restarts, or is this discrepancy observed
>>> without any disruption to the processing?
>>>
>>> Regards,
>>> David
>>>
>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com>
>>> wrote:
>>>
>>>> Thanks for the quick response, Smile.
>>>> I don't use window operators or flatmap.
>>>> Here is the core logic of my filter; it only iterates over the filters list.
>>>> Will *rebalance()* cause it?
>>>>
>>>> Thanks again.
>>>> Best regards
>>>> Rainie
>>>>
>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>     eventStream
>>>>         .rebalance()
>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>           public void processElement(
>>>>               T element,
>>>>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>               Collector<SplitterIntermediateRecord<T>> collector) {
>>>>             for (StreamFilter filter : filters) {
>>>>               if (filter.match(element)) {
>>>>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>                 SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>                     substreamConfig.getKafkaCluster(),
>>>>                     substreamConfig.getKafkaTopic(),
>>>>                     substreamConfig.getCutoverKafkaTopic(),
>>>>                     substreamConfig.getCutoverTimestampInMs(),
>>>>                     element);
>>>>                 collector.collect(result);
>>>>               }
>>>>             }
>>>>           }
>>>>         })
>>>>         .name("Process-" + eventClass.getSimpleName());
>>>>
>>>>
>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>>>>
>>>>> Hi Rainie,
>>>>>
>>>>> Could you please provide more information about your processing logic?
>>>>> Do you use window operators?
>>>>> If there's no time-based operator in your logic, late-arriving data won't
>>>>> be dropped by default, and there might be something wrong with your flat
>>>>> map or filter operator. Otherwise, you can use sideOutputLateData() to get
>>>>> the late data of the window and have a look at it. See [1] for more
>>>>> information about sideOutputLateData().
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>
>>>>> Regards,
>>>>> Smile
>>>>>
>>>>
