Hi Quan,

Does the problem still occur when running on Flink 1.8? If it does, could you please share a minimal demo that reproduces it?

Thanks.
Best,
Congxian

On May 6, 2019, 14:44 +0800, Shi Quan <qua...@outlook.com>, wrote:
> Hi,
>
> Recently we encountered an IllegalArgumentException when using state with
> TTL enabled and KryoSerializer as the field serializer. Test setup:
>
> 1. Use the heap state backend
> 2. Create a MapStateDescriptor by class (String.class and HashMap.class)
> 3. Enable state TTL
> 4. Flink version: 1.6.1
>
> Core test code:
>
>     MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("test",
>         String.class, HashMap.class);
>     mapStateDescriptor.enableTimeToLive(ttlConfig);
>
> The IllegalArgumentException is thrown here because the value of
> originalSerializers.length is 1:
>
>     protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
>             PrecomputedParameters precomputed,
>             TypeSerializer<?> ... originalSerializers) {
>         Preconditions.checkNotNull(originalSerializers);
>         Preconditions.checkArgument(originalSerializers.length == 2);
>         return new TtlSerializer<>(precomputed,
>             (TypeSerializer<T>) originalSerializers[1]);
>     }
>
> We found some clues in the Flink source code. Here is what happens, step
> by step:
>
> Step 1. The ttlDescriptor is created in TtlStateFactory::createMapState
> when the operator is opened, and a CompositeSerializer is created at the
> same time. Because KryoSerializer.duplicate creates another KryoSerializer
> instance (see source code), CompositeSerializer.precomputed.stateful is
> set to "true". Could someone tell me what "stateful" means in precomputed?
>
> Step 2. newMetaInfo is created from the ttlDescriptor when the StateTable
> is registered. The CompositeSerializer is duplicated in this step. Source
> code:
>
>     @Override
>     public CompositeSerializer<T> duplicate() {
>         return precomputed.stateful ?
>             createSerializerInstance(precomputed,
>                 duplicateFieldSerializers(fieldSerializers)) : this;
>     }
>
>     protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
>             PrecomputedParameters precomputed,
>             TypeSerializer<?> ... originalSerializers) {
>         Preconditions.checkNotNull(originalSerializers);
>         Preconditions.checkArgument(originalSerializers.length == 2);
>         return new TtlSerializer<>(precomputed,
>             (TypeSerializer<T>) originalSerializers[1]);
>     }
>
> We noticed that the new TtlSerializer contains only the field serializer
> (originalSerializers[1]); the LongSerializer for the timestamp is missing.
>
> Step 3. The serializer is duplicated again when taking a snapshot. Source
> code:
>
>     CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S>
>             owningStateTable) {
>
>         super(owningStateTable);
>         this.snapshotData = owningStateTable.snapshotTableArrays();
>         this.snapshotVersion = owningStateTable.getStateTableVersion();
>         this.numberOfEntriesInSnapshotData = owningStateTable.size();
>
>         // We create duplicates of the serializers for the async snapshot,
>         // because TypeSerializer might be stateful and shared with the
>         // event processing thread.
>         this.localKeySerializer =
>             owningStateTable.keyContext.getKeySerializer().duplicate();
>         this.localNamespaceSerializer =
>             owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
>         this.localStateSerializer =
>             owningStateTable.metaInfo.getStateSerializer().duplicate();
>
>         this.partitionedStateTableSnapshot = null;
>     }
>
> Stack:
>
> * new CopyOnWriteStateTableSnapshot
> * CopyOnWriteStateTable.snapshot()
> * HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllState()
> * ……
> * HeapKeyedStateBackend.snapshot()
>
> The IllegalArgumentException happens when duplicating the
> CompositeSerializer created in Step 2, because it contains only one field
> serializer.
>
> Did I describe the problem clearly? Is this a bug?
>
> We compared KryoSerializer with PojoSerializer: PojoSerializer.duplicate
> does not simply return a new serializer instance, but returns the original
> instance in most cases. Which one has the bug, KryoSerializer or
> TtlStateFactory::createSerializerInstance?
>
> Best,
> Quan Shi
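
To make the sequence in Steps 1 to 3 easier to follow without a Flink setup, here is a small self-contained Java model of it. This is only a sketch: `TtlDuplicateSketch`, `FieldSerializer` and `TtlLikeSerializer` are hypothetical stand-ins, not Flink classes. The sketch assumes that "stateful" means "duplicate() of some field serializer returns a different instance", and that the timestamp serializer sits at index 0 and the user value serializer at index 1. The `buggy = false` branch is one possible way to keep both field serializers, not necessarily how Flink handles it.

```java
/** Toy model of the duplicate() chain described in the thread; not Flink code. */
final class TtlDuplicateSketch {

    /** Stand-in for a field serializer such as LongSerializer or KryoSerializer. */
    static class FieldSerializer {
        final String name;
        final boolean createsNewInstanceOnDuplicate;

        FieldSerializer(String name, boolean createsNewInstanceOnDuplicate) {
            this.name = name;
            this.createsNewInstanceOnDuplicate = createsNewInstanceOnDuplicate;
        }

        FieldSerializer duplicate() {
            // Kryo-like serializers return a fresh instance, stateless ones return this.
            return createsNewInstanceOnDuplicate ? new FieldSerializer(name, true) : this;
        }
    }

    /** Stand-in for the composite TTL serializer quoted above. */
    static class TtlLikeSerializer {
        final boolean buggy;
        final boolean stateful;
        final FieldSerializer[] fields;

        TtlLikeSerializer(boolean buggy, FieldSerializer... fields) {
            this.buggy = buggy;
            this.fields = fields;
            boolean anyStateful = false;
            for (FieldSerializer f : fields) {
                // Assumption: the composite is stateful if any field duplicates to a new object.
                anyStateful |= f.duplicate() != f;
            }
            this.stateful = anyStateful;
        }

        TtlLikeSerializer createSerializerInstance(FieldSerializer... originals) {
            if (originals.length != 2) {
                throw new IllegalArgumentException(
                    "expected 2 field serializers, got " + originals.length);
            }
            // Buggy variant mirrors the quoted code: only originals[1] is passed on.
            return buggy
                ? new TtlLikeSerializer(true, originals[1])
                : new TtlLikeSerializer(false, originals);
        }

        TtlLikeSerializer duplicate() {
            if (!stateful) {
                return this;
            }
            FieldSerializer[] copies = new FieldSerializer[fields.length];
            for (int i = 0; i < fields.length; i++) {
                copies[i] = fields[i].duplicate();
            }
            return createSerializerInstance(copies);
        }
    }

    /** Builds a composite with a stateless timestamp field and a configurable value field. */
    static TtlLikeSerializer create(boolean buggy, boolean statefulValue) {
        return new TtlLikeSerializer(
            buggy,
            new FieldSerializer("timestamp", false),
            new FieldSerializer("value", statefulValue));
    }

    public static void main(String[] args) {
        TtlLikeSerializer registered = create(true, true).duplicate(); // Step 2
        System.out.println("fields after first duplicate: " + registered.fields.length);
        try {
            registered.duplicate(); // Step 3 (snapshot)
        } catch (IllegalArgumentException e) {
            System.out.println("second duplicate failed: " + e.getMessage());
        }
    }
}
```

In this model the failure needs both conditions: a value serializer that returns a new instance from duplicate() and a createSerializerInstance that forwards only one of the two field serializers. With a value serializer that returns itself, duplicate() on the composite short-circuits to `this`, which would match the observation that the PojoSerializer case does not hit the exception.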