Hi,
Recently we encountered an IllegalArgumentException when using state with TTL
enabled and KryoSerializer as the field serializer. Test setup:

  1.  Use heap state backend
  2.  Create a MapStateDescriptor from classes (String.class and HashMap.class)
  3.  Enable state TTL
  4.  Flink version: 1.6.1

Core test code:

MapStateDescriptor mapStateDescriptor =
   new MapStateDescriptor("test", String.class, HashMap.class);
mapStateDescriptor.enableTimeToLive(ttlConfig);

The IllegalArgumentException was thrown here because the value of
originalSerializers.length is 1:

protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
   PrecomputedParameters precomputed,
   TypeSerializer<?> ... originalSerializers) {
   Preconditions.checkNotNull(originalSerializers);
   Preconditions.checkArgument(originalSerializers.length == 2);
   return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
}


We found some clues in the Flink source code. Here is what happens, step by step:

Step 1. The ttlDescriptor is created in TtlStateFactory::createMapState when
the operator is opened; at the same time, a CompositeSerializer is created.
Because KryoSerializer.duplicate() creates another KryoSerializer instance (see
source code), CompositeSerializer.precomputed.stateful is set to "true". Could
someone tell me what "stateful" means in precomputed?
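
To make the question concrete, here is a minimal sketch of what we assume the
flag means: a composite counts as stateful when duplicate() on any of its field
serializers returns a different instance. This is only our reading of the
behavior described above, not Flink's actual code; all class and method names
below (SketchSerializer, StatefulFlagSketch, isStateful, ...) are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a serializer; only duplicate() matters here. */
interface SketchSerializer {
    SketchSerializer duplicate();
}

/** Stateless: safe to share between threads, so duplicate() returns the same instance. */
final class StatelessSketchSerializer implements SketchSerializer {
    @Override
    public SketchSerializer duplicate() {
        return this;
    }
}

/** Stateful (Kryo-like in this sketch): duplicate() always returns a fresh instance. */
final class StatefulSketchSerializer implements SketchSerializer {
    @Override
    public SketchSerializer duplicate() {
        return new StatefulSketchSerializer();
    }
}

final class StatefulFlagSketch {
    /**
     * Assumed rule: the composite is stateful if at least one field
     * serializer hands back a different instance from duplicate().
     */
    static boolean isStateful(SketchSerializer... fields) {
        return Arrays.stream(fields).anyMatch(f -> f.duplicate() != f);
    }

    public static void main(String[] args) {
        // Two stateless fields: the composite could simply return itself.
        System.out.println(isStateful(
            new StatelessSketchSerializer(), new StatelessSketchSerializer()));
        // One Kryo-like field is enough to make the composite stateful.
        System.out.println(isStateful(
            new StatelessSketchSerializer(), new StatefulSketchSerializer()));
    }
}
```

Under this assumption, a composite holding a Kryo-like field serializer would
always take the createSerializerInstance branch in duplicate().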

Step 2. newMetaInfo is created from the ttlDescriptor when the StateTable is
registered. The CompositeSerializer is duplicated in this step. Source code:

@Override
public CompositeSerializer<T> duplicate() {
   return precomputed.stateful ?
      createSerializerInstance(precomputed, duplicateFieldSerializers(fieldSerializers)) : this;
}



protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
   PrecomputedParameters precomputed,
   TypeSerializer<?> ... originalSerializers) {
   Preconditions.checkNotNull(originalSerializers);
   Preconditions.checkArgument(originalSerializers.length == 2);
   return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
}

We noticed that the new TtlSerializer contains only the field serializer
(originalSerializers[1]); the LongSerializer for the timestamp is missing.

Step 3. The serializer is duplicated again when taking a snapshot. Source code:

CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {

   super(owningStateTable);
   this.snapshotData = owningStateTable.snapshotTableArrays();
   this.snapshotVersion = owningStateTable.getStateTableVersion();
   this.numberOfEntriesInSnapshotData = owningStateTable.size();

   // We create duplicates of the serializers for the async snapshot, because TypeSerializer
   // might be stateful and shared with the event processing thread.
   this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate();
   this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
   this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate();

   this.partitionedStateTableSnapshot = null;
}

Stack:

  *   new CopyOnWriteStateTableSnapshot
  *   CopyOnWriteStateTable.snapshot()
  *   HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllState()
  *   ……
  *   HeapKeyedStateBackend.snapshot()


The IllegalArgumentException happens when duplicating the CompositeSerializer
created in Step 2, because it contains only one field serializer.
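
The chain of events can be reproduced without Flink using a simplified model.
This is only a sketch of the behavior as we understand it, not the real
classes: FieldSer, CompositeModel and TtlDuplicateSketch are hypothetical
names, and the model assumes the constructor stores exactly the serializers it
is given. The first duplicate() succeeds but returns a composite with a single
field; the second duplicate() then fails the length check.

```java
/** Hypothetical field serializer; duplicate() returns a new instance, like a stateful serializer. */
final class FieldSer {
    final String name;

    FieldSer(String name) {
        this.name = name;
    }

    FieldSer duplicate() {
        return new FieldSer(name);
    }
}

/** Simplified model of a two-field composite (timestamp + user value). */
final class CompositeModel {
    final FieldSer[] fields;

    /** Assumption: the composite keeps exactly the serializers passed in. */
    CompositeModel(FieldSer... fields) {
        this.fields = fields;
    }

    /** Mirrors the quoted method: requires two fields, then forwards only the second. */
    static CompositeModel createInstance(FieldSer... original) {
        if (original.length != 2) {
            throw new IllegalArgumentException(
                "expected 2 field serializers, got " + original.length);
        }
        return new CompositeModel(original[1]); // the timestamp serializer is dropped here
    }

    /** Always takes the "stateful" branch: duplicate every field, then rebuild. */
    CompositeModel duplicate() {
        FieldSer[] copies = new FieldSer[fields.length];
        for (int i = 0; i < fields.length; i++) {
            copies[i] = fields[i].duplicate();
        }
        return createInstance(copies);
    }
}

final class TtlDuplicateSketch {
    static CompositeModel original() {
        return new CompositeModel(new FieldSer("timestamp"), new FieldSer("value"));
    }

    /** Step 2 in the description: number of fields left after the first duplicate(). */
    static int fieldsAfterFirstDuplicate() {
        return original().duplicate().fields.length;
    }

    /** Step 3 in the description: does duplicating the duplicate throw? */
    static boolean secondDuplicateFails() {
        CompositeModel once = original().duplicate();
        try {
            once.duplicate();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("fields after first duplicate: " + fieldsAfterFirstDuplicate());
        System.out.println("second duplicate fails: " + secondDuplicateFails());
    }
}
```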

Did I describe the problem clearly? Is this a bug?
We compared KryoSerializer with PojoSerializer: PojoSerializer does not simply
return a new serializer instance, but returns the original instance in most
cases. Which one has the bug, KryoSerializer or
TtlStateFactory::createSerializerInstance?

Best,
Quan Shi
