Am I understanding correctly that the same savepoint cannot be restored in 1 environment, while it restores fine in the 3 others?
If so, are all environments reading the same savepoint file, or separate copies of it?

On 10/04/2022 22:39, Alexey Trenikhun wrote:
Hello,
We have a KeyedBroadcastProcessFunction with broadcast state MapStateDescriptor<String, PbCfgTenantDictionary>, where PbCfgTenantDictionary is a Protobuf type for which we wrote a custom TypeInformation/TypeSerializer. In one of our environments, we can't restore the job from a savepoint because the state data seems to be corrupted. I've added logging to the TypeSerializer:

public void serialize(T t, DataOutputView dataOutputView) throws IOException {
  final byte[] data = t.toByteArray();
  dataOutputView.writeInt(data.length);
  dataOutputView.write(data);
  if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
    LOG.info("serialize PbCfgTenantDictionary.size: {}", data.length);
    LOG.info("serialize PbCfgTenantDictionary.data: {}",
        Base64.getEncoder().encodeToString(data));
  }
}

public T deserialize(DataInputView dataInputView) throws IOException {
  final int serializedSize = dataInputView.readInt();
  final com.google.protobuf.Parser<T> parser = Unchecked.cast(prototype.getParserForType());
  final byte[] data = new byte[serializedSize];
  dataInputView.read(data);
  if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
    LOG.info("deserialize PbCfgTenantDictionary.size: {}", data.length);
    LOG.info("deserialize PbCfgTenantDictionary.data: {}",
        Base64.getEncoder().encodeToString(data));
  }
  return parser.parseFrom(data);
}

Both the serialize and deserialize methods print the same size, 104048, but the data differs: after the first 4980 Base64 characters (3735 bytes) there are only AAAAAAAAAAAAAAAAAAAA....A= (that is, zero bytes).
Strangely, the problem affects only 1 of the 4 environments I've tried.
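
One thing that may be worth ruling out, since the logged size matches but the tail is all zeros: the deserialize method above ignores the return value of read(byte[]). DataInputView extends java.io.DataInput, and a single read(byte[]) call is allowed to return fewer bytes than requested, whereas readFully(byte[]) either fills the whole array or throws. Whether this is the actual cause here is not confirmed. The sketch below reproduces the symptom with plain java.io only; ShortReadDemo, chunked and the 3735-byte chunk size are hypothetical and chosen just to mirror the numbers reported above.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

class ShortReadDemo {

    // Hypothetical source that hands out at most `chunk` bytes per read call,
    // standing in for a buffered or remote-backed stream (an assumption).
    static InputStream chunked(byte[] source, int chunk) {
        return new FilterInputStream(new ByteArrayInputStream(source)) {
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                return super.read(b, off, Math.min(len, chunk));
            }
        };
    }

    // One read(byte[]) call: returns how many bytes were actually copied.
    static int bytesFromSingleRead(byte[] source, int chunk, byte[] target) throws IOException {
        try (DataInputStream in = new DataInputStream(chunked(source, chunk))) {
            return in.read(target);
        }
    }

    // readFully keeps reading until the target array is completely filled.
    static void fillWithReadFully(byte[] source, int chunk, byte[] target) throws IOException {
        try (DataInputStream in = new DataInputStream(chunked(source, chunk))) {
            in.readFully(target);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] source = new byte[104048];
        Arrays.fill(source, (byte) 1);

        byte[] partial = new byte[source.length];
        int n = bytesFromSingleRead(source, 3735, partial);
        System.out.println("read(byte[]) copied " + n + " bytes, last byte = "
                + partial[partial.length - 1]);

        byte[] full = new byte[source.length];
        fillWithReadFully(source, 3735, full);
        System.out.println("readFully matches source: " + Arrays.equals(source, full));
    }
}
```

If the same pattern applies to the serializer, logging the value returned by dataInputView.read(data), or switching to dataInputView.readFully(data), would confirm or rule it out. It could also explain why only some environments are affected, since how many bytes a single read returns depends on the underlying stream.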
