Thanks for the info, David.
The job has checkpointing enabled. I saw that some tasks failed due to "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka".
Here is the stack trace from the JM log:
container_e17_1611597945897_8007_01_000240 @ worker-node-host (dataPort=42321).
2021-02-10 01:19:27,206 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has passed since last append
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    ... 18 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42 record(s) for frontend_event_core-38: 116447 ms has passed since last append
2021-02-10 01:19:27,216 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Best regards
Rainie

On Mon, Mar 8, 2021 at 11:09 AM David Anderson <dander...@apache.org> wrote:

> Rainie,
>
> A restart after a failure can cause data loss if you aren't using
> checkpointing, or if you experience a transaction timeout.
>
> A manual restart can also lead to data loss, depending on how you manage
> the offsets, transactions, and other state during the restart. What
> happened in this case?
>
> David
>
> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com> wrote:
>
>> Thanks Yun and David.
>> There were some tasks that got restarted. We configured the restart
>> policy and the job didn't fail.
>> Will task restart cause data loss?
>>
>> Thanks
>> Rainie
>>
>>
>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org>
>> wrote:
>>
>>> Rainie,
>>>
>>> Were there any failures/restarts, or is this discrepancy observed
>>> without any disruption to the processing?
>>>
>>> Regards,
>>> David
>>>
>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com>
>>> wrote:
>>>
>>>> Thanks for the quick response, Smile.
>>>> I don't use window operators or flatmap.
>>>> Here is the core logic of my filter, it only iterates on filters list.
>>>> Will *rebalance()* cause it?
>>>>
>>>> Thanks again.
>>>> Best regards
>>>> Rainie
>>>>
>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>>     eventStream
>>>>         .rebalance()
>>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>           public void processElement(
>>>>               T element,
>>>>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>               Collector<SplitterIntermediateRecord<T>> collector) {
>>>>             for (StreamFilter filter : filters) {
>>>>               if (filter.match(element)) {
>>>>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>                 SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>                     substreamConfig.getKafkaCluster(),
>>>>                     substreamConfig.getKafkaTopic(),
>>>>                     substreamConfig.getCutoverKafkaTopic(),
>>>>                     substreamConfig.getCutoverTimestampInMs(),
>>>>                     element);
>>>>                 collector.collect(result);
>>>>               }
>>>>             }
>>>>           }
>>>>         })
>>>>         .name("Process-" + eventClass.getSimpleName());
>>>>
>>>>
>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>>>>
>>>>> Hi Rainie,
>>>>>
>>>>> Could you please provide more information about your processing logic?
>>>>> Do you use window operators?
>>>>> If there's no time-based operator in your logic, late arrival data
>>>>> won't be dropped by default and there might be something wrong with
>>>>> your flat map or filter operator. Otherwise, you can use
>>>>> sideOutputLateData() to get the late data of the window and have a
>>>>> look at them. See [1] for more information about sideOutputLateData().
>>>>>
>>>>> [1].
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>
>>>>> Regards,
>>>>> Smile
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>
>>>>