Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6231

Hi @pnowojski, I did not call the `CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers)` constructor explicitly. The caller is Flink itself, see [here](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala#L123). I only fixed the NPE for this case:

```scala
def this() = this(null) //scala
```

This does not mean:

```
CompositeTypeSerializerConfigSnapshot(null); //java
```

Instead, it seems to mean:

```
CompositeTypeSerializerConfigSnapshot(new TypeSerializer<?>[] {null}) //java
```

so it passes the not-null precondition check:

```
Preconditions.checkNotNull(nestedSerializers); //java
```

and then causes an NPE in the `for` loop [here](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java#L53). I think the fix is a defensive check, and it works fine in our internal Flink version (as I said in the previous comment, we customized table & sql to provide stream and dimension table joins).
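To make the difference between the two calls concrete, here is a minimal Java sketch. It is not Flink code: `VarargsNullDemo` and `describe` are hypothetical names, `Object...` stands in for `TypeSerializer<?>...`, and `Objects.requireNonNull` stands in for `Preconditions.checkNotNull`. It only shows that a not-null check on the varargs array does not guard the elements inside it.

```java
import java.util.Objects;

// Hypothetical demo class, not part of Flink.
class VarargsNullDemo {

    // Stand-in for a varargs constructor that checks only the array reference.
    static String describe(Object... nestedSerializers) {
        // Throws only when the array itself is null.
        Objects.requireNonNull(nestedSerializers, "nestedSerializers");
        for (Object serializer : nestedSerializers) {
            // A defensive per-element check; without it, dereferencing
            // the element here would raise a NullPointerException.
            if (serializer == null) {
                return "array passed the check but contains a null element";
            }
        }
        return "all " + nestedSerializers.length + " elements are non-null";
    }

    public static void main(String[] args) {
        // Case 1: the array reference is null, so the precondition fires.
        try {
            describe((Object[]) null);
        } catch (NullPointerException e) {
            System.out.println("null array rejected: " + e.getMessage());
        }

        // Case 2: a one-element array holding null gets past the precondition.
        System.out.println(describe(new Object[] {null}));
    }
}
```

The second case corresponds to what the Scala `this(null)` call appears to produce, which is why the element-level check is needed in addition to the check on the array.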
---