Hi Quan,
Is the problem still there when running on 1.8? If so, could you please 
share a minimal reproducible demo? Thanks.

Best, Congxian
On May 6, 2019, 14:44 +0800, Shi Quan <qua...@outlook.com>, wrote:
> Hi,
> Recently we encountered an IllegalArgumentException when using state with TTL 
> enabled and KryoSerializer as the field serializer. Test setup:
>
> 1. Use heap state backend
> 2. Create a MapStateDescriptor by Class (String.class and HashMap.class)
> 3. Enable state TTL
> 4. Flink version: 1.6.1
>
> Core test code:
>
> MapStateDescriptor mapStateDescriptor =
>     new MapStateDescriptor("test", String.class, HashMap.class);
> mapStateDescriptor.enableTimeToLive(ttlConfig);
>
> An IllegalArgumentException is thrown here because the value of 
> originalSerializers.length is 1:
>
> protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
>         PrecomputedParameters precomputed,
>         TypeSerializer<?> ... originalSerializers) {
>     Preconditions.checkNotNull(originalSerializers);
>     Preconditions.checkArgument(originalSerializers.length == 2);
>     return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
> }
>
>
> We found some clues in the Flink source code. Here is what happens, step by step:
>
> Step 1. When the operator is opened, TtlStateFactory::createMapState creates 
> the ttlDescriptor and, with it, a CompositeSerializer. Because 
> KryoSerializer.duplicate() creates another KryoSerializer instance (see the 
> source code), CompositeSerializer.precomputed.stateful is set to "true". Could 
> someone tell me what "stateful" means in precomputed?
>
> Step 2. When the StateTable is registered, newMetaInfo is created from the 
> ttlDescriptor. The CompositeSerializer is duplicated in this step. Source code:
>
> @Override
> public CompositeSerializer<T> duplicate() {
>     return precomputed.stateful ?
>         createSerializerInstance(precomputed, duplicateFieldSerializers(fieldSerializers)) : this;
> }
>
> protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
>         PrecomputedParameters precomputed,
>         TypeSerializer<?> ... originalSerializers) {
>     Preconditions.checkNotNull(originalSerializers);
>     Preconditions.checkArgument(originalSerializers.length == 2);
>     return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
> }
>
> We noticed that the new TtlSerializer contains only the field serializer 
> (originalSerializers[1]); the LongSerializer for the timestamp is missing.
>
> Step 3. The serializer is duplicated again when taking a snapshot. Source code:
>
> CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {
>
>     super(owningStateTable);
>     this.snapshotData = owningStateTable.snapshotTableArrays();
>     this.snapshotVersion = owningStateTable.getStateTableVersion();
>     this.numberOfEntriesInSnapshotData = owningStateTable.size();
>
>     // We create duplicates of the serializers for the async snapshot, because TypeSerializer
>     // might be stateful and shared with the event processing thread.
>     this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate();
>     this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
>     this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate();
>
>     this.partitionedStateTableSnapshot = null;
> }
> Stack:
>
> * new CopyOnWriteStateTableSnapshot
> * CopyOnWriteStateTable.snapshot()
> * HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllState()
> * ……
> * HeapKeyedStateBackend.snapshot()
>
>
> The IllegalArgumentException happens when duplicating the CompositeSerializer 
> created in step 2, because it contains only one field serializer.
>
> Did I describe the problem clearly? Is this a bug?
> We compared KryoSerializer with PojoSerializer: PojoSerializer does not simply 
> return a new serializer instance, but returns the original instance in most cases. 
> Which one has the bug, KryoSerializer or TtlStateFactory::createSerializerInstance?
>
> Best,
> Quan Shi
>
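
The three steps quoted above can be sketched without Flink on the classpath. The following is only a simplified model, not Flink code: the names Ser, StatelessSer, StatefulSer, CompositeSer and the keepAllFields flag are all hypothetical. It assumes that "stateful" means at least one field serializer returns a different instance from duplicate(), and it shows how dropping the timestamp serializer on the first duplicate() makes the second duplicate() fail the length check. The keepAllFields=true variant is one possible way to avoid that under this model; it is not a statement about how Flink resolves it.

```java
final class TtlDuplicateModel {
    /** Hypothetical stand-in for a serializer; only duplicate() matters here. */
    interface Ser { Ser duplicate(); }

    /** duplicate() returns the same instance (models the timestamp serializer). */
    static final class StatelessSer implements Ser {
        public Ser duplicate() { return this; }
    }

    /** duplicate() returns a fresh instance (models the behavior described for KryoSerializer). */
    static final class StatefulSer implements Ser {
        public Ser duplicate() { return new StatefulSer(); }
    }

    /** Models a composite of [timestamp serializer, value serializer]. */
    static final class CompositeSer implements Ser {
        final Ser[] fields;
        final boolean stateful;
        final boolean keepAllFields; // false = model of the reported behavior

        CompositeSer(boolean keepAllFields, Ser... fields) {
            this.keepAllFields = keepAllFields;
            this.fields = fields;
            boolean anyStateful = false;
            for (Ser f : fields) {
                // Assumption: a field is "stateful" if duplicate() yields a new instance.
                if (f.duplicate() != f) { anyStateful = true; }
            }
            this.stateful = anyStateful;
        }

        /** Models createSerializerInstance: expects exactly two field serializers. */
        CompositeSer create(Ser... originals) {
            if (originals.length != 2) {
                throw new IllegalArgumentException(
                    "expected 2 field serializers, got " + originals.length);
            }
            return keepAllFields
                ? new CompositeSer(true, originals)        // keeps timestamp + value
                : new CompositeSer(false, originals[1]);   // keeps only the value serializer
        }

        public CompositeSer duplicate() {
            if (!stateful) { return this; }
            Ser[] copies = new Ser[fields.length];
            for (int i = 0; i < fields.length; i++) { copies[i] = fields[i].duplicate(); }
            return create(copies);
        }
    }

    /** Duplicates once (state table registration) and again (snapshot). */
    static String duplicateTwice(boolean keepAllFields) {
        CompositeSer original =
            new CompositeSer(keepAllFields, new StatelessSer(), new StatefulSer());
        CompositeSer first = original.duplicate();
        try {
            CompositeSer second = first.duplicate();
            return "ok, fields=" + second.fields.length;
        } catch (IllegalArgumentException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(duplicateTwice(false)); // failed: expected 2 field serializers, got 1
        System.out.println(duplicateTwice(true));  // ok, fields=2
    }
}
```

In this model the first duplicate() always succeeds, which matches the report that the exception only appears at snapshot time. A stateless value serializer would never reach create(), which may be why the problem does not show up when duplicate() returns the original instance.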
