Am I understanding correctly that the same savepoint cannot be
restored in 1 environment, while it works fine in 3 others?
If so, are they all relying on the same file, or on copies of the savepoint?
On 10/04/2022 22:39, Alexey Trenikhun wrote:
Hello,
We have a KeyedBroadcastProcessFunction with broadcast
state MapStateDescriptor<String, PbCfgTenantDictionary>, where
PbCfgTenantDictionary is a Protobuf type for which we have a
custom TypeInformation/TypeSerializer. In one of the environments, we can't
restore the job from a savepoint because the state data seems to be corrupted. I've
added logging to the TypeSerializer:
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
    final byte[] data = t.toByteArray();
    dataOutputView.writeInt(data.length);
    dataOutputView.write(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
        LOG.info("serialize PbCfgTenantDictionary.size: {}", data.length);
        LOG.info("serialize PbCfgTenantDictionary.data: {}",
                Base64.getEncoder().encodeToString(data));
    }
}

public T deserialize(DataInputView dataInputView) throws IOException {
    final int serializedSize = dataInputView.readInt();
    final com.google.protobuf.Parser<T> parser =
            Unchecked.cast(prototype.getParserForType());
    final byte[] data = new byte[serializedSize];
    dataInputView.read(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
        LOG.info("deserialize PbCfgTenantDictionary.size: {}", data.length);
        LOG.info("deserialize PbCfgTenantDictionary.data: {}",
                Base64.getEncoder().encodeToString(data));
    }
    return parser.parseFrom(data);
}
Both the serialize and deserialize methods print the same size, 104048, but
the data is different: after 4980 base64 characters (3735 bytes) there is
only AAAAAAAAAAAAAAAAAAAA....A=
Strangely, the problem affects only 1 of the 4 environments I've tried
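
One thing that may be worth checking (a guess from the posted code, not a
confirmed diagnosis): deserialize() ignores the return value of
dataInputView.read(data). If that call behaves like
java.io.InputStream.read(byte[]) and may return after filling only part of
the array, the rest of the buffer stays zero, and zero bytes Base64-encode
to runs of 'A', which would match the output above. DataInputView extends
java.io.DataInput, so readFully(data) is available as an alternative. The
sketch below reproduces the effect with plain JDK streams only;
ChunkedInputStream, the helper method names, and the 3735-byte chunk size are
made up for illustration and do not model Flink's actual buffers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

class ShortReadDemo {

    /** Hypothetical stream that hands out at most maxChunk bytes per bulk read call. */
    static final class ChunkedInputStream extends FilterInputStream {
        private final int maxChunk;

        ChunkedInputStream(InputStream in, int maxChunk) {
            super(in);
            this.maxChunk = maxChunk;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, maxChunk));
        }
    }

    /** Writes a length prefix followed by the payload, like serialize() in the post. */
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
        return bytes.toByteArray();
    }

    /** Mirrors the posted deserialize(): a single read() whose return value is ignored. */
    static byte[] readWithSingleRead(byte[] framed, int maxChunk) throws IOException {
        DataInputStream in = new DataInputStream(
                new ChunkedInputStream(new ByteArrayInputStream(framed), maxChunk));
        byte[] data = new byte[in.readInt()];
        in.read(data); // may fill only the first maxChunk bytes
        return data;
    }

    /** Same framing, but readFully() loops until the whole array is filled. */
    static byte[] readWithReadFully(byte[] framed, int maxChunk) throws IOException {
        DataInputStream in = new DataInputStream(
                new ChunkedInputStream(new ByteArrayInputStream(framed), maxChunk));
        byte[] data = new byte[in.readInt()];
        in.readFully(data); // throws EOFException if the stream ends early
        return data;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[104048];
        Arrays.fill(payload, (byte) 1);
        byte[] framed = frame(payload);

        byte[] partial = readWithSingleRead(framed, 3735);
        byte[] full = readWithReadFully(framed, 3735);

        System.out.println("single read() matches payload: " + Arrays.equals(partial, payload));
        System.out.println("readFully() matches payload:   " + Arrays.equals(full, payload));
    }
}
```

If this is what is happening, it could also explain why only one environment
is affected: how many bytes a single read() returns depends on how the
underlying data happens to be chunked, which can differ between setups.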