> That comes with the additional constraints that Ken mentioned, correct? It could break immediately if a key comes through on a different partition, or if the number of partitions happens to change? I'm concerned about that for our use case, as we don't have 100% control of the upstream data source.

There is no way to avoid shuffling if the partitioning is unstable (except for a map-side join, which is a special case where one side of the join fits in memory).
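For the map-side case, one way to express it in the DataStream API is the broadcast-state pattern: the small side is broadcast to every subtask and looked up per record, so the large side is never shuffled. A rough sketch, with the Tuple2 modelling and the inner-join semantics purely illustrative:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class MapSideJoinSketch {

      // Both sides are modelled as (key, payload) pairs purely for brevity.
      static DataStream<Tuple2<String, String>> joinWithSmallSide(
          DataStream<Tuple2<String, String>> largeSide,
          DataStream<Tuple2<String, String>> smallSide) {

        MapStateDescriptor<String, String> smallSideState =
            new MapStateDescriptor<>("small-side", Types.STRING, Types.STRING);

        BroadcastStream<Tuple2<String, String>> broadcast = smallSide.broadcast(smallSideState);

        return largeSide
            .connect(broadcast)
            .process(new BroadcastProcessFunction<
                Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>>() {

              @Override
              public void processElement(Tuple2<String, String> value, ReadOnlyContext ctx,
                                         Collector<Tuple2<String, String>> out) throws Exception {
                // Look up the broadcast side locally; the large side is never repartitioned.
                String other = ctx.getBroadcastState(smallSideState).get(value.f0);
                if (other != null) {
                  out.collect(Tuple2.of(value.f0, value.f1 + "|" + other));
                }
              }

              @Override
              public void processBroadcastElement(Tuple2<String, String> value, Context ctx,
                                                  Collector<Tuple2<String, String>> out) throws Exception {
                // Every parallel instance receives the complete small side into broadcast state.
                ctx.getBroadcastState(smallSideState).put(value.f0, value.f1);
              }
            });
      }
    }

Broadcast state is kept on the heap of every parallel instance, so this only works while the small side genuinely fits in memory.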
Best,
D.

On Tue, Mar 7, 2023 at 9:16 AM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Tommy,
>
> While not coming up with a sure solution, I’ve got a number of ideas on how to continue and shed some light on the matter:
>
> - With respect to diagnostics, have you enabled the flame graph (cluster config rest.flamegraph.enabled)?
>   - It allows you to see the call tree of each task and where most of the time is spent
>   - That usually gives me quite some insight
> - You mention serialization could be a problem:
>   - Which serializer are you using currently?
>   - I could imagine using one of the (almost) zero-copy types like RowData
>   - I considered this once but didn’t try it
>   - Nico published a nice comparison of the choices with respect to serializers [1]
> - Just for completeness: pipeline.object-reuse can cut down quite a bit on GC cost, at the price of more discipline around mutating objects and caching un-serialized objects in arbitrary data structures (see the sketch below)
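A minimal sketch of the two settings mentioned above, assuming Flink 1.13+ (rest.flamegraph.enabled is a cluster-level option that would normally live in the cluster configuration; it is passed programmatically here only for a local run):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TuningSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Exposes per-task flame graphs in the web UI.
        conf.setBoolean("rest.flamegraph.enabled", true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Same effect as pipeline.object-reuse: true; the runtime may hand the same record
        // instance to chained operators, so user code must not mutate or cache inputs.
        env.getConfig().enableObjectReuse();

        // ... build and execute the pipeline ...
      }
    }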
> Hope this helps
>
> Thias
>
> [1] https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/
>
> *From:* Tommy May <tvma...@gmail.com>
> *Sent:* Tuesday, March 7, 2023 3:25 AM
> *To:* David Morávek <d...@apache.org>
> *Cc:* Ken Krugler <kkrugler_li...@transpac.com>; Flink User List <user@flink.apache.org>
> *Subject:* Re: Avoiding data shuffling when reading pre-partitioned data from Kafka
>
> Hi Ken & David,
>
> Thanks for following up. I've responded to your questions below.
>
>> If the number of unique keys isn’t huge, I could think of yet another helicopter stunt that you could try :)
>
> Unfortunately the number of keys in our case is huge; they're unique per handful of events.
>
>> If your data are already pre-partitioned and the partitioning matches (hash partitioning on the JAVA representation of the key yielded by the KeySelector), you can use `reinterpretAsKeyedStream` [1] to skip the shuffle.
>
> That comes with the additional constraints that Ken mentioned, correct? It could break immediately if a key comes through on a different partition, or if the number of partitions happens to change? I'm concerned about that for our use case, as we don't have 100% control of the upstream data source.
>
>> I feel you'd be blocked by the state access downstream (with RocksDB). Are you sure it isn't the case?
>
> Yes, you are right that state access is also a limiting factor, and some optimizations to limit it have helped quite a bit (both in our implementation and in using local SSDs for RocksDB). One other path we looked at is using memory-backed volumes for RocksDB, but we ran into a limitation that we cannot configure Flink's process memory lower than the k8s container memory, leading to OOMs. More details at https://stackoverflow.com/questions/74118022/flink-pods-ooming-using-memory-backed-volume-with-k8s-operator .
>
> I don't have a dashboard currently to immediately point to data shuffling as the primary bottleneck, but I thought it could be a huge optimization if we can tell Flink to take advantage of the pre-partitioned data source, given we're shuffling near 1 Gb/sec right now. I can see that the join is causing the backpressure on the sources, though, and figured that network and state access would be the two primary contributors there. Let me know if you have any good debugging tools to narrow in on this more.
>
> Thanks,
> Tommy
>
> On Mon, Mar 6, 2023 at 4:42 AM David Morávek <d...@apache.org> wrote:
>
> Using an operator state for a stateful join isn't great because it's meant to hold only minimal state related to the operator (e.g., partition tracking).
>
> If your data are already pre-partitioned and the partitioning matches (hash partitioning on the JAVA representation of the key yielded by the KeySelector), you can use `reinterpretAsKeyedStream` [1] to skip the shuffle.
>
>> What we see is that the join step causes backpressure on the kafka sources and lag slowly starts to accumulate.
>
> I feel you'd be blocked by the state access downstream (with RocksDB). Are you sure it isn't the case?
>
> [1] https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.1/org/apache/flink/streaming/api/datastream/DataStreamUtils.html#reinterpretAsKeyedStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.api.java.functions.KeySelector-
>
> Best,
> D.
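A minimal sketch of reinterpretAsKeyedStream, with the Tuple2 element type and the f0 key field purely illustrative; the stream must already be distributed across subtasks exactly as keyBy on that field would distribute it, which is the brittle part being discussed above:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    public class ReinterpretSketch {

      // Flink does not verify the pre-partitioning claim; if a key shows up on the
      // "wrong" subtask, keyed state for that key silently splits across subtasks.
      static KeyedStream<Tuple2<String, Long>, String> asKeyed(DataStream<Tuple2<String, Long>> events) {
        return DataStreamUtils.reinterpretAsKeyedStream(
            events,
            (KeySelector<Tuple2<String, Long>, String>) value -> value.f0,
            Types.STRING);
      }
    }

The resulting KeyedStream can then be connected with another reinterpreted stream and used with keyed state and timers as usual.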
> On Sun, Mar 5, 2023 at 5:31 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Tommy,
>
> To use stateful timers, you need to have a keyed stream, which gets tricky when you’re trying to avoid network traffic caused by the keyBy().
>
> If the number of unique keys isn’t huge, I could think of yet another helicopter stunt that you could try :)
>
> It’s possible to calculate a composite key, based on the “real” key and a synthetic value, that will wind up in the same slot where you’re doing this calculation.
>
> So that would let you create a keyed stream which would have serialization/deserialization cost, but wouldn’t actually go through the network stack.
>
> Since the composite key generation is deterministic, you can do the same thing on both streams, and join on the composite key.
>
> You’d want to cache the mapping from the real key to the synthetic value, to avoid doing this calculation for every record.
>
> If that sounds promising, lmk and I can post some code.
>
> — Ken
>
> On Mar 4, 2023, at 12:37 PM, Tommy May <tvma...@gmail.com> wrote:
>
> Hello Ken,
>
> Thanks for the quick response! That is an interesting workaround. In our case, though, we are using a CoProcessFunction with stateful timers. Is there a similar workaround path available in that case? The one possible way I could find required partitioning data in Kafka in a very specific way to match what Flink's keyBy is doing, and it'd have additional constraints beyond the method you described that would be difficult to handle in a prod environment where we don't have full control over the producers & input topics.
>
> Regarding the addition of a more flexible way to take advantage of pre-partitioned sources like in FLIP-186, would you suggest I forward this chain over to the dev Flink mailing list?
>
> Thanks,
> Tommy
>
> On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Tommy,
>
> I believe there is a way to make this work currently, but with lots of caveats and constraints.
>
> This assumes you want to avoid any network shuffle.
>
> 1. Both topics have names that return the same value for ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism.
> 2. Both topics have the same number of partitions.
> 3. The parallelism of your join function exactly matches the number of partitions.
> 4. You can’t change any of the above without losing state.
> 5. You don’t need stateful timers.
>
> If the above is true, then you could use a CoFlatMapFunction and operator state to implement a stateful join.
>
> If it’s something like a left outer join without any state TTL or need to keep both sides in state, then it’s pretty easy.
>
> — Ken
>
> PS - it’s pretty easy to figure out a “-xxx” value to append to a topic name to get the hashCode() result you need.
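Not the code Ken is offering here, just a rough sketch of the shape such an operator-state join could take, assuming every caveat above holds; the Tuple2 modelling and the keep-one-side-only semantics are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Keeps the "right" side in a per-subtask map backed by operator (list) state, and emits
    // a joined record for each "left" element whose key has already been seen on the right.
    // Note: on rescaling, list state is redistributed across subtasks, which is exactly why
    // the parallelism must never change (caveat 4 above).
    public class OperatorStateJoin
        extends RichCoFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>>
        implements CheckpointedFunction {

      private transient Map<String, String> rightByKey;
      private transient ListState<Tuple2<String, String>> checkpointed;

      @Override
      public void flatMap1(Tuple2<String, String> left, Collector<Tuple2<String, String>> out) {
        String right = rightByKey.get(left.f0);
        if (right != null) {
          out.collect(Tuple2.of(left.f0, left.f1 + "|" + right));
        }
      }

      @Override
      public void flatMap2(Tuple2<String, String> right, Collector<Tuple2<String, String>> out) {
        rightByKey.put(right.f0, right.f1);
      }

      @Override
      public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointed.clear();
        for (Map.Entry<String, String> e : rightByKey.entrySet()) {
          checkpointed.add(Tuple2.of(e.getKey(), e.getValue()));
        }
      }

      @Override
      public void initializeState(FunctionInitializationContext context) throws Exception {
        rightByKey = new HashMap<>();
        checkpointed = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<Tuple2<String, String>>(
                "right-side", Types.TUPLE(Types.STRING, Types.STRING)));
        for (Tuple2<String, String> entry : checkpointed.get()) {
          rightByKey.put(entry.f0, entry.f1);
        }
      }
    }

Because the state is per subtask rather than per key, there is no TTL or per-key cleanup here; the buffered side grows unbounded unless something prunes it.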
> On Mar 3, 2023, at 4:56 PM, Tommy May <tvma...@gmail.com> wrote:
>
> Hello,
>
> My team has a Flink streaming job that does a stateful join across two high-throughput Kafka topics. This results in a large amount of data ser/de and shuffling (about 1gb/s for context). We're running into a bottleneck on this shuffling step. We've attempted to optimize our Flink configuration, join logic, scale out the Kafka topics & Flink job, and speed up state access. What we see is that the join step causes backpressure on the Kafka sources and lag slowly starts to accumulate.
>
> One idea we had to optimize this is to pre-partition the data in Kafka on the same key that the join is happening on. This'll effectively reduce data shuffling to 0 and remove the bottleneck that we're seeing. I've done some research into the topic, and from what I understand this is not straightforward to take advantage of in Flink. It looks to be a fairly commonly requested feature based on the many StackOverflow posts and Slack questions, and I noticed there is FLIP-186, which attempts to address this topic as well.
>
> Are there any upcoming plans to add this feature to a future Flink release? I believe it'd be super impactful for similar large-scale jobs out there. I'd be interested in helping as well, but admittedly I'm relatively new to Flink. I poked around the code a bit, and it certainly did not seem like a straightforward addition, so it may be best handled by someone with more internal knowledge.
>
> Thanks,
> Tommy
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch