Hi all!

We are trying to build a Flink job that consumes a Kafka topic, groups
the incoming events into session windows keyed by a String derived by
parsing each message (we call it `SessionKey`), and does some
processing on the windows before sending them to another Kafka topic.
Our first implementation used a `keyBy` operator on the incoming
messages before creating the windows, but we realized that we could
pre-partition our data by `SessionKey` with a custom component when we
insert it into the input Kafka topic. This would avoid shuffling data
around in Flink: for a given `SessionKey`, all messages with that key
would end up in the same Kafka partition and thus be read by the same
subtask, on a single TaskManager. This means we should be able to
create a keyed stream from the incoming data without having to
transfer data between TaskManagers.
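For reference, here is a simplified sketch of our first implementation
(`MyEvent`, `parseSessionKey` and the schema/function classes are
placeholders for our actual code):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// First version: the keyBy below introduces a network shuffle
// between the Kafka source and the session windows.
DataStream<MyEvent> events = env.addSource(
        new FlinkKafkaConsumer<>("input-topic", new MyEventSchema(), kafkaProps));

events
        .keyBy(event -> parseSessionKey(event))  // shuffle happens here
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .process(new MySessionProcessFunction())
        .addSink(new FlinkKafkaProducer<>(
                "output-topic", new MyResultSchema(), kafkaProps));
```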

To achieve that, we replaced the previous `keyBy` with the
`reinterpretAsKeyedStream` method. This got rid of the shuffling step,
but we are wondering whether this is the right way to use this
feature, and whether Flink can match the partitioning of keys in Kafka
with the key groups assigned to each TaskManager.
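Concretely, we replaced the `keyBy` with something like this (again a
sketch; we pass the key type explicitly because we use a lambda key
selector):

```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Tell Flink the stream is already partitioned by SessionKey, so no
// shuffle is inserted between the Kafka source and the windows.
KeyedStream<MyEvent, String> keyed = DataStreamUtils.reinterpretAsKeyedStream(
        events,
        event -> parseSessionKey(event),
        Types.STRING);

keyed
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .process(new MySessionProcessFunction());
```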
We have observed that, while attempting to trigger a savepoint, we
encounter exceptions that seem to indicate that the TaskManagers
received data whose `SessionKey` didn't match their assigned key
groups. Here is one of the stack traces we saw while savepointing:

```
java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
	at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
	at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
```
We are currently using Flink 1.8.2 on Kubernetes, savepointing to
Amazon S3.
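
Digging into the Flink sources, if we understand
`KeyGroupRangeAssignment` correctly, the key group in the exception is
derived by murmur-hashing the key's `hashCode()` modulo the maximum
parallelism, which has nothing to do with the Kafka partition the
record came from. This is the quick check we ran (the session key and
the max parallelism of 128, Flink's default, are made up for the
example):

```
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Computes the key group Flink assigns to a key: murmur hash of
// key.hashCode(), modulo maxParallelism (128 by default). For the
// failing subtask above, the result would have to fall in [54, 55].
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup("some-session-key", 128);
System.out.println(keyGroup);
```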

Is our observation correct that Flink cannot match the Kafka
partitioning with the TaskManagers' assigned key groups?
And if so, do you have any pointers on how we could pre-partition our
data in Kafka so that Flink can avoid shuffling data before creating
the session windows?
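
If aligning the two is possible at all, our current idea would be to
reuse Flink's own assignment logic in the component that writes to the
input topic, along the lines of the untested sketch below. It assumes
that the number of Kafka partitions equals the job parallelism and
that partition i is consumed by subtask i, which is exactly the part
we are unsure about:

```
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Untested idea: pick the Kafka partition the same way Flink maps a
// key to a parallel operator index (key -> key group -> subtask).
// Assumes numPartitions == job parallelism and that the consumer
// assigns partition i to subtask i.
public class FlinkAlignedPartitioning {
    public static int partitionFor(String sessionKey, int maxParallelism, int parallelism) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                sessionKey, maxParallelism, parallelism);
    }
}
```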

Cheers,

Robin

-- 

Robin CASSAN
Data Engineer
Contentsquare <https://www.contentsquare.com/>
+33 6 50 77 88 36
5 boulevard de la Madeleine - 75001 Paris