Hi,

this problem is fixed in Flink >= 1.4.3, see https://issues.apache.org/jira/browse/FLINK-8836.
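
For context, as far as I understand the ticket, the issue is that duplicating a KryoSerializer did not duplicate everything it holds, so internal state could end up shared between threads (for example the task thread and the checkpointing thread). The sketch below is plain Java with no Flink or Kryo dependency; `ScratchStack`, `SketchSerializer`, `shallowDuplicate` and `deepDuplicate` are hypothetical names invented for illustration, and the interleaving is simulated in a single thread. It only illustrates the general failure class, not Flink's actual code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a stateful, non-thread-safe helper inside a serializer. */
final class ScratchStack {
    private final List<Integer> ids = new ArrayList<>();

    void push(int id) { ids.add(id); }

    /** Throws an IndexOutOfBoundsException when the stack is empty. */
    int pop() { return ids.remove(ids.size() - 1); }
}

/** Hypothetical serializer; this is NOT Flink's KryoSerializer. */
final class SketchSerializer {
    final ScratchStack scratch;

    SketchSerializer(ScratchStack scratch) { this.scratch = scratch; }

    /** Problematic duplicate: the copy still points at the same scratch state. */
    SketchSerializer shallowDuplicate() { return new SketchSerializer(scratch); }

    /** Safe duplicate: the copy gets its own scratch state. */
    SketchSerializer deepDuplicate() { return new SketchSerializer(new ScratchStack()); }

    boolean sharesStateWith(SketchSerializer other) { return scratch == other.scratch; }
}

public class DuplicateSketch {
    public static void main(String[] args) {
        SketchSerializer a = new SketchSerializer(new ScratchStack());
        SketchSerializer b = a.shallowDuplicate();

        // Simulated interleaving of two users of the "duplicated" serializer.
        a.scratch.push(1);   // user A starts copying a record
        b.scratch.pop();     // user B pops what it believes is its own entry
        try {
            a.scratch.pop(); // user A now finds the stack empty
        } catch (IndexOutOfBoundsException e) {
            System.out.println("shared scratch state was corrupted");
        }

        // With a deep duplicate the two users cannot interfere.
        System.out.println(a.sharesStateWith(a.deepDuplicate()));
    }
}
```

If upgrading is not possible right away, the same idea suggests looking for any serializer or helper object that is reused across threads in user code, but that is a guess on my side and not something the ticket prescribes.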

Best,
Stefan

> On 18.08.2018 at 05:07, Bruce Qiu <qiuyanjun...@gmail.com> wrote:
>
> Hi Community,
> I am using Flink 1.4.2 for stream processing. I fetch data from Kafka
> and write Parquet files to HDFS. In the previous environment, Kafka
> had 192 partitions and I set the source parallelism to 192; the application
> worked fine. Recently we increased the Kafka partitions to 384, so I
> changed the source parallelism to 384. After this change, the
> application throws the exception below, and the checkpoints always fail.
> I also see very high backpressure in the ColFlatMap stage. My
> application's DAG is shown below. Can someone help me with this exception?
> Thanks a lot.
>
> DAG Stage
>
> <image001.png>
>
> Exception stack trace:
>
> java.lang.Exception: Error while triggering checkpoint 109 for Source: Custom Source (257/384)
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1210)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not perform checkpoint 109 for operator Source: Custom Source (257/384).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:544)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111)
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1199)
>     ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 109 for operator Source: Custom Source (257/384).
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:538)
>     ... 7 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>     at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>     at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:448)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:460)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:220)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:363)
>     ... 12 more
>
> Regards,
> Bruce