Hi!

As far as I know, even if you pre-partition the data in Kafka exactly the
same way using the key groups, you have no guarantee that the Kafka
consumer source will pick up the right partitions.

It might work if you have exactly as many Kafka partitions as key groups
(max parallelism), partitioned correctly, but even then you might have to
use a custom source to get the correct partition assignment for the subtasks.

Long story short, I believe the built-in Kafka source doesn't support what
you want, but it should be possible to adapt it to do so.
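
To make the assignment problem concrete, here is a rough sketch of the arithmetic that decides which subtask owns which key group. As far as I understand, it mirrors the integer formulas in Flink's KeyGroupRangeAssignment, but it is re-implemented here so it runs without Flink on the classpath. The class and method names are made up for illustration, and the values 128 (max parallelism) and 64 (parallelism) are assumptions, chosen only because they happen to reproduce the range 54..55 from the stack trace below.

```java
// Sketch only: re-implements the key-group arithmetic for illustration.
// KeyGroupMath and its method names are hypothetical, not Flink API.
final class KeyGroupMath {

    /** Index of the subtask that owns the given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Inclusive {start, end} key-group range owned by one subtask. */
    static int[] keyGroupRangeForOperator(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value, not taken from the original job
        int parallelism = 64;     // assumed value, not taken from the original job

        int[] range = keyGroupRangeForOperator(maxParallelism, parallelism, 27);
        // prints "subtask 27 owns key groups 54..55"
        System.out.println("subtask 27 owns key groups " + range[0] + ".." + range[1]);

        // prints "key group 0 belongs to subtask 0"
        System.out.println("key group 0 belongs to subtask "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, 0));
    }
}
```

Under these assumed settings, a record whose key hashes to key group 0 has to be read by subtask 0. If the source hands that Kafka partition to subtask 27 instead, the state backend sees a key group outside its range. In a real job you would not copy the formulas but call KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism) from the component that writes to Kafka.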

Cheers
Gyula

On Mon, Dec 2, 2019, 03:49 Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi
>
> From the doc [1], the DataStream MUST already be pre-partitioned in EXACTLY the
> same way Flink's keyBy would partition the data in a shuffle w.r.t.
> key-group assignment.
> You should make sure that each key falls into the right key group, and that
> the key group is assigned to the right parallel subtask. You can refer to
> KeyGroupRangeAssignment [2] for more information.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.html
> Best,
> Congxian
>
>
> Robin Cassan <robin.cas...@contentsquare.com> wrote on Sat, Nov 30, 2019 at 12:17 AM:
>
>> Hi all!
>>
>> We are trying to build a Flink job that consumes a Kafka topic, groups
>> the incoming events in Session Windows according to a String that can
>> be generated by parsing the message (we call it `SessionKey`) and does
>> some processing on the windows before sending them to another Kafka
>> topic.
>> Our first implementation used a `keyBy` operator on the incoming
>> messages before creating the window, but we realized that we could
>> pre-partition our data by `SessionKey` when we insert it into the input
>> Kafka topic with a custom component. This would avoid having to
>> shuffle data around in Flink, since, for a given `SessionKey`, we would
>> ensure that all messages with this key will end up in the same Kafka
>> partition and thus be read by the same subtask, on a single
>> TaskManager. This means that we should be able to create a keyed-stream
>> from the incoming data without having to transfer data between
>> TaskManagers.
>>
>> To achieve that, we have used the `reinterpretAsKeyedStream` method
>> instead of the previous `keyBy`. This got rid of the shuffling step,
>> but we are wondering if this is the right way of using this feature and
>> whether Flink can manage to match the distribution of Keys from Kafka
>> with the ones assigned to each TaskManager?
>> We have observed that, while attempting to trigger a savepoint, we
>> would encounter exceptions that seem to indicate that the TaskManagers
>> received data whose `SessionKey` didn't match their assigned Keys.
>> Here is one of the stack traces we saw while savepointing:
>>
>> ```
>> java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
>> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
>> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
>> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
>> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
>> ```
>> We are currently using Flink 1.8.2 on Kubernetes, savepointing to
>> Amazon S3.
>>
>> Is our observation about Flink not being able to match the Kafka
>> partitioning with the TaskManager's assigned KeyGroups correct?
>> And if so, do you have any pointers on how we could pre-partition our
>> data in Kafka so that Flink can avoid shuffling data before creating
>> the Session Windows?
>>
>> Cheers,
>>
>> Robin
>>
>
