> That comes with the additional constraints that Ken mentioned, correct?
> It could break immediately if a key comes through on a different
> partition, or if the number of partitions happens to change? I'm concerned
> about that for our use case as we don't have 100% control of the upstream
> data source.

There is no way to avoid shuffling if the partitioning is unstable (except
for a map-side join, which is a special case that only applies when one
side of the join fits in memory).
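
To make the map-side join exception concrete, here is a minimal plain-Java sketch of the idea, not Flink API code. The class and method names are made up for illustration. The small side is held fully in memory on every worker, so the large side can be joined wherever its records already are, without repartitioning by key. In Flink the closest equivalent would be the broadcast state pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a map-side (broadcast) inner join. */
final class MapSideJoinSketch {

    /**
     * Joins each {key, value} record of the large side against an in-memory
     * copy of the small side. No shuffle by key is needed because every
     * worker would hold the complete small side.
     */
    static List<String> join(Map<String, String> smallSide, List<String[]> largeSide) {
        List<String> out = new ArrayList<>();
        for (String[] record : largeSide) {
            String match = smallSide.get(record[0]);
            if (match != null) {
                out.add(record[0] + ":" + record[1] + "|" + match);
            }
        }
        return out;
    }
}
```

This only works while the small side fits in memory on each worker, which is why it does not help when both inputs are high-throughput topics.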


> Hi Tommy,
> While I don’t have a sure solution, I’ve got a number of ideas on how
> to continue and shed light on the matter:
>    - With respect to diagnostics, have you enabled flame graphs
>    (cluster-config.rest.flamegraph.enabled)?
>       - They let you see the call tree of each task and where
>       most of the time is spent
>       - That usually gives me quite some insight
>    - You mention serialization could be a problem:
>       - Which serialization are you using currently?
>       - I could imagine using one of the (almost) zero-copy types like
>       RowData
>          - I considered this once but didn’t try
>       - Nico published a nice comparison of the serializer choices [1]
>    - Just for completeness: pipeline.object-reuse can cut down quite a
>    bit on GC cost, at the price of needing more discipline around object
>    mutation and around caching un-serialized objects in arbitrary data
>    structures
> Hope this helps
> Thias
Tommy May
Tuesday, March 7, 2023 3:25 AM
> Hi Ken & David,
> Thanks for following up. I've responded to your questions below.
>  If the number of unique keys isn’t huge, I could think of yet another
> helicopter stunt that you could try :)
> Unfortunately the number of keys in our case is huge, they're unique per
> handful of events.
> If your data are already pre-partitioned and the partitioning matches
> (hash partitioning on the JAVA representation of the key yielded by the
> KeySelector), you can use `reinterpretAsKeyedStream` [1] to skip the
> shuffle.
> That comes with the additional constraints that Ken mentioned, correct? It
> could break immediately if a key comes through on a different
> partition, or if the number of partitions happens to change? I'm concerned
> about that for our use case as we don't have 100% control of the upstream
> data source.
> I feel you'd be blocked by the state access downstream (with RocksDB). Are
> you sure it isn't the case?
> Yes, you are right that state access is also a limiting factor and some
> optimizations to limit that have helped quite a bit (both in our
> implementation and in using local SSDs for rocksdb). One other path we
> looked at is using memory-backed volumes for rocksdb, but ran into a
> limitation that we cannot configure Flink's process memory lower than the
> k8s container memory, leading to OOMs. More details at
> https://stackoverflow.com/questions/74118022/flink-pods-ooming-using-memory-backed-volume-with-k8s-operator
> .
> I don't have a dashboard currently to immediately point to data shuffling
> as the primary bottleneck, but I thought it could be a huge optimization if
> we can tell Flink to take advantage of the pre-partitioned datasource,
> given we're shuffling near 1 Gb/sec right now. I can see that the join is
> causing the backpressure on the sources though, and figured that network
> and state access would be the two primary contributors there. Let me know if
> you have any good debugging tools to narrow in on this more.
> Thanks,
> Tommy
David Morávek
> Using an operator state for a stateful join isn't great because it's meant
> to hold only a minimal state related to the operator (e.g., partition
> tracking).
> If your data are already pre-partitioned and the partitioning matches
> (hash partitioning on the JAVA representation of the key yielded by the
> KeySelector), you can use `reinterpretAsKeyedStream` [1] to skip the
> shuffle.
> > What we see is that the join step causes backpressure on the kafka
> sources and lag slowly starts to accumulate.
> I feel you'd be blocked by the state access downstream (with RocksDB). Are
> you sure it isn't the case?
> [1]
> https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.1/org/apache/flink/streaming/api/datastream/DataStreamUtils.html#reinterpretAsKeyedStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.api.java.functions.KeySelector-
> Best,
> D.
Ken Krugler
> wrote:
> Hi Tommy,
> To use stateful timers, you need to have a keyed stream, which gets tricky
> when you’re trying to avoid network traffic caused by the keyBy().
> If the number of unique keys isn’t huge, I could think of yet another
> helicopter stunt that you could try :)
> It’s possible to calculate a composite key, based on the “real” key and a
> synthetic value, that will wind up in the same slot where you’re doing
> this calculation.
> So that would let you create a keyed stream which would have
> serialization/deserialization cost, but wouldn’t actually go through the
> network stack.
> Since the composite key generation is deterministic, you can do the same
> thing on both streams, and join on the composite key.
> You’d want to cache the mapping from the real key to the synthetic value,
> to avoid doing this calculation for every record.
> If that sounds promising, lmk and I can post some code.
> — Ken
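
A rough sketch of how such a composite key could be computed, for illustration only. This is not Ken's code. It re-implements, from memory, what Flink's `KeyGroupRangeAssignment` and `MathUtils.murmurHash` appear to do, so the hash and routing formulas should be treated as assumptions and checked against the Flink version in use. `LocalKeySketch`, `compositeKey`, and the `#` separator are hypothetical names. In a real job the local subtask index would come from the runtime context.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: salts a key until keyBy() would route it to the local subtask. */
final class LocalKeySketch {
    private final int localSubtask;
    private final int maxParallelism;
    private final int parallelism;
    // Cache of real key -> composite key, so the search runs once per key.
    private final Map<String, String> cache = new HashMap<>();

    LocalKeySketch(int localSubtask, int maxParallelism, int parallelism) {
        this.localSubtask = localSubtask;
        this.maxParallelism = maxParallelism;
        this.parallelism = parallelism;
    }

    // Assumption: mirrors Flink's MathUtils.murmurHash(int); verify before relying on it.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Assumption: key -> key group -> subtask index, as keyBy() is believed to route.
    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    /** Returns realKey plus the smallest salt that maps to the local subtask. */
    String compositeKey(String realKey) {
        return cache.computeIfAbsent(realKey, k -> {
            for (int salt = 0; salt < 1_000_000; salt++) {
                String candidate = k + "#" + salt;
                if (subtaskForKey(candidate, maxParallelism, parallelism) == localSubtask) {
                    return candidate;
                }
            }
            throw new IllegalStateException("no salt found for key " + k);
        });
    }
}
```

Because the search is deterministic, two streams that see the same real key on the same subtask index derive the same composite key. The unbounded `HashMap` is only reasonable for a modest number of keys; a very large key space would need a bounded cache.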
Tommy May
> Hello Ken,
> Thanks for the quick response! That is an interesting workaround. In our
> case though we are using a CoProcessFunction with stateful timers. Is there
> a similar workaround path available in that case? The one possible way I
> could find requires partitioning the data in Kafka in a very specific way
> to match what Flink's keyBy is doing, and it would carry additional
> constraints, like the method you described, that would be difficult to
> handle in a prod environment where we don't have full control over the
> producers & input topics.
> Regarding the addition of a more flexible way to take advantage of
> pre-partitioned sources like in FLIP-186, would you suggest I forward this
> chain over to the dev Flink mailing list?
> Thanks,
> Tommy
Ken Krugler
> wrote:
> Hi Tommy,
> I believe there is a way to make this work currently, but with lots of
> caveats and constraints.
> This assumes you want to avoid any network shuffle.
> 1. Both topics have names that return the same value for
> ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism.
> 2. Both topics have the same number of partitions.
> 3. The parallelism of your join function exactly matches the number of
> partitions.
> 4. You can’t change any of the above without losing state.
> 5. You don’t need stateful timers.
> If the above is true, then you could use a CoFlatMapFunction and operator
> state to implement a stateful join.
> If it’s something like a left outer join without any state TTL or need to
> keep both sides in state, then it’s pretty easy.
> — Ken
> PS - it’s pretty easy to figure out a “-xxx” value to append to a topic
> name to get the hashCode() result you need.
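
For reference, a small sketch of the topic-name check and the suffix search from the PS. It assumes the Kafka connector assigns partitions to subtasks round-robin, starting from the index given by the formula in point 1 with a mask of `0x7FFFFFFF`. That matches my recollection of the connector code but should be verified against the connector version in use. `TopicAlignment`, `findSuffix`, and the topic names are hypothetical.

```java
import java.util.OptionalInt;

/** Hypothetical sketch of the topic-to-subtask alignment check described above. */
final class TopicAlignment {

    /** Start index for a topic: ((hashCode * 31) & 0x7FFFFFFF) % parallelism. */
    static int startIndex(String topicName, int parallelism) {
        return ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    /** Assumed owner of a partition: round-robin from the topic's start index. */
    static int subtaskFor(String topicName, int partition, int parallelism) {
        return (startIndex(topicName, parallelism) + partition) % parallelism;
    }

    /** Smallest n such that baseName + "-" + n has the wanted start index, if any. */
    static OptionalInt findSuffix(String baseName, int wantedStart, int parallelism, int maxTries) {
        for (int n = 0; n < maxTries; n++) {
            if (startIndex(baseName + "-" + n, parallelism) == wantedStart) {
                return OptionalInt.of(n);
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        int parallelism = 8;
        int target = startIndex("orders", parallelism);
        OptionalInt suffix = findSuffix("payments", target, parallelism, 10_000);
        System.out.println("orders start index: " + target);
        suffix.ifPresent(n -> System.out.println("aligned topic name: payments-" + n));
    }
}
```

If both topics share a start index, the same partition count, and a parallelism equal to that count, partition i of each topic should land on the same subtask, which is what points 1 to 3 rely on.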
Tommy May
> Hello,
> My team has a Flink streaming job that does a stateful join across two
> high throughput Kafka topics. This results in a large amount of data ser/de
> and shuffling (about 1gb/s for context). We're running into a bottleneck on
> this shuffling step. We've attempted to optimize our Flink configuration
> and join logic, scale out the Kafka topics & Flink job, and speed up state
> access. What we see is that the join step causes backpressure on the Kafka
> sources and lag slowly starts to accumulate.
> One idea we had to optimize this is to pre-partition the data in kafka on
> the same key that the join is happening on. This'll effectively reduce data
> shuffling to 0 and remove the bottleneck that we're seeing. I've done some
> research into the topic and from what I understand this is not
> straightforward to take advantage of in Flink. It looks to be a fairly
> commonly requested feature based on the many StackOverflow posts and slack
> questions, and I noticed there is FLIP-186 which attempts to address this
> topic as well.
> Are there any upcoming plans to add this feature to a future Flink
> release? I believe it'd be super impactful for similar large scale jobs out
> there. I'd be interested in helping as well, but admittedly I'm relatively
> new to Flink.  I poked around the code a bit, and it certainly did not seem
> like a straightforward addition, so it may be best handled by someone with
> more internal knowledge.
> Thanks,
> Tommy
