Thanks for reporting this issue, Quan. I've pulled in Andrey, who developed this feature and might be able to shed some light on the problem.
Cheers, Till

On Mon, May 6, 2019 at 11:04 AM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Quan,
>
> Is the problem still there when running on 1.8? If it is, could you
> please share a minimal demo that reproduces it?
>
> Thanks
>
> Best, Congxian
>
> On May 6, 2019, 14:44 +0800, Shi Quan <qua...@outlook.com>, wrote:
> > Hi,
> >
> > Recently we encountered an IllegalArgumentException when using state
> > with TTL enabled and KryoSerializer as the field serializer. Test setup:
> >
> > 1. Use the heap state backend
> > 2. Create a MapStateDescriptor by class (String.class and HashMap.class)
> > 3. Enable state TTL
> > 4. Flink version: 1.6.1
> >
> > Core test code:
> >
> > MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("test", String.class, HashMap.class);
> > mapStateDescriptor.enableTimeToLive(ttlConfig);
> >
> > The IllegalArgumentException is thrown here because the value of
> > originalSerializers.length is 1:
> >
> > protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
> >     PrecomputedParameters precomputed,
> >     TypeSerializer<?> ... originalSerializers) {
> >     Preconditions.checkNotNull(originalSerializers);
> >     Preconditions.checkArgument(originalSerializers.length == 2);
> >     return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
> > }
> >
> > We found some clues in the Flink source code. Here is what happens,
> > step by step:
> >
> > Step 1. A ttlDescriptor is created in TtlStateFactory::createMapState
> > when the operator is opened; at the same time, a CompositeSerializer is
> > created. Because KryoSerializer.duplicate() creates another
> > KryoSerializer instance (see source code),
> > CompositeSerializer.precomputed.stateful is set to “true”. Could
> > someone tell me what "stateful" means in precomputed?
> >
> > Step 2. newMetaInfo is created from ttlDescriptor when the StateTable
> > is registered. The CompositeSerializer is duplicated in this step.
> > Source code:
> >
> > @Override
> > public CompositeSerializer<T> duplicate() {
> >     return precomputed.stateful ?
> >         createSerializerInstance(precomputed, duplicateFieldSerializers(fieldSerializers)) : this;
> > }
> >
> > protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
> >     PrecomputedParameters precomputed,
> >     TypeSerializer<?> ... originalSerializers) {
> >     Preconditions.checkNotNull(originalSerializers);
> >     Preconditions.checkArgument(originalSerializers.length == 2);
> >     return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
> > }
> >
> > We noticed that the new TtlSerializer contains only the field
> > serializer (originalSerializers[1]); the LongSerializer for the
> > timestamp is missing.
> >
> > Step 3. The serializer is duplicated again when a snapshot is taken.
> > Source code:
> >
> > CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {
> >
> >     super(owningStateTable);
> >     this.snapshotData = owningStateTable.snapshotTableArrays();
> >     this.snapshotVersion = owningStateTable.getStateTableVersion();
> >     this.numberOfEntriesInSnapshotData = owningStateTable.size();
> >
> >     // We create duplicates of the serializers for the async snapshot, because TypeSerializer
> >     // might be stateful and shared with the event processing thread.
> >     this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate();
> >     this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
> >     this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate();
> >
> >     this.partitionedStateTableSnapshot = null;
> > }
> >
> > Stack:
> >
> > * new CopyOnWriteStateTableSnapshot
> > * CopyOnWriteStateTable.snapshot()
> > * HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllState()
> > * ……
> > * HeapKeyedStateBackend.snapshot()
> >
> > The IllegalArgumentException happens when duplicating the
> > CompositeSerializer created in step 2, because it has only one field
> > serializer inside it.
> >
> > Did I describe the problem clearly? Is this a bug?
> >
> > We compared KryoSerializer with PojoSerializer: PojoSerializer does not
> > simply return a new serializer instance, but returns the original
> > instance in most cases. Which one has the bug, KryoSerializer or
> > TtlStateFactory::createSerializerInstance?
> >
> > Best,
> > Quan Shi
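
The duplicate chain described in steps 1 to 3 can be modelled in isolation. The sketch below is not Flink code: `TtlDuplicateSketch`, `Composite`, `createInstance` and `successfulDuplicates` are hypothetical names, and plain strings stand in for the timestamp and user-value serializers. It only mimics the shape of the quoted `duplicate()` and `createSerializerInstance()` methods, assuming the composite is always treated as stateful (as happens with a Kryo-backed field serializer). The `keepAllFields` flag shows one possible way to preserve the two-field invariant; it is an illustration, not necessarily the fix adopted in Flink.

```java
import java.util.Arrays;

/**
 * Simplified model of the duplicate() chain from the report.
 * None of these classes are Flink classes; they only mimic the shape of
 * the CompositeSerializer / TtlSerializer code quoted in the thread.
 */
public class TtlDuplicateSketch {

    /** Hypothetical stand-in for a composite serializer holding field serializers. */
    static class Composite {
        final String[] fields;       // stand-ins for the field serializers
        final boolean keepAllFields; // false = behaviour quoted in the thread

        Composite(boolean keepAllFields, String... fields) {
            this.keepAllFields = keepAllFields;
            this.fields = fields;
        }

        /** Mirrors the quoted createSerializerInstance: expects exactly two fields. */
        Composite createInstance(String... originals) {
            if (originals.length != 2) {
                throw new IllegalArgumentException(
                    "expected 2 field serializers, got " + originals.length);
            }
            return keepAllFields
                ? new Composite(true, originals[0], originals[1]) // both fields survive
                : new Composite(false, originals[1]);             // timestamp field is dropped
        }

        /** Always copies, i.e. behaves as if precomputed.stateful were true. */
        Composite duplicate() {
            return createInstance(Arrays.copyOf(fields, fields.length));
        }
    }

    /** Returns how many consecutive duplicate() calls succeed, up to maxCalls. */
    public static int successfulDuplicates(boolean keepAllFields, int maxCalls) {
        Composite current = new Composite(keepAllFields, "timestamp", "userValue");
        int ok = 0;
        try {
            for (int i = 0; i < maxCalls; i++) {
                current = current.duplicate();
                ok++;
            }
        } catch (IllegalArgumentException e) {
            // The length check failed: the previous copy had lost a field.
        }
        return ok;
    }

    public static void main(String[] args) {
        System.out.println("dropping the first field: " + successfulDuplicates(false, 3));
        System.out.println("keeping both fields:      " + successfulDuplicates(true, 3));
    }
}
```

In this model the first `duplicate()` (step 2, state table registration) succeeds but yields a copy with a single field, so the second `duplicate()` (step 3, snapshot) fails the length check, which matches the sequence Quan describes.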