Hello,
We have a KeyedBroadcastProcessFunction with broadcast state 
MapStateDescriptor<String, PbCfgTenantDictionary>, where PbCfgTenantDictionary 
is a Protobuf type for which we wrote a custom TypeInformation/TypeSerializer. 
In one of our environments, we can't restore the job from a savepoint because 
the state data seems to be corrupted. I've added logging to the TypeSerializer:

public void serialize(T t, DataOutputView dataOutputView) throws IOException {
    final byte[] data = t.toByteArray();
    dataOutputView.writeInt(data.length);
    dataOutputView.write(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
      LOG.info("serialize PbCfgTenantDictionary.size: {}", data.length);
      LOG.info("serialize PbCfgTenantDictionary.data: {}",
          Base64.getEncoder().encodeToString(data));
    }
  }

public T deserialize(DataInputView dataInputView) throws IOException {
    final int serializedSize = dataInputView.readInt();
    final com.google.protobuf.Parser<T> parser =
        Unchecked.cast(prototype.getParserForType());
    final byte[] data = new byte[serializedSize];
    dataInputView.read(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
      LOG.info("deserialize PbCfgTenantDictionary.size: {}", data.length);
      LOG.info("deserialize PbCfgTenantDictionary.data: {}",
          Base64.getEncoder().encodeToString(data));
    }
    return parser.parseFrom(data);
  }

Both the serialize and deserialize methods print the same size, 104048, but the 
data differs: after the first 4980 Base64 characters (3735 bytes) there is only 
AAAAAAAAAAAAAAAAAAAA....A= (that is, zero bytes).
Strangely, the problem affects only 1 of the 4 environments I've tried.
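
One possible explanation, not confirmed for this job: deserialize ignores the return value of dataInputView.read(data). Like InputStream.read, DataInputView.read(byte[]) returns the number of bytes actually read, which may be fewer than the array length, so the rest of the buffer would stay zero. That would match a correct size followed by a zero-filled tail, and might differ between environments depending on how the underlying stream hands out data. The sketch below reproduces the pattern with plain java.io classes only; ChunkedInputStream and the 3735-byte chunk size are hypothetical stand-ins chosen to mirror the numbers above, not Flink code.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class ShortReadDemo {

    /** Hypothetical stream that returns at most `chunk` bytes per read call. */
    static final class ChunkedInputStream extends InputStream {
        private final InputStream in;
        private final int chunk;

        ChunkedInputStream(byte[] source, int chunk) {
            this.in = new ByteArrayInputStream(source);
            this.chunk = chunk;
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // Never hand out more than `chunk` bytes at once.
            return in.read(b, off, Math.min(len, chunk));
        }
    }

    /** Builds a payload of `size` non-zero bytes so that unread bytes are easy to spot. */
    static byte[] payload(int size) {
        byte[] p = new byte[size];
        Arrays.fill(p, (byte) 1);
        return p;
    }

    /** Counts the bytes that were actually filled in (non-zero). */
    static int countNonZero(byte[] data) {
        int n = 0;
        for (byte b : data) {
            if (b != 0) {
                n++;
            }
        }
        return n;
    }

    /** Same pattern as the deserialize method above: a single read, return value ignored. */
    static byte[] readOnce(byte[] source, int chunk) throws IOException {
        DataInputStream in = new DataInputStream(new ChunkedInputStream(source, chunk));
        byte[] data = new byte[source.length];
        in.read(data);
        return data;
    }

    /** readFully loops until the whole buffer is filled, or throws EOFException. */
    static byte[] readFully(byte[] source, int chunk) throws IOException {
        DataInputStream in = new DataInputStream(new ChunkedInputStream(source, chunk));
        byte[] data = new byte[source.length];
        in.readFully(data);
        return data;
    }

    public static void main(String[] args) throws IOException {
        byte[] source = payload(104048);
        System.out.println("read():      " + countNonZero(readOnce(source, 3735)) + " bytes filled");
        System.out.println("readFully(): " + countNonZero(readFully(source, 3735)) + " bytes filled");
    }
}
```

If this is the cause, replacing dataInputView.read(data) with dataInputView.readFully(data) in deserialize should be enough, since DataInputView extends java.io.DataInput. It would also mean the savepoint itself may be intact and only the read path is truncating the data.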
