If I read CompositeTypeSerializerConfigSnapshot ctor correctly: for (TypeSerializer<?> nestedSerializer : nestedSerializers) { TypeSerializerConfigSnapshot configSnapshot = nestedSerializer.snapshotConfiguration(); this.nestedSerializersAndConfigs.add(
The UnsupportedOperationException thrown by snapshotConfiguration() should be caught without proceeding to nestedSerializersAndConfigs.add(). On Fri, Jun 2, 2017 at 7:02 PM, Ted Yu <yuzhih...@gmail.com> wrote: > Your case doesn't seem like FLINK-5462 since there was no > CancellationException > in the stack trace you posted. > > The exception from TraversableSerializer.snapshotConfiguration() was > added by FLINK-6178 > > FYI > > On Fri, Jun 2, 2017 at 4:04 PM, MAHESH KUMAR <r.mahesh.kumar....@gmail.com > > wrote: > >> Hi Team, >> >> We have some test cases written using StreamingMultipleProgramsTestBase >> It was working fine in version 1.2, we get the following error in version >> 1.3 >> It seems like CheckpointCoordinator fails after this error and >> Checkpointing no longer occurs. >> >> I came across this bug: https://issues.apache.org/jira/browse/FLINK-5462 >> , It looks kind of similar but I am not exactly sure. >> >> 2017-06-02 16:11:07,048 INFO | flink-akka.actor.default-dispatcher-3 | >> org.apache.flink.runtime.executiongraph.ExecutionGraph | Could not >> restart the job pipeline_message_auditor (f54182ae17352efb9aa40667c283ce09) >> because the restart strategy prevented it. >> org.apache.flink.streaming.runtime.tasks.AsynchronousException: >> java.lang.Exception: Could not materialize checkpoint 1 for operator >> TriggerWindow(TumblingProcessingTimeWindows(4000), >> ReducingStateDescriptor{serializer=com.oracle.ci.flink. >> streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$ >> anon$2@e42b922d, reduceFunction=org.apache.flin >> k.streaming.api.scala.function.util.ScalaReduceFunction@59623f44}, >> ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) >> -> (Map -> Sink: auditor_out-kafkaSink, Map -> Sink: >> auditor_expire-kafkaSink) (1/1). >> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe >> ckpointRunnable.run(StreamTask.java:963) ~[flink-streaming-java_2.11-1. >> 3.0.jar:1.3.0] >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> ~[na:1.8.0_112] >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> ~[na:1.8.0_112] >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> ~[na:1.8.0_112] >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> ~[na:1.8.0_112] >> at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_112] >> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for >> operator TriggerWindow(TumblingProcessingTimeWindows(4000), >> ReducingStateDescriptor{serializer=com.oracle.ci.flink. >> streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$ >> anon$2@e42b922d, reduceFunction=org.apache.flin >> k.streaming.api.scala.function.util.ScalaReduceFunction@59623f44}, >> ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) >> -> (Map -> Sink: auditor_out-kafkaSink, Map -> Sink: >> auditor_expire-kafkaSink) (1/1). >> ... 6 common frames omitted >> Caused by: java.util.concurrent.ExecutionException: >> java.lang.UnsupportedOperationException >> at java.util.concurrent.FutureTask.report(FutureTask.java:122) >> ~[na:1.8.0_112] >> at java.util.concurrent.FutureTask.get(FutureTask.java:192) >> ~[na:1.8.0_112] >> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) >> ~[flink-core-1.3.0.jar:1.3.0] >> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe >> ckpointRunnable.run(StreamTask.java:893) ~[flink-streaming-java_2.11-1. >> 3.0.jar:1.3.0] >> ... 5 common frames omitted >> Suppressed: java.lang.Exception: Could not properly cancel managed keyed >> state future. >> at org.apache.flink.streaming.api.operators.OperatorSnapshotRes >> ult.cancel(OperatorSnapshotResult.java:90) ~[flink-streaming-java_2.11-1. >> 3.0.jar:1.3.0] >> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe >> ckpointRunnable.cleanup(StreamTask.java:1018) >> ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0] >> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe >> ckpointRunnable.run(StreamTask.java:957) ~[flink-streaming-java_2.11-1. >> 3.0.jar:1.3.0] >> ... 5 common frames omitted >> Caused by: java.util.concurrent.ExecutionException: >> java.lang.UnsupportedOperationException >> at java.util.concurrent.FutureTask.report(FutureTask.java:122) >> at java.util.concurrent.FutureTask.get(FutureTask.java:192) >> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUt >> il.java:43) >> at org.apache.flink.runtime.state.StateUtil.discardStateFuture( >> StateUtil.java:85) >> at org.apache.flink.streaming.api.operators.OperatorSnapshotRes >> ult.cancel(OperatorSnapshotResult.java:88) >> ... 7 common frames omitted >> Caused by: java.lang.UnsupportedOperationException: null >> at org.apache.flink.api.scala.typeutils.TraversableSerializer.s >> napshotConfiguration(TraversableSerializer.scala:155) >> at org.apache.flink.api.common.typeutils.CompositeTypeSerialize >> rConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53) >> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerC >> onfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45) >> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerB >> ase.snapshotConfiguration(TupleSerializerBase.java:132) >> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerB >> ase.snapshotConfiguration(TupleSerializerBase.java:39) >> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMe >> taInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71) >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa >> ckend$RocksDBFullSnapshotOperation.writeKVStateMetaData(Rock >> sDBKeyedStateBackend.java:591) >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa >> ckend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKe >> yedStateBackend.java:510) >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa >> ckend$3.performOperation(RocksDBKeyedStateBackend.java:407) >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa >> ckend$3.performOperation(RocksDBKeyedStateBackend.java:389) >> at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.ca >> ll(AbstractAsyncIOCallable.java:72) >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUt >> il.java:40) >> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe >> ckpointRunnable.run(StreamTask.java:893) >> ... 5 common frames omitted >> Caused by: java.lang.UnsupportedOperationException: null >> at org.apache.flink.api.scala.typeutils.TraversableSerializer.s >> napshotConfiguration(TraversableSerializer.scala:155) >> ~[flink-scala_2.11-1.3.0.jar:1.3.0] >> at org.apache.flink.api.common.typeutils.CompositeTypeSerialize >> rConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53) >> ~[flink-core-1.3.0.jar:1.3.0] >> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerC >> onfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45) >> ~[flink-core-1.3.0.jar:1.3.0] >> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerB >> ase.snapshotConfiguration(TupleSerializerBase.java:132) >> ~[flink-core-1.3.0.jar:1.3.0] >> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerB >> ase.snapshotConfiguration(TupleSerializerBase.java:39) >> ~[flink-core-1.3.0.jar:1.3.0] >> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMe >> taInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71) >> ~[flink-runtime_2.11-1.3.0.jar:1.3.0] >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa >> ckend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591) >> ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0] >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa >> ckend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:510) >> ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0] >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa >> ckend$3.performOperation(RocksDBKeyedStateBackend.java:407) >> ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0] >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa >> ckend$3.performOperation(RocksDBKeyedStateBackend.java:389) >> ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0] >> at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.ca >> ll(AbstractAsyncIOCallable.java:72) ~[flink-runtime_2.11-1.3.0.jar:1.3.0] >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> ~[na:1.8.0_112] >> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) >> ~[flink-core-1.3.0.jar:1.3.0] >> ... 6 common frames omitted >> 2017-06-02 16:11:07,048 INFO | flink-akka.actor.default-dispatcher-3 | >> o.apache.flink.runtime.checkpoint.CheckpointCoordinator | Stopping >> checkpoint coordinator for job f54182ae17352efb9aa40667c283ce09 >> 2017-06-02 16:11:07,048 INFO | flink-akka.actor.default-dispatcher-3 | >> o.a.f.r.checkpoint.StandaloneCompletedCheckpointStore | Shutting down >> >> >> Thanks and Regards, >> Mahesh >> >> >> >