A colleague of mine found a hint under “Avro types” [2] on the State Schema 
Evolution page:

“Example: RocksDB state backend relies on binary objects identity, rather than 
hashCode method implementation. Any changes to the keys object structure could 
lead to non deterministic behaviour.”

I guess it is a known issue then, but it would be good to at least include that 
kind of fundamental information on the state backends page as well.

Best regards,
/David Haglund

[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#avro-types


From: David Haglund <david.hagl...@niradynamics.se>
Date: Wednesday, 20 January 2021 at 19:57

I have an update. I have created a small project on github, 
https://github.com/daha/flink-key-by-problem-with-rocksdb-state,  which 
reproduces the issue.

There seems to be a problem with RocksDB in all versions I have tested (1.7.1 and 
later). In Flink 1.9.x only one of the events is counted with RocksDB. In Flink 
1.10.x and later all events are counted, but under separate keys, when all/both 
events should be counted under the same key.

The main branch in my sample project uses Flink 1.11.3; there are also branches 
for Flink 1.9.1, 1.10.3 and 1.12.1.

Best regards,
/David Haglund

From: David Haglund <david.hagl...@niradynamics.se>
Date: Wednesday, 20 January 2021 at 09:38
I have encountered a problem in Flink when trying to upgrade from Flink 1.9.1 to
Flink 1.11.3.

The problem is a combination of 2 components:

* Keys implemented as case classes in Scala where we override the equals and
  hashCode methods. The case class has additional fields which are not used in
  the keyBy (hashCode/equals) but can have different values for a specific key
  (the fields we care about); see the sketch after this list.
* Checkpointing with RocksDB
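
To make the setup concrete, here is a stripped-down sketch of the kind of job we 
run. All names (SensorKey, Event, ...) are invented for this example and are not 
from our real code:

    import org.apache.flink.streaming.api.scala._

    // Hypothetical key: only `id` participates in equals/hashCode,
    // while `extra` can differ between events that should share a key.
    case class SensorKey(id: String, extra: Long) {
      override def equals(o: Any): Boolean = o match {
        case SensorKey(otherId, _) => id == otherId
        case _                     => false
      }
      override def hashCode(): Int = id.hashCode
    }

    case class Event(key: SensorKey, value: Long)

    object KeyByJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Both events are meant to be aggregated under the same key,
        // since their keys are equal per equals/hashCode.
        env
          .fromElements(Event(SensorKey("a", extra = 1L), value = 1L),
                        Event(SensorKey("a", extra = 2L), value = 1L))
          .keyBy(_.key)
          .sum("value")
          .print()

        env.execute("keyBy sketch")
      }
    }

With the heap-based backends this gives a running sum under a single key; with 
RocksDB for checkpoints we instead see the two events end up under separate keys.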

In Flink 1.9.1 everything worked fine, but in Flink 1.11.3 we got aggregations
for each unique key including the fields which we did not want to include in the
keyBy, and which we explicitly do not use in hashCode and equals. It looks like
hashCode is ignored in the keyBy in our case when we use RocksDB for
checkpoints.

We do not see this problem if we disable checkpointing or when using
FsStateBackend.
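
For reference, the only thing that differs between the two behaviours is the 
state backend setup, roughly like this (the checkpoint path is a placeholder, 
and RocksDB needs the flink-statebackend-rocksdb dependency):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object BackendSetup {
      def configure(env: StreamExecutionEnvironment): Unit = {
        env.enableCheckpointing(10000L)

        // With RocksDB we see one aggregate per unique key object:
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints", true))

        // With the heap-based FsStateBackend (or with checkpointing disabled)
        // the aggregation behaves as expected:
        // env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
      }
    }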

I have seen this with "Incremental Window Aggregation with AggregateFunction"
[1], but a colleague of mine reported he had seen the same issue with
KeyedProcessFunction too.
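
The aggregation itself is essentially the pattern from [1], something along 
these lines (again with invented names; Event and SensorKey are as in the 
earlier sketch):

    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // Counts events per key and window, incrementally.
    class CountPerKey extends AggregateFunction[Event, Long, Long] {
      override def createAccumulator(): Long = 0L
      override def add(event: Event, acc: Long): Long = acc + 1
      override def getResult(acc: Long): Long = acc
      override def merge(a: Long, b: Long): Long = a + b
    }

    object Aggregation {
      // Event and SensorKey as defined in the sketch above.
      def countPerKey(events: DataStream[Event]): DataStream[Long] =
        events
          .keyBy(_.key)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
          .aggregate(new CountPerKey)
    }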

We are using Scala version 2.11.12 and Java 8.

This looks like a bug to me. Is it a known issue or a new one?

Best regards,
/David Haglund

[1] Incremental Window Aggregation with AggregateFunction
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction

David Haglund
Systems Engineer
Fleet Perception for Maintenance
NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping
Sweden
Mobile: +46 705 634 848
david.hagl...@niradynamics.se
www.niradynamics.se
Together for smarter safety
