Any suggestions on how to troubleshoot this issue? I can still reproduce the
problem in environment A.

Thanks,
Alexey
________________________________
From: Alexey Trenikhun <yen...@msn.com>
Sent: Tuesday, April 12, 2022 7:10:17 AM
To: Chesnay Schepler <ches...@apache.org>; Flink User Mail List <user@flink.apache.org>
Subject: Re: Broadcast state corrupted ?

I tried restoring the job in environment A (where we observe the problem) from a
savepoint taken in environment B, and it restored fine. So it looks like
something in environment A corrupts the savepoint.
________________________________
From: Alexey Trenikhun <yen...@msn.com>
Sent: Monday, April 11, 2022 7:10:51 AM
To: Chesnay Schepler <ches...@apache.org>; Flink User Mail List <user@flink.apache.org>
Subject: Re: Broadcast state corrupted ?

I didn’t try the same savepoint across environments. The operator with broadcast
state was added recently. I rolled back all environments, created savepoints
with the old version, and upgraded to the version with broadcast state; all 4
environments upgraded fine. I then took savepoints in each environment and tried
to restore from them: 3 restored, and the 4th failed (the same environment as the
original failure). Two environments are deployed in Azure AKS and use Azure Blob
Storage; the other two are local and use MinIO. The failure happens in one of
the local environments.
________________________________
From: Chesnay Schepler <ches...@apache.org>
Sent: Monday, April 11, 2022 2:28:48 AM
To: Alexey Trenikhun <yen...@msn.com>; Flink User Mail List <user@flink.apache.org>
Subject: Re: Broadcast state corrupted ?

Am I understanding correctly that the same savepoint cannot be restored in 1
environment, while it works fine in the 3 others?
If so, are they all relying on the same file, or on copies of the savepoint?
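One way to answer the "same file or copies" question is to compare checksums. A minimal sketch, assuming each environment's copy of the savepoint has been downloaded to local disk (the class name and paths are hypothetical): it prints the SHA-256 digest and size of each file passed on the command line, so identical digests mean byte-for-byte identical copies. A savepoint is a directory, so this would be run per file (for example on each `_metadata` file). Only JDK classes (`MessageDigest`, `DigestInputStream`) are used.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: prints SHA-256 and size for each local savepoint file copy.
public class SavepointChecksum {

    // Computes the SHA-256 digest of a file as a lowercase hex string.
    static String sha256Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        try (InputStream in = new DigestInputStream(Files.newInputStream(file), md)) {
            byte[] buf = new byte[8192];
            while (in.read(buf) != -1) {
                // Reading through DigestInputStream updates the digest as a side effect.
            }
        }
        StringBuilder sb = new StringBuilder();
        for (byte b : md.digest()) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        // args: local paths to copies of the same savepoint file (hypothetical)
        for (String arg : args) {
            Path p = Paths.get(arg);
            System.out.println(sha256Hex(p) + "  " + Files.size(p) + "  " + p);
        }
    }
}
```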

On 10/04/2022 22:39, Alexey Trenikhun wrote:
Hello,
We have a KeyedBroadcastProcessFunction with broadcast state
MapStateDescriptor<String, PbCfgTenantDictionary>, where PbCfgTenantDictionary
is a Protobuf type for which we use a custom TypeInformation/TypeSerializer. In
one environment we can't restore the job from a savepoint because the state data
seems to be corrupted. I've added logging to the TypeSerializer:

  public void serialize(T t, DataOutputView dataOutputView) throws IOException {
    final byte[] data = t.toByteArray();
    dataOutputView.writeInt(data.length);
    dataOutputView.write(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
      LOG.info("serialize PbCfgTenantDictionary.size: {}", data.length);
      LOG.info("serialize PbCfgTenantDictionary.data: {}",
          Base64.getEncoder().encodeToString(data));
    }
  }

  public T deserialize(DataInputView dataInputView) throws IOException {
    final int serializedSize = dataInputView.readInt();
    final com.google.protobuf.Parser<T> parser =
        Unchecked.cast(prototype.getParserForType());
    final byte[] data = new byte[serializedSize];
    dataInputView.read(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
      LOG.info("deserialize PbCfgTenantDictionary.size: {}", data.length);
      LOG.info("deserialize PbCfgTenantDictionary.data: {}",
          Base64.getEncoder().encodeToString(data));
    }
    return parser.parseFrom(data);
  }
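One hypothesis worth checking (not confirmed in this thread): `deserialize` ignores the return value of `dataInputView.read(data)`. `DataInputView` extends `java.io.DataInput`, and a `read(byte[])` call, like `InputStream.read(byte[])`, may return after filling only part of the buffer, leaving the remaining bytes zero. `readFully(byte[])` from `DataInput` keeps reading until the whole array is filled (or throws `EOFException`). The sketch below reproduces the effect with plain `java.io` streams rather than Flink classes; `ChunkedInputStream` and its 4096-byte limit are hypothetical stand-ins for an underlying storage stream that delivers data in smaller pieces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class ShortReadDemo {

    // Hypothetical stream that never returns more than maxChunk bytes per read call.
    static final class ChunkedInputStream extends FilterInputStream {
        private final int maxChunk;

        ChunkedInputStream(InputStream in, int maxChunk) {
            super(in);
            this.maxChunk = maxChunk;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, maxChunk));
        }
    }

    // Same framing as the serializer: 4-byte length prefix, then the payload.
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
        return bos.toByteArray();
    }

    // Reads one framed payload, either with a single read() or with readFully().
    static byte[] readFramed(byte[] framed, int maxChunk, boolean fully) throws IOException {
        try (DataInputStream in = new DataInputStream(
                new ChunkedInputStream(new ByteArrayInputStream(framed), maxChunk))) {
            byte[] data = new byte[in.readInt()];
            if (fully) {
                in.readFully(data); // loops until data.length bytes are read
            } else {
                in.read(data);      // may stop early; return value ignored, as in deserialize
            }
            return data;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[10_000];
        Arrays.fill(payload, (byte) 7);
        byte[] framed = frame(payload);
        System.out.println(Arrays.equals(payload, readFramed(framed, 4096, false)));
        System.out.println(Arrays.equals(payload, readFramed(framed, 4096, true)));
    }
}
```

Running `main` prints `false` and then `true`: with a single `read`, only the first 4096 bytes are filled and the rest of the array stays zero, while `readFully` recovers the whole payload.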

Both the serialize and deserialize methods print the same size, 104048, but the
data is different: after 4980 base64 characters (3735 bytes) there are only
AAAAAAAAAAAAAAAAAAAA....A=
Strangely, the problem affects only 1 of the 4 environments I've tried.
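To pin down exactly where the two logged payloads diverge, a small sketch can decode the two base64 log lines and compare them byte by byte. The class and method names are hypothetical; only `java.util.Base64` is JDK API. Since 4 base64 characters encode 3 bytes, 4980 characters correspond to 4980 / 4 × 3 = 3735 bytes, so the zero tail would be expected to start at about byte 3735 (earlier only if the original payload happens to contain zero bytes just before it).

```java
import java.util.Base64;

public class LogDiff {

    // Returns the index of the first byte where a and b differ, or -1 if identical.
    static int firstDifference(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            if (a[i] != b[i]) {
                return i;
            }
        }
        return a.length == b.length ? -1 : n;
    }

    // Returns the offset where the run of zero bytes reaching the end of the array starts.
    static int zeroTailStart(byte[] data) {
        int i = data.length;
        while (i > 0 && data[i - 1] == 0) {
            i--;
        }
        return i;
    }

    public static void main(String[] args) {
        // args[0], args[1]: base64 strings copied from the "serialize ... data" and
        // "deserialize ... data" log lines (supplied by the user, not included here)
        byte[] written = Base64.getDecoder().decode(args[0]);
        byte[] read = Base64.getDecoder().decode(args[1]);
        System.out.println("sizes: " + written.length + " / " + read.length);
        System.out.println("first difference at byte " + firstDifference(written, read));
        System.out.println("zero tail starts at byte " + zeroTailStart(read));
    }
}
```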
