Are the hashes of these objects equal as well?
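For context on why the hashes matter: Flink picks the key group for an element from key.hashCode(), so a key type whose hashCode() is not stable across JVM restarts can end up looking in a different key group after recovery even though equals() still holds. Below is a minimal, stdlib-only sketch of that effect. The class names, the bit-mixing function and MAX_PARALLELISM are illustrative assumptions, not Flink's actual implementation.

```java
import java.util.Objects;

final class KeyHashSketch {
    static final int MAX_PARALLELISM = 128; // assumed value, for illustration only

    // Stand-in for key-group assignment: a deterministic function of hashCode().
    // This is a simple bit mix, NOT the hash function Flink really uses.
    static int keyGroupFor(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        return Math.floorMod(h, MAX_PARALLELISM);
    }

    // Hypothetical key whose hash depends only on its field values.
    static final class StableKey {
        final String tenant;
        final long id;

        StableKey(String tenant, long id) {
            this.tenant = tenant;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof StableKey)) {
                return false;
            }
            StableKey other = (StableKey) o;
            return id == other.id && tenant.equals(other.tenant);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tenant, id);
        }
    }

    // Hypothetical key whose hash mixes in an identity hash code.
    // Consistent with equals() inside one JVM, but the value is not
    // guaranteed to be the same in another JVM process.
    static final class UnstableKey {
        private static final Object TYPE_MARKER = new Object();
        final String tenant;
        final long id;

        UnstableKey(String tenant, long id) {
            this.tenant = tenant;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof UnstableKey)) {
                return false;
            }
            UnstableKey other = (UnstableKey) o;
            return id == other.id && tenant.equals(other.tenant);
        }

        @Override
        public int hashCode() {
            return 31 * Objects.hash(tenant, id) + TYPE_MARKER.hashCode();
        }
    }

    public static void main(String[] args) {
        StableKey a = new StableKey("acme", 42L);
        StableKey b = new StableKey("acme", 42L);
        System.out.println("stable keys share a key group: "
                + (keyGroupFor(a) == keyGroupFor(b)));
        System.out.println("unstable key group in this JVM: "
                + keyGroupFor(new UnstableKey("acme", 42L)));
    }
}
```

Logging key.hashCode() next to getCurrentKey() before and after the TM restart would show whether the key type behaves like the second class.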
On 1/12/2021 3:59 AM, Alexey Trenikhun wrote:
Hello,
Yes, I'm aware. I used elements with the same key and logged
getCurrentKey() to make sure the key is the same. You are right that it
is scope-related, though: the key is a protobuf object and I specify a
custom TypeInformation in keyBy(). Today I changed the code to use a
Tuple2-derived class instead of protobuf and it started to work. Why it
does not work with protobuf and custom type information is still
unclear: I checked serialize/deserialize and it returns an equal object,
and everything works until the TM restarts. Are there any special
requirements for TypeSerializer and TypeInformation for key types?
@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
    final int serializedSize = t.getSerializedSize();
    dataOutputView.writeInt(serializedSize);
    final byte[] data = new byte[serializedSize];
    t.writeTo(CodedOutputStream.newInstance(data));
    dataOutputView.write(data);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
    final int serializedSize = dataInputView.readInt();
    final com.google.protobuf.Parser<T> parser =
        Unchecked.cast(prototype.getParserForType());
    final byte[] data = new byte[serializedSize];
    dataInputView.read(data);
    return parser.parseFrom(CodedInputStream.newInstance(data));
}
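The length-prefixed framing used by this serializer can be exercised in isolation. The sketch below is a stdlib-only stand-in, assuming plain java.io streams instead of Flink's DataOutputView/DataInputView and a UTF-8 string instead of a protobuf message; the class and method names are hypothetical. It uses readFully() rather than read(), since read(byte[]) is allowed to return before the array is full; DataInputView extends java.io.DataInput, so readFully() is available there as well. This is a side observation about the code above, not a claim that it causes the state loss.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class LengthPrefixedRoundTrip {

    // Write a 4-byte length followed by the payload bytes.
    static void write(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    // Read the length, then exactly that many bytes.
    static byte[] read(DataInputStream in) throws IOException {
        final int size = in.readInt();
        final byte[] data = new byte[size];
        in.readFully(data); // blocks until the array is full or throws EOFException
        return data;
    }

    // Serialize and deserialize through an in-memory buffer.
    static byte[] roundTrip(byte[] payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                write(out, payload);
            }
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                return read(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] original = "key-42".getBytes(StandardCharsets.UTF_8);
        byte[] copy = roundTrip(original);
        System.out.println("round trip equal: " + Arrays.equals(original, copy));
    }
}
```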
------------------------------------------------------------------------
*From:* Chesnay Schepler <ches...@apache.org>
*Sent:* Monday, January 11, 2021 4:36 PM
*To:* Alexey Trenikhun <yen...@msn.com>; Flink User Mail List
<user@flink.apache.org>
*Subject:* Re: state reset(lost) on TM recovery
Just to double-check, are you aware that ValueState within a
Keyed*Function is scoped to the key of the input element(s)? I.e., any
stored value is only accessible if an element with the same key is
processed?
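To illustrate the scoping described above, here is a toy model of keyed state written against the Java standard library only. It is an assumption-laden sketch (the class KeyedValueStateSketch and its methods are hypothetical and merely mimic the value()/update() naming); the real ValueState is managed by the state backend.

```java
import java.util.HashMap;
import java.util.Map;

final class KeyedValueStateSketch<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    // In Flink the runtime sets the current key before each element is processed.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Returns the value stored under the current key, or null if there is none.
    V value() {
        return valuesByKey.get(currentKey);
    }

    // Stores the value under the current key only.
    void update(V value) {
        valuesByKey.put(currentKey, value);
    }

    public static void main(String[] args) {
        KeyedValueStateSketch<String, Integer> state = new KeyedValueStateSketch<>();

        state.setCurrentKey("a");
        state.update(1);
        System.out.println(state.value()); // 1

        state.setCurrentKey("b");
        System.out.println(state.value()); // null: nothing stored for key "b"

        state.setCurrentKey("a");
        System.out.println(state.value()); // 1
    }
}
```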
On 1/10/2021 7:18 PM, Alexey Trenikhun wrote:
Hello,
I'm using Flink 1.11.3; the state backend is rocksdb. I have a streaming
job which reads from Kafka, transforms the data and outputs to Kafka.
One of the processing nodes is a KeyedCoProcessFunction with ValueState:
1. generate some input data; I see in the log that state.update() is
called and a subsequent state.value() returns non-null
2. wait for a checkpoint
3. restart the taskmanager
4. state.value() returns null
I've tried changing the backend from rocksdb to filesystem, with the same
result: after a taskmanager restart state.value() returns null.
Any ideas what could cause the state to be reset to null?
Thanks,
Alexey