Using operator state for a stateful join isn't a great fit, because it's
meant to hold only a small amount of state tied to the operator itself
(e.g., partition tracking).

If your data is already pre-partitioned and the partitioning matches what
keyBy() would do (hash partitioning on the Java representation of the key
yielded by the KeySelector), you can use `reinterpretAsKeyedStream` [1] to
skip the shuffle.
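
To make "the partitioning matches" concrete, here is a rough, JDK-only sketch of how keyBy() picks a subtask for a key. It mirrors Flink's `KeyGroupRangeAssignment` and `MathUtils.murmurHash` as I remember them, so treat the constants as an assumption and check them against your Flink version. The class and method names (`KeyRouting`, `subtaskForKey`, `alreadyInPlace`) are made up for illustration.

```java
/** Sketch of Flink's key -> subtask routing (hypothetical class, re-implemented from memory). */
final class KeyRouting {

    /** Final mixing step, as recalled from Flink's MathUtils.bitMix. */
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    /** Non-negative hash of a hashCode, as recalled from Flink's MathUtils.murmurHash. */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** key.hashCode() -> key group -> index of the subtask that owns that key group. */
    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    /** True if a record read by readerSubtask already sits where keyBy() would send it. */
    static boolean alreadyInPlace(Object key, int readerSubtask, int maxParallelism, int parallelism) {
        return subtaskForKey(key, maxParallelism, parallelism) == readerSubtask;
    }
}
```

`reinterpretAsKeyedStream` is only safe if `alreadyInPlace` would hold for every record. A producer that partitions with Kafka's default partitioner will generally not satisfy this.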

> What we see is that the join step causes backpressure on the kafka
> sources and lag slowly starts to accumulate.

I suspect the real bottleneck is the state access downstream (with RocksDB)
rather than the shuffle itself. Are you sure that isn't the case?

[1]
https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.1/org/apache/flink/streaming/api/datastream/DataStreamUtils.html#reinterpretAsKeyedStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.api.java.functions.KeySelector-

Best,
D.

On Sun, Mar 5, 2023 at 5:31 AM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Tommy,
>
> To use stateful timers, you need to have a keyed stream, which gets tricky
> when you’re trying to avoid network traffic caused by the keyBy().
>
> If the number of unique keys isn’t huge, I could think of yet another
> helicopter stunt that you could try :)
>
> It’s possible to calculate a composite key, based on the “real” key and a
> synthetic value, that will wind up in the same slot where you’re doing
> this calculation.
>
> So that would let you create a keyed stream which would have
> serialization/deserialization cost, but wouldn’t actually go through the
> network stack.
>
> Since the composite key generation is deterministic, you can do the same
> thing on both streams, and join on the composite key.
>
> You’d want to cache the mapping from the real key to the synthetic value,
> to avoid doing this calculation for every record.
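
(Inline note, not Ken's code.) A minimal JDK-only sketch of the composite-key idea described above: for a given real key, search for a small synthetic "salt" so that the composite key hashes to the subtask the record is already on, and cache the result per key. The routing math is re-implemented from memory of Flink's `KeyGroupRangeAssignment` and `MathUtils.murmurHash` and should be treated as an assumption. `CompositeKeys`, `findSalt`, and the `realKey#salt` string format are hypothetical choices.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: build keys that keyBy() routes to a chosen subtask (hypothetical helper). */
final class CompositeKeys {
    private final int targetSubtask;
    private final int maxParallelism;
    private final int parallelism;
    // Cache real key -> salt so the search runs once per key, not once per record.
    private final Map<String, Integer> saltCache = new HashMap<>();

    CompositeKeys(int targetSubtask, int maxParallelism, int parallelism) {
        this.targetSubtask = targetSubtask;
        this.maxParallelism = maxParallelism;
        this.parallelism = parallelism;
    }

    /** As recalled from Flink's MathUtils.murmurHash (assumption, verify per version). */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** Subtask index that keyBy() would pick for this key. */
    static int subtaskFor(String key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    /** Deterministic: the same real key and target subtask always give the same composite key. */
    String compositeKey(String realKey) {
        int salt = saltCache.computeIfAbsent(realKey, this::findSalt);
        return realKey + "#" + salt;
    }

    /** Tries salts 0, 1, 2, ... until the composite key lands on the target subtask. */
    private int findSalt(String realKey) {
        for (int salt = 0; salt < Integer.MAX_VALUE; salt++) {
            if (subtaskFor(realKey + "#" + salt, maxParallelism, parallelism) == targetSubtask) {
                return salt;
            }
        }
        throw new IllegalStateException("no salt found for key " + realKey);
    }
}
```

Each parallel instance would construct this with its own subtask index. The join only lines up if both inputs for a given real key arrive on the same subtask index, since that is what makes the two sides compute the same composite key.
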
>
> If that sounds promising, lmk and I can post some code.
>
> — Ken
>
>
> On Mar 4, 2023, at 12:37 PM, Tommy May <tvma...@gmail.com> wrote:
>
> Hello Ken,
>
> Thanks for the quick response! That is an interesting workaround. In our
> case though we are using a CoProcessFunction with stateful timers. Is there
> a similar workaround path available in that case? The only way I could
> find requires partitioning data in kafka in a very specific way to match
> what Flink's keyBy is doing, and it has additional constraints beyond the
> method you described that would be difficult to handle in a prod
> environment where we don't have full control over the producers & input
> topics.
>
> Regarding the addition of a more flexible way to take advantage of
> pre-partitioned sources like in FLIP-186, would you suggest I forward this
> chain over to the dev Flink mailing list?
>
> Thanks,
> Tommy
>
>
>
> On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> Hi Tommy,
>>
>> I believe there is a way to make this work currently, but with lots of
>> caveats and constraints.
>>
>> This assumes you want to avoid any network shuffle.
>>
>> 1. Both topics have names that return the same value for
>> ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism.
>> 2. Both topics have the same number of partitions.
>> 3. The parallelism of your join function exactly matches the number of
>> partitions.
>> 4. You can’t change any of the above without losing state.
>> 5. You don’t need stateful timers.
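
(Inline note.) Conditions 1 to 3 can be checked offline with a few lines of plain Java. This is a sketch that assumes the Kafka source assigns partition `p` of a topic to subtask `(startIndex + p) % parallelism`, with the 31-bit mask `0x7FFFFFFF` as I recall it from the Flink Kafka connector; verify both against the connector version you run. `SplitOwner` and its methods are hypothetical names.

```java
/** Sketch: which source subtask reads which Kafka partition (assumed formula, hypothetical class). */
final class SplitOwner {

    /** Subtask that gets partition 0 of the topic. */
    static int startIndex(String topic, int parallelism) {
        return ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    /** Subtask that gets the given partition of the topic. */
    static int owner(String topic, int partition, int parallelism) {
        return (startIndex(topic, parallelism) + partition) % parallelism;
    }

    /** True if partition p of both topics is read by the same subtask, for every p. */
    static boolean aligned(String topicA, String topicB, int partitions, int parallelism) {
        if (partitions != parallelism) {
            return false; // condition 3: parallelism must equal the partition count
        }
        for (int p = 0; p < partitions; p++) {
            if (owner(topicA, p, parallelism) != owner(topicB, p, parallelism)) {
                return false;
            }
        }
        return true;
    }
}
```

Passing a single `partitions` value for both topics encodes condition 2. Alignment only helps if the producers of both topics also put the same key in the same partition number.
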
>>
>> If the above is true, then you could use a CoFlatMapFunction and operator
>> state to implement a stateful join.
>>
>> If it’s something like a left outer join without any state TTL or need to
>> keep both sides in state, then it’s pretty easy.
>>
>> — Ken
>>
>> PS - it’s pretty easy to figure out a “-xxx” value to append to a topic
>> name to get the hashCode() result you need.
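
(Inline note.) One way to find such a suffix is a brute-force search, sketched below in plain Java. It assumes the same start-index formula as condition 1 with the mask `0x7FFFFFFF` (my recollection of the Flink Kafka connector, so an assumption). `TopicSuffix`, `findName`, and the numeric `-<n>` suffix scheme are made up for illustration.

```java
/** Sketch: find a topic name whose start index matches a reference topic (hypothetical helper). */
final class TopicSuffix {

    /** Assumed formula for the subtask that reads partition 0 of a topic. */
    static int startIndex(String topic, int parallelism) {
        return ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    /** Returns baseName, or baseName-<n> for the smallest n that matches the reference topic. */
    static String findName(String baseName, String referenceTopic, int parallelism) {
        int wanted = startIndex(referenceTopic, parallelism);
        if (startIndex(baseName, parallelism) == wanted) {
            return baseName; // already matches, no suffix needed
        }
        for (int n = 0; n < 1_000_000; n++) {
            String candidate = baseName + "-" + n;
            if (startIndex(candidate, parallelism) == wanted) {
                return candidate;
            }
        }
        throw new IllegalStateException("no matching suffix found for " + baseName);
    }
}
```

The result depends on the parallelism, which is one reason condition 4 (no changes afterwards) applies.
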
>>
>> On Mar 3, 2023, at 4:56 PM, Tommy May <tvma...@gmail.com> wrote:
>>
>> Hello,
>>
>> My team has a Flink streaming job that does a stateful join across two
>> high throughput kafka topics. This results in a large amount of data ser/de
>> and shuffling (about 1gb/s for context). We're running into a bottleneck on
>> this shuffling step. We've attempted to optimize our flink configuration,
>> join logic, scale out the kafka topics & flink job, and speed up state
>> access. What we see is that the join step causes backpressure on the kafka
>> sources and lag slowly starts to accumulate.
>>
>> One idea we had to optimize this is to pre-partition the data in kafka on
>> the same key that the join is happening on. This'll effectively reduce data
>> shuffling to 0 and remove the bottleneck that we're seeing. I've done some
>> research into the topic and from what I understand this is not
>> straightforward to take advantage of in Flink. It looks to be a fairly
>> commonly requested feature based on the many StackOverflow posts and slack
>> questions, and I noticed there is FLIP-186 which attempts to address this
>> topic as well.
>>
>> Are there any upcoming plans to add this feature to a future Flink
>> release? I believe it'd be super impactful for similar large scale jobs out
>> there. I'd be interested in helping as well, but admittedly I'm relatively
>> new to Flink.  I poked around the code a bit, and it certainly did not seem
>> like a straightforward addition, so it may be best handled by someone with
>> more internal knowledge.
>>
>> Thanks,
>> Tommy
>>
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>>
>>
>>
