Using operator state for a stateful join isn't great, because it's meant to hold only minimal state related to the operator itself (e.g., partition tracking).

If your data are already pre-partitioned and the partitioning matches (hash partitioning on the Java representation of the key yielded by the KeySelector), you can use `reinterpretAsKeyedStream` [1] to skip the shuffle.
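For illustration, a minimal sketch of that wiring, assuming a hypothetical `Event` record with a `String key` field and an existing `MyStatefulJoin` co-process function (not taken from the job discussed here):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class PrePartitionedJoinWiring {

    // leftSource / rightSource are read from the pre-partitioned topics, with
    // source parallelism equal to the partition count. Event and MyStatefulJoin
    // are hypothetical stand-ins for the real record type and CoProcessFunction.
    public static void wire(DataStream<Event> leftSource, DataStream<Event> rightSource) {
        KeySelector<Event, String> byKey = new KeySelector<Event, String>() {
            @Override
            public String getKey(Event event) {
                // Must yield exactly the key the Kafka topics are partitioned on.
                return event.key;
            }
        };

        // Declare both streams as already keyed, skipping the keyBy() network shuffle.
        KeyedStream<Event, String> left =
                DataStreamUtils.reinterpretAsKeyedStream(leftSource, byKey);
        KeyedStream<Event, String> right =
                DataStreamUtils.reinterpretAsKeyedStream(rightSource, byKey);

        // Keyed state and timers still work in the downstream co-process function.
        left.connect(right).process(new MyStatefulJoin());
    }
}
```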
> What we see is that the join step causes backpressure on the Kafka sources and lag slowly starts to accumulate.

I feel you'd be blocked by the state access downstream (with RocksDB). Are you sure that isn't the case?

[1] https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.1/org/apache/flink/streaming/api/datastream/DataStreamUtils.html#reinterpretAsKeyedStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.api.java.functions.KeySelector-

Best,
D.

On Sun, Mar 5, 2023 at 5:31 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:

> Hi Tommy,
>
> To use stateful timers, you need to have a keyed stream, which gets tricky when you're trying to avoid the network traffic caused by the keyBy().
>
> If the number of unique keys isn't huge, I could think of yet another helicopter stunt that you could try :)
>
> It's possible to calculate a composite key, based on the "real" key and a synthetic value, that will wind up in the same slot where you're doing this calculation.
>
> So that would let you create a keyed stream which would have serialization/deserialization cost, but wouldn't actually go through the network stack.
>
> Since the composite key generation is deterministic, you can do the same thing on both streams, and join on the composite key.
>
> You'd want to cache the mapping from the real key to the synthetic value, to avoid doing this calculation for every record.
>
> If that sounds promising, lmk and I can post some code.
>
> — Ken
>
> On Mar 4, 2023, at 12:37 PM, Tommy May <tvma...@gmail.com> wrote:
>
> Hello Ken,
>
> Thanks for the quick response! That is an interesting workaround. In our case, though, we are using a CoProcessFunction with stateful timers. Is there a similar workaround available in that case? The one possible way I could find required partitioning the data in Kafka in a very specific way to match what Flink's keyBy is doing, and it had additional constraints beyond the method you described that would be difficult to handle in a prod environment where we don't have full control over the producers & input topics.
>
> Regarding the addition of a more flexible way to take advantage of pre-partitioned sources, as in FLIP-186, would you suggest I forward this chain over to the dev Flink mailing list?
>
> Thanks,
> Tommy
>
> On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
>> Hi Tommy,
>>
>> I believe there is a way to make this work currently, but with lots of caveats and constraints.
>>
>> This assumes you want to avoid any network shuffle.
>>
>> 1. Both topics have names that return the same value for ((topicName.hashCode() * 31) & 0x7FFFF) % parallelism.
>> 2. Both topics have the same number of partitions.
>> 3. The parallelism of your join function exactly matches the number of partitions.
>> 4. You can't change any of the above without losing state.
>> 5. You don't need stateful timers.
>>
>> If the above is true, then you could use a CoFlatMapFunction and operator state to implement a stateful join.
>>
>> If it's something like a left outer join without any state TTL or need to keep both sides in state, then it's pretty easy.
>>
>> — Ken
>>
>> PS - it's pretty easy to figure out a "-xxx" value to append to a topic name to get the hashCode() result you need.
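As a rough illustration of the CoFlatMapFunction approach described above, here is a minimal left-outer-join sketch that keeps only the latest right-side record per key in a local map. The record types (`LeftEvent`, `RightEvent`, `JoinedEvent`) are hypothetical, and a production version would need to snapshot the map as operator state via `CheckpointedFunction` to survive restarts, per the caveats above:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch only: assumes both inputs arrive pre-partitioned on the same key,
// so all records for a key land in the same subtask without a keyBy().
public class LeftOuterJoin
        extends RichCoFlatMapFunction<LeftEvent, RightEvent, JoinedEvent> {

    // Latest right-side record per key, local to this subtask.
    // A real job would persist this via CheckpointedFunction (operator state).
    private transient Map<String, RightEvent> rightByKey;

    @Override
    public void open(Configuration parameters) {
        rightByKey = new HashMap<>();
    }

    @Override
    public void flatMap1(LeftEvent left, Collector<JoinedEvent> out) {
        // Left outer join: always emit the left record, enriched if a match exists.
        out.collect(new JoinedEvent(left, rightByKey.get(left.key)));
    }

    @Override
    public void flatMap2(RightEvent right, Collector<JoinedEvent> out) {
        // Remember the latest right-side record for this key.
        rightByKey.put(right.key, right);
    }
}
```

It would be wired up as `leftStream.connect(rightStream).flatMap(new LeftOuterJoin())`, with the operator parallelism equal to the number of Kafka partitions, matching the constraints listed above.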
>>
>> On Mar 3, 2023, at 4:56 PM, Tommy May <tvma...@gmail.com> wrote:
>>
>> Hello,
>>
>> My team has a Flink streaming job that does a stateful join across two high-throughput Kafka topics. This results in a large amount of data ser/de and shuffling (about 1gb/s, for context). We're running into a bottleneck on this shuffling step. We've attempted to optimize our Flink configuration and join logic, scale out the Kafka topics & Flink job, and speed up state access. What we see is that the join step causes backpressure on the Kafka sources and lag slowly starts to accumulate.
>>
>> One idea we had to optimize this is to pre-partition the data in Kafka on the same key that the join is happening on. This would effectively reduce data shuffling to zero and remove the bottleneck that we're seeing. I've done some research into the topic, and from what I understand this is not straightforward to take advantage of in Flink. It looks to be a fairly commonly requested feature based on the many StackOverflow posts and Slack questions, and I noticed there is FLIP-186, which attempts to address this topic as well.
>>
>> Are there any upcoming plans to add this feature to a future Flink release? I believe it'd be super impactful for similar large-scale jobs out there. I'd be interested in helping as well, but admittedly I'm relatively new to Flink. I poked around the code a bit, and it certainly did not seem like a straightforward addition, so it may be best handled by someone with more internal knowledge.
>>
>> Thanks,
>> Tommy
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
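For completeness, a sketch of the composite-key trick from the Mar 5 reply above: search for a synthetic suffix such that Flink's key-group routing assigns the composite key back to the current subtask, so a keyBy() on the composite key still pays ser/de but stays local. The helper below is hypothetical; it leans on `KeyGroupRangeAssignment` from flink-runtime, which is not a stable public API, so verify it against your Flink version. The real-key-to-suffix mapping is cached, as suggested in the thread.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Hypothetical helper, used inside an operator that knows its own subtask
// index (e.g. via getRuntimeContext().getIndexOfThisSubtask()).
public final class CompositeKeys {

    // Cache of real key -> composite key, so the search runs once per key.
    private final Map<String, String> cache = new HashMap<>();
    private final int thisSubtask;
    private final int parallelism;
    private final int maxParallelism;

    public CompositeKeys(int thisSubtask, int parallelism, int maxParallelism) {
        this.thisSubtask = thisSubtask;
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
    }

    public String compositeKeyFor(String realKey) {
        return cache.computeIfAbsent(realKey, key -> {
            // Try synthetic suffixes until Flink's key-group routing assigns
            // the composite key back to this subtask.
            for (int salt = 0; salt < 1_000_000; salt++) {
                String candidate = key + "#" + salt;
                int target = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                        candidate, maxParallelism, parallelism);
                if (target == thisSubtask) {
                    return candidate;
                }
            }
            throw new IllegalStateException("No local suffix found for key " + key);
        });
    }
}
```

Because the search is deterministic, both streams produce the same composite key for the same real key, provided the same real key is already handled by the same subtask index on both sides (the pre-partitioned-sources premise), so a downstream keyBy() on the composite key involves no network hop.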