Hi all,

We are trying to modify our Flink job to use iterations (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/iterations/). The job runs fine and produces the expected output, and checkpoints are created successfully at regular intervals. However, when we try to create a savepoint for the job, the savepoint gets stuck. In the Flink UI, some operators acknowledge successfully (100%), while the other operators stay at 0% acknowledgements. Below is some of the thread information for the operators with 0% acknowledgements.

The main change in this Flink job is the adoption of an iteration stream. Without the iteration, savepoints and checkpoints are created successfully. Has anyone encountered a similar issue, or does anyone know how to fix it? Any comments are appreciated.
Best wishes,
Chen-Che Huang

{
  "threadName": "OutputFlusher for xxx-operator",
  "stringifiedThreadInfo": "\"OutputFlusher for lookup-user-id-v0.6.0\" daemon prio=5 Id=278 TIMED_WAITING\n\tat java.base@11.0.13/java.lang.Thread.sleep(Native Method)\n\tat app//org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.run(RecordWriter.java:240)\n\n"
},
{
  "threadName": "Channel state writer IterationSink-13 (3/12)#0",
  "stringifiedThreadInfo": "\"Channel state writer IterationSink-13 (3/12)#0\" daemon prio=5 Id=263 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6a6e7f8d\n\tat java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method)\n\t- waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6a6e7f8d\n\tat java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(Unknown Source)\n\tat java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)\n\tat java.base@11.0.13/java.util.concurrent.LinkedBlockingDeque.takeFirst(Unknown Source)\n\tat java.base@11.0.13/java.util.concurrent.LinkedBlockingDeque.take(Unknown Source)\n\tat app//org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)\n\tat app//org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)\n\tat app//org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$952/0x0000000800a7a840.run(Unknown Source)\n\t...\n\n"
},
{
  "threadName": "IterationSink-13 (4/12)#0",
  "stringifiedThreadInfo": "\"IterationSink-13 (4/12)#0\" prio=5 Id=264 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4fff6178\n\tat java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method)\n\t- waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4fff6178\n\tat java.base@11.0.13/java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source)\n\tat java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)\n\tat app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)\n\tat app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)\n\tat app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)\n\tat app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)\n\tat app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\t...\n\n"
}