The issue has been resolved, as I said in the previous email. It was caused by the async function: every record processed by the async function is held as state in the async operator, and the record type (UserAccessLog) contains a map attribute, which is what the serializer was copying when the concurrent modification occurred.
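For anyone hitting the same ConcurrentModificationException: the mechanism is a plain HashMap being iterated by the checkpoint's deep copy (Kryo's MapSerializer walks the entry set) while another thread mutates it. A minimal, self-contained sketch of that mechanism (the map keys and the `ipLabel` write are illustrative stand-ins for the job's data, not taken from it):

```java
import java.util.HashMap;
import java.util.Map;

public class CmeDemo {
    // Copying a map by iterating it (roughly what Kryo's MapSerializer.copy
    // does) fails if the map gains a new key mid-iteration.
    static boolean copyWhileMutating() {
        Map<String, String> state = new HashMap<>();
        state.put("uid", "42");
        state.put("ip", "10.0.0.1");
        try {
            Map<String, String> snapshot = new HashMap<>();
            for (Map.Entry<String, String> e : state.entrySet()) {
                // Simulates the Redis callback writing "ipLabel" while the
                // checkpoint is still iterating the same map.
                state.put("ipLabel", "datacenter");
                snapshot.put(e.getKey(), e.getValue());
            }
            return false;
        } catch (java.util.ConcurrentModificationException ex) {
            return true; // the same exception as in the stack trace below
        }
    }

    // Snapshotting a copy first, and mutating only the copy, is safe.
    static boolean mutateACopy() {
        Map<String, String> state = new HashMap<>();
        state.put("uid", "42");
        Map<String, String> enriched = new HashMap<>(state); // copy first
        enriched.put("ipLabel", "datacenter");               // mutate the copy
        return state.size() == 1 && enriched.size() == 2;
    }

    public static void main(String[] args) {
        System.out.println(copyWhileMutating()); // true: CME reproduced
        System.out.println(mutateACopy());       // true: the copy is safe
    }
}
```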
Arvid Heise <ar...@apache.org> wrote on Mon, Aug 23, 2021, 11:26 PM:

> I don't see anything suspicious in your code. The stack trace is also for a
> MapSerializer. Do you have another operator where you put a Map into a custom
> state?
>
> On Fri, Aug 20, 2021 at 6:43 PM yidan zhao <hinobl...@gmail.com> wrote:
>
>> But I do not know why this leads to the job's failure and recovery,
>> since I have set the tolerable checkpoint failure number to Integer.MAX_VALUE.
>> Because of the failure, my task managers failed on task-cancel timeouts,
>> and about 80% of the task managers went down that way.
>>
>> yidan zhao <hinobl...@gmail.com> wrote on Sat, Aug 21, 2021, 12:35 AM:
>> >
>> > OK, thanks. I have a result; please confirm whether it makes sense.
>> > Here is the code at issue:
>> >
>> > It is the async function's implementation. It does an async Redis query
>> > and fills some data back into the record.
>> > In the line [ currentBatch.get(i).getD().put("ipLabel",
>> > objects.getResponses().get(i)); ], getD() returns a map attribute of the
>> > OriginalUserAccessLog class.
>> > The issue occurred while a checkpoint was in progress: the Redis query
>> > result arrived concurrently while the async function's input queue was
>> > being serialized.
>> >
>> >     @Override
>> >     public void asyncInvoke0(OriginalUserAccessLog input,
>> >                              ResultFuture<OriginalUserAccessLog> resultFuture) {
>> >         inputBuffer.add(input);
>> >         if (inputBuffer.size() >= 1000) {
>> >             List<OriginalUserAccessLog> currentBatch = inputBuffer;
>> >             inputBuffer = new ArrayList<>();
>> >
>> >             RBatch rBatch = redissonClient.createBatch();
>> >             for (OriginalUserAccessLog i : currentBatch) {
>> >                 rBatch.getBucket("ip:" + i.getIp()).getAsync();
>> >             }
>> >
>> >             rBatch.executeAsync().onComplete((objects, throwable) -> {
>> >                 if (throwable == null) {
>> >                     for (int i = 0; i < currentBatch.size(); i++) {
>> >                         currentBatch.get(i).getD().put("ipLabel",
>> >                                 objects.getResponses().get(i));
>> >                     }
>> >                 }
>> >                 resultFuture.complete(currentBatch);
>> >             });
>> >         } else {
>> >             resultFuture.complete(Collections.emptyList());
>> >         }
>> >     }
>> >
>> > Chesnay Schepler <ches...@apache.org> wrote on Fri, Aug 20, 2021, 1:56 AM:
>> > >
>> > > Essentially this exception means that the state was modified while a
>> > > snapshot was being taken.
>> > >
>> > > We usually see this when users hold on to some state value beyond a
>> > > single call to a user-defined function, particularly from different threads.
>> > >
>> > > We may be able to pinpoint the issue if you were to provide us with the
>> > > functions.
>> > >
>> > > On 19/08/2021 16:59, yidan zhao wrote:
>> > > > The Flink web UI shows the exception below.
>> > > > In the task (ual_transform_UserLogBlackUidJudger ->
>> > > > ual_transform_IpLabel), the first operator is a broadcast process
>> > > > function and the second is an async function. I do not know
>> > > > whether the issue is related to them.
>> > > >
>> > > > The issue did not occur before; it started after I upgraded to
>> > > > Flink 1.13.2.
>> > > >
>> > > > _____exception info from flink web ui:_____
>> > > >
>> > > > java.io.IOException: Could not perform checkpoint 58 for operator
>> > > > ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel (29/60)#0.
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
>> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
>> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
>> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
>> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
>> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
>> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>> > > >     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>> > > >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>> > > >     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>> > > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>> > > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>> > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>> > > >     at java.lang.Thread.run(Thread.java:748)
>> > > > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException:
>> > > > Could not complete snapshot 58 for operator
>> > > > ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel
>> > > > (29/60)#0. Failure reason: Checkpoint was declined.
>> > > >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
>> > > >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
>> > > >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>> > > >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
>> > > >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
>> > > >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
>> > > >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
>> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
>> > > >     ... 20 more
>> > > > Caused by: java.util.ConcurrentModificationException
>> > > >     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
>> > > >     at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
>> > > >     at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
>> > > >     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
>> > > >     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>> > > >     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>> > > >     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:260)
>> > > >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:234)
>> > > >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
>> > > >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
>> > > >     at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
>> > > >     at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:64)
>> > > >     at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:76)
>> > > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
>> > > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
>> > > >     at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
>> > > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
>> > > >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:226)
>> > > >     ... 30 more
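As for the fix: since the async operator holds the in-flight input records as operator state (as noted at the top of the thread), the safe pattern is to never mutate those records from the Redisson callback thread, and instead complete the future with enriched copies. A self-contained sketch of that pattern, with a plain CompletableFuture standing in for the Flink/Redisson machinery and a minimal `Log` class standing in for OriginalUserAccessLog (both are illustrative, not the actual APIs):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CompleteWithCopies {
    // Minimal stand-in for OriginalUserAccessLog: just the map field "d".
    static class Log {
        final Map<String, Object> d = new HashMap<>();
    }

    // Instead of calling getD().put(...) on the buffered originals (which the
    // checkpoint may be serializing at that moment), the callback builds
    // enriched copies and completes the future with those.
    static CompletableFuture<List<Log>> enrich(List<Log> batch, List<Object> labels) {
        return CompletableFuture.supplyAsync(() -> {
            List<Log> out = new ArrayList<>(batch.size());
            for (int i = 0; i < batch.size(); i++) {
                Log copy = new Log();
                copy.d.putAll(batch.get(i).d);        // copy the map first
                copy.d.put("ipLabel", labels.get(i)); // mutate only the copy
                out.add(copy);
            }
            return out;
        });
    }

    public static void main(String[] args) {
        Log original = new Log();
        original.d.put("ip", "10.0.0.1");
        List<Log> enriched = enrich(List.of(original), List.of("datacenter")).join();
        System.out.println(original.d.containsKey("ipLabel"));   // false: untouched
        System.out.println(enriched.get(0).d.get("ipLabel"));    // datacenter
    }
}
```

The copies are still written from another thread, but only after they exist solely on that thread, so nothing the checkpoint can see is ever mutated concurrently.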