Hi Tommy,

To use stateful timers, you need to have a keyed stream, which gets tricky when 
you’re trying to avoid network traffic caused by the keyBy().

If the number of unique keys isn’t huge, I could think of yet another 
helicopter stunt that you could try :)

It’s possible to calculate a composite key, based on the “real” key and a 
synthetic value, that will wind up in the same slot where you’re doing this 
calculation.

So that would let you create a keyed stream which would have 
serialization/deserialization cost, but wouldn’t actually go through the 
network stack.

Since the composite key generation is deterministic, you can do the same thing 
on both streams, and join on the composite key.

You’d want to cache the mapping from the real key to the synthetic value, to 
avoid doing this calculation for every record.
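
A rough sketch of what the composite-key lookup with a cache could look like, 
in plain Java. This is only an illustration of the idea above, not tested 
against a real job. SubtaskAssigner is a hypothetical stand-in for Flink's 
routing of a key to a subtask (I believe 
KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, 
parallelism) is the real call to plug in there, and that mySubtask would come 
from getRuntimeContext().getIndexOfThisSubtask()). The "|" separator, the 
integer salt, and the attempt limit are likewise assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class CompositeKeySketch {

    /** Hypothetical stand-in for Flink's key -> subtask routing. */
    interface SubtaskAssigner {
        int subtaskFor(String key, int parallelism);
    }

    // Arbitrary safety bound so a bad assigner can't loop forever.
    static final int MAX_ATTEMPTS = 1_000_000;

    final SubtaskAssigner assigner;
    final int parallelism;
    final int mySubtask;

    // Cache of real key -> composite key, so the search runs once per key.
    final Map<String, String> cache = new HashMap<>();

    CompositeKeySketch(SubtaskAssigner assigner, int parallelism, int mySubtask) {
        this.assigner = assigner;
        this.parallelism = parallelism;
        this.mySubtask = mySubtask;
    }

    /** Returns realKey plus a synthetic salt that routes to this subtask. */
    String compositeKey(String realKey) {
        return cache.computeIfAbsent(realKey, this::search);
    }

    // Deterministic search: try salts 0, 1, 2, ... and take the first hit.
    private String search(String realKey) {
        for (int salt = 0; salt < MAX_ATTEMPTS; salt++) {
            String candidate = realKey + "|" + salt;
            if (assigner.subtaskFor(candidate, parallelism) == mySubtask) {
                return candidate;
            }
        }
        throw new IllegalStateException("No salt found for key " + realKey);
    }
}
```

Because the search always takes the first salt that matches, two operators 
running on the same subtask index compute the same composite key for the same 
real key. That only helps if records for a given real key already arrive on 
the same subtask in both streams.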

If that sounds promising, lmk and I can post some code.

— Ken


> On Mar 4, 2023, at 12:37 PM, Tommy May <tvma...@gmail.com> wrote:
> 
> Hello Ken,
> 
> Thanks for the quick response! That is an interesting workaround. In our case 
> though we are using a CoProcessFunction with stateful timers. Is there a 
> similar workaround path available in that case? The one possible way I could 
> find required partitioning data in kafka in a very specific way to match what 
> Flink's keyBy is doing, and it'd add constraints beyond the ones in the 
> method you described, which would be difficult to handle in a prod environment 
> where we don't have full control over the producers & input topics.
> 
> Regarding the addition of a more flexible way to take advantage of 
> pre-partitioned sources like in FLIP-186, would you suggest I forward this 
> chain over to the dev Flink mailing list? 
> 
> Thanks,
> Tommy
> 
>   
> 
> On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler <kkrugler_li...@transpac.com> 
> wrote:
> Hi Tommy,
> 
> I believe there is a way to make this work currently, but with lots of 
> caveats and constraints.
> 
> This assumes you want to avoid any network shuffle.
> 
> 1. Both topics have names that return the same value for 
> ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism.
> 2. Both topics have the same number of partitions.
> 3. The parallelism of your join function exactly matches the number of 
> partitions.
> 4. You can’t change any of the above without losing state.
> 5. You don’t need stateful timers.
> 
> If the above is true, then you could use a CoFlatMapFunction and operator 
> state to implement a stateful join.
> 
> If it’s something like a left outer join without any state TTL or need to 
> keep both sides in state, then it’s pretty easy.
> 
> — Ken
> 
> PS - it’s pretty easy to figure out a “-xxx” value to append to a topic name 
> to get the hashCode() result you need.
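
A small sketch of how one might search for such a suffix, in plain Java. It 
assumes the start-index formula from point 1 with the mask 0x7FFFFFFF (which 
is what I believe Flink's Kafka partition assigner uses). The class name, the 
method names, the numeric "-N" suffix scheme, and the 100,000-attempt limit 
are all made up for illustration.

```java
class TopicSuffixSketch {

    /** Start index for a topic, per the formula in point 1 above. */
    static int startIndex(String topicName, int parallelism) {
        return ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    /**
     * Appends "-0", "-1", ... to baseName until the resulting topic name
     * yields the wanted start index, then returns that name.
     */
    static String findName(String baseName, int targetIndex, int parallelism) {
        for (int i = 0; i < 100_000; i++) {
            String candidate = baseName + "-" + i;
            if (startIndex(candidate, parallelism) == targetIndex) {
                return candidate;
            }
        }
        throw new IllegalStateException("No suffix found for " + baseName);
    }
}
```

For example, compute startIndex() for the first topic, then call findName() 
on the second topic's base name with that value as targetIndex.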
> 
>> On Mar 3, 2023, at 4:56 PM, Tommy May <tvma...@gmail.com> wrote:
>> 
>> Hello, 
>> 
>> My team has a Flink streaming job that does a stateful join across two high 
>> throughput kafka topics. This results in a large amount of data ser/de and 
>> shuffling (about 1gb/s for context). We're running into a bottleneck on this 
>> shuffling step. We've attempted to optimize our flink configuration, join 
>> logic, scale out the kafka topics & flink job, and speed up state access. 
>> What we see is that the join step causes backpressure on the kafka sources 
>> and lag slowly starts to accumulate. 
>> 
>> One idea we had to optimize this is to pre-partition the data in kafka on 
>> the same key that the join is happening on. This'll effectively reduce data 
>> shuffling to 0 and remove the bottleneck that we're seeing. I've done some 
>> research into the topic and from what I understand this is not 
>> straightforward to take advantage of in Flink. It looks to be a fairly 
>> commonly requested feature based on the many StackOverflow posts and slack 
>> questions, and I noticed there is FLIP-186 which attempts to address this 
>> topic as well. 
>> 
>> Are there any upcoming plans to add this feature to a future Flink release? 
>> I believe it'd be super impactful for similar large scale jobs out there. 
>> I'd be interested in helping as well, but admittedly I'm relatively new to 
>> Flink.  I poked around the code a bit, and it certainly did not seem like a 
>> straightforward addition, so it may be best handled by someone with more 
>> internal knowledge.
>> 
>> Thanks,
>> Tommy
> 
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


