Hello,

Yes, I'm aware, and I used elements with same key, and logged getCurrentKey() 
to ensure that key is same, but you are right in terms that it is scope 
related, the key is protobuf object and I specify custom TypeInformation in 
keyBy(), today I've changed code to use Tuple2 derived class instead of 
protobuf and it started to work, but why it is not working with protobuf and 
custom type information is unclear, checked serialize/deserialize - returns 
equal object, further until TM restarts it works. Is any special requirements 
for TypeSerializer and TypeInformation for key types ?


@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
  final int serializedSize = t.getSerializedSize();
  dataOutputView.writeInt(serializedSize);
  final byte[] data = new byte[serializedSize];
  t.writeTo(CodedOutputStream.newInstance(data));
  dataOutputView.write(data);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
  final int serializedSize = dataInputView.readInt();
  final com.google.protobuf.Parser<T> parser = 
Unchecked.cast(prototype.getParserForType());
  final byte[] data = new byte[serializedSize];
  dataInputView.read(data);
  return parser.parseFrom(CodedInputStream.newInstance(data));
}


________________________________
From: Chesnay Schepler <ches...@apache.org>
Sent: Monday, January 11, 2021 4:36 PM
To: Alexey Trenikhun <yen...@msn.com>; Flink User Mail List 
<user@flink.apache.org>
Subject: Re: state reset(lost) on TM recovery

Just do double-check, are you aware that ValueState within a Keyed*Function is 
scoped to the key of the input element(s)? I.e., any stored value is only 
accessible if an element with the same key is processed?

On 1/10/2021 7:18 PM, Alexey Trenikhun wrote:
Hello,

I'm using Flink 1.11.3, state backend is rocksdn. I have streaming job which 
reads from Kafka, transforms data and output into Kafka, one of processing 
nodes is KeyedCoProcessFunction with ValueState:

  1.  generated some input data, I see in log that state.update() is called and 
subsequent state.value() return not null
  2.  wait for checkpoint
  3.  restart taskmanager
  4.  state.value() returns null

I've tried to change backend from rocksdb to filesystem - same result, after 
taskmanager restart state.value() returns null

Any ideas, what could cause resetting state to null?

Thanks,
Alexey


Reply via email to