Hello,

We have a KeyedBroadcastProcessFunction with broadcast state described by MapStateDescriptor<String, PbCfgTenantDictionary>, where PbCfgTenantDictionary is a Protobuf type for which we have a custom TypeInformation/TypeSerializer. In one of our environments we can't restore the job from a savepoint because the state data seems to be corrupted. I've added logging to the TypeSerializer:

    public void serialize(T t, DataOutputView dataOutputView) throws IOException {
        final byte[] data = t.toByteArray();
        dataOutputView.writeInt(data.length);
        dataOutputView.write(data);
        if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
            LOG.info("serialize PbCfgTenantDictionary.size: {}", data.length);
            LOG.info("serialize PbCfgTenantDictionary.data: {}", Base64.getEncoder().encodeToString(data));
        }
    }

    public T deserialize(DataInputView dataInputView) throws IOException {
        final int serializedSize = dataInputView.readInt();
        final com.google.protobuf.Parser<T> parser = Unchecked.cast(prototype.getParserForType());
        final byte[] data = new byte[serializedSize];
        dataInputView.read(data);
        if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
            LOG.info("deserialize PbCfgTenantDictionary.size: {}", data.length);
            LOG.info("deserialize PbCfgTenantDictionary.data: {}", Base64.getEncoder().encodeToString(data));
        }
        return parser.parseFrom(data);
    }

Both the serialize and deserialize methods log the same size, 104048, but the data differs: after the first 4980 Base64 characters (3735 bytes) there is only AAAAAAAAAAAAAAAAAAAA....A= (that is, zero bytes).

Strangely, the problem affects only 1 of the 4 environments I've tried.
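
One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: read(byte[]) on a stream-like input is allowed to return after filling only part of the buffer, while readFully (declared in java.io.DataInput, which Flink's DataInputView extends) keeps reading until the whole array is filled. The sketch below is plain Java, not Flink code. ReadFullyDemo, ChunkedInputStream and the 3735-byte chunk size are hypothetical, chosen only to mimic the symptom of a correctly sized array whose tail stays zeroed.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class ReadFullyDemo {

    /** Hypothetical stream that returns at most `chunk` bytes per read call. */
    static final class ChunkedInputStream extends InputStream {
        private final InputStream in;
        private final int chunk;

        ChunkedInputStream(InputStream in, int chunk) {
            this.in = in;
            this.chunk = chunk;
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // Legal per the InputStream contract: fewer bytes than requested.
            return in.read(b, off, Math.min(len, chunk));
        }
    }

    /** Builds a payload of non-zero bytes so unfilled (zero) regions stand out. */
    static byte[] payload(int size) {
        byte[] p = new byte[size];
        Arrays.fill(p, (byte) 1);
        return p;
    }

    private static DataInputStream open(byte[] source, int chunk) {
        return new DataInputStream(
                new ChunkedInputStream(new ByteArrayInputStream(source), chunk));
    }

    /** Mirrors the pattern in deserialize(): one read(data) call, return value ignored. */
    static byte[] readWithSingleRead(byte[] source, int chunk) {
        byte[] data = new byte[source.length];
        try (DataInputStream in = open(source, chunk)) {
            in.read(data); // may fill only the first `chunk` bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return data;
    }

    /** Alternative: readFully loops until the array is full or throws EOFException. */
    static byte[] readWithReadFully(byte[] source, int chunk) {
        byte[] data = new byte[source.length];
        try (DataInputStream in = open(source, chunk)) {
            in.readFully(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return data;
    }

    public static void main(String[] args) {
        byte[] source = payload(104048);
        byte[] partial = readWithSingleRead(source, 3735);
        byte[] full = readWithReadFully(source, 3735);
        System.out.println("single read matches source: " + Arrays.equals(source, partial));
        System.out.println("readFully matches source:   " + Arrays.equals(source, full));
    }
}
```

With these assumed values, the single read fills only the first 3735 bytes and leaves the rest of the array at zero, while readFully reproduces the whole payload. If the underlying input behaves differently per environment (for example, different buffer boundaries), that could also explain why only one environment is affected, though that is a guess.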