Hello Ken,

Thanks for the quick response! That is an interesting workaround. In our case, though, we are using a CoProcessFunction with stateful timers. Is there a similar workaround path available in that case? The one possible way I could find required partitioning the data in Kafka in a very specific way to match what Flink's keyBy is doing, and it would add further constraints on top of the method you described that would be difficult to satisfy in a prod environment where we don't have full control over the producers & input topics. A rough sketch of that approach is below.
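The sketch leans on Flink's KeyGroupRangeAssignment to send each record to the Kafka partition matching the subtask that keyBy would route the same key to. The FlinkKeyByPartitioner name and the hard-coded maxParallelism of 128 are illustrative assumptions, and it only lines up if the topic's partition count equals the join's parallelism and the producer-side key's hashCode() matches the key used in keyBy:

import java.util.Map;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical producer-side partitioner: routes each record to the Kafka
// partition matching the Flink subtask that keyBy would assign its key to.
public class FlinkKeyByPartitioner implements Partitioner {

    // Must match the job's maxParallelism (Flink defaults to 128 for
    // parallelism <= 128); changing it breaks the alignment.
    private static final int MAX_PARALLELISM = 128;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        // Same math as Flink's keyBy: key -> key group via
        // murmurHash(key.hashCode()) % maxParallelism, then key group -> subtask.
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, MAX_PARALLELISM, numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

Even with this in place, Flink doesn't know the source is pre-partitioned, so the keyBy still ser/des and shuffles every record; the data just arrives already grouped.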
Regarding the addition of a more flexible way to take advantage of pre-partitioned sources like in FLIP-186, would you suggest I forward this chain over to the Flink dev mailing list?

Thanks,
Tommy

On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:

> Hi Tommy,
>
> I believe there is a way to make this work currently, but with lots of
> caveats and constraints.
>
> This assumes you want to avoid any network shuffle.
>
> 1. Both topics have names that return the same value for
> ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism.
> 2. Both topics have the same number of partitions.
> 3. The parallelism of your join function exactly matches the number of
> partitions.
> 4. You can't change any of the above without losing state.
> 5. You don't need stateful timers.
>
> If the above is true, then you could use a CoFlatMapFunction and operator
> state to implement a stateful join.
>
> If it's something like a left outer join without any state TTL or need to
> keep both sides in state, then it's pretty easy.
>
> — Ken
>
> PS - it's pretty easy to figure out a "-xxx" value to append to a topic
> name to get the hashCode() result you need.
>
> On Mar 3, 2023, at 4:56 PM, Tommy May <tvma...@gmail.com> wrote:
>
> Hello,
>
> My team has a Flink streaming job that does a stateful join across two
> high-throughput Kafka topics. This results in a large amount of data
> ser/de and shuffling (about 1 GB/s, for context). We're running into a
> bottleneck on this shuffling step. We've attempted to optimize our Flink
> configuration and join logic, scale out the Kafka topics & Flink job, and
> speed up state access. What we see is that the join step causes
> backpressure on the Kafka sources and lag slowly starts to accumulate.
>
> One idea we had to optimize this is to pre-partition the data in Kafka on
> the same key that the join is happening on. This would effectively reduce
> data shuffling to zero and remove the bottleneck that we're seeing. I've
> done some research into the topic, and from what I understand this is not
> straightforward to take advantage of in Flink. It looks to be a fairly
> commonly requested feature based on the many StackOverflow posts and
> Slack questions, and I noticed there is FLIP-186, which attempts to
> address this topic as well.
>
> Are there any upcoming plans to add this feature to a future Flink
> release? I believe it'd be super impactful for similar large-scale jobs
> out there. I'd be interested in helping as well, but admittedly I'm
> relatively new to Flink. I poked around the code a bit, and it certainly
> did not seem like a straightforward addition, so it may be best handled
> by someone with more internal knowledge.
>
> Thanks,
> Tommy
>
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
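Coming back to the workaround: a minimal sketch of the CoFlatMapFunction-plus-operator-state join described above might look like this, assuming Tuple2<String, String> records on both sides and left-outer-join-style enrichment where only the right side is kept in state (both are illustrative assumptions, not from the thread):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: a stateful join without keyBy, relying on both topics
// being partitioned identically so all records for a key hit the same subtask.
public class PrePartitionedJoin
        implements CoFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>, String>,
                   CheckpointedFunction {

    // Right-side records seen so far, keyed by join key; held on the heap and
    // checkpointed as operator list state below.
    private transient Map<String, String> rightSide;
    private transient ListState<Tuple2<String, String>> checkpointedRightSide;

    @Override
    public void flatMap1(Tuple2<String, String> left, Collector<String> out) {
        // Left record: emit joined result if the right side is known, else emit
        // the left side alone (left outer join semantics, no state TTL).
        String right = rightSide.get(left.f0);
        out.collect(left.f0 + ": " + left.f1 + " / " + (right != null ? right : "<none>"));
    }

    @Override
    public void flatMap2(Tuple2<String, String> right, Collector<String> out) {
        rightSide.put(right.f0, right.f1);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        checkpointedRightSide.clear();
        for (Map.Entry<String, String> e : rightSide.entrySet()) {
            checkpointedRightSide.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        rightSide = new HashMap<>();
        ListStateDescriptor<Tuple2<String, String>> desc = new ListStateDescriptor<>(
                "right-side",
                TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
        checkpointedRightSide = ctx.getOperatorStateStore().getListState(desc);
        if (ctx.isRestored()) {
            for (Tuple2<String, String> t : checkpointedRightSide.get()) {
                rightSide.put(t.f0, t.f1);
            }
        }
    }
}

Note that operator list state is redistributed round-robin on restore if the parallelism changes, which breaks the key-to-subtask alignment; that is caveat 4 above.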
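And the "-xxx" trick from the PS can be brute-forced in a few lines, using the same start-index formula as Flink's KafkaTopicPartitionAssigner (the topic names and parallelism here are placeholders):

// Hypothetical helper: find a numeric suffix that makes a new topic name land
// on the same start index as a reference topic, so partitions of both topics
// are assigned to the same source subtasks.
public class TopicSuffixFinder {

    // startIndex per Flink's KafkaTopicPartitionAssigner: determines which
    // subtask is assigned partition 0 of a topic.
    static int startIndex(String topic, int parallelism) {
        return ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    public static void main(String[] args) {
        String referenceTopic = "orders";     // placeholder topic names
        String topicToCreate = "customers";
        int parallelism = 16;

        int target = startIndex(referenceTopic, parallelism);
        // Try suffixes until the candidate name lands on the same start index.
        for (int i = 0; ; i++) {
            String candidate = topicToCreate + "-" + i;
            if (startIndex(candidate, parallelism) == target) {
                System.out.println(candidate + " -> start index " + target);
                break;
            }
        }
    }
}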