> That comes with the additional constraints that Ken mentioned, correct?
It could break immediately in cases if a key comes through on a different
partition, or if the number of partitions happen to change? I'm concerned
about that for our use case as we don't have 100% control of the upstream
data source.

There is no way to avoid shuffling if the partitioning is unstable (except
for map-side join, which is a special case in case one side of the join
fits in memory).

Best,
D.

On Tue, Mar 7, 2023 at 9:16 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Tommy,
>
>
>
> While not coming up with a sure solution, I’ve got a number of idea on how
> to continue and shed light into the matter:
>
>
>
>    - With respect to diagnostics, have you enabled flame graph
>    (cluster-config.rest.flamegraph.enabled),
>       - It allows you to see the call tree of each task and where
>       dominantly time is spent
>       - That usually gives me quite some insight
>    - You mention serialization could be a problem:
>       - Which serialization are you using currently?
>       - I could imagine to use one the (almost) zero-copy type like
>       RowData
>          - I considered this once but didn’t try
>       - Nico published a nice comparison of the choices w/r to
>       serializers [1]
>    - Just for completeness: pipeline.object-reuse can cut down quite a
>    bit on GC cost adding the need to execute more discipline with object
>    mutation and caching un-serialized objects in arbitrary data structures
>
>
>
> Hope this helps
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> [1]
> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/
>
>
>
>
>
>
>
> *From:* Tommy May <tvma...@gmail.com>
> *Sent:* Tuesday, March 7, 2023 3:25 AM
> *To:* David Morávek <d...@apache.org>
> *Cc:* Ken Krugler <kkrugler_li...@transpac.com>; Flink User List <
> user@flink.apache.org>
> *Subject:* Re: Avoiding data shuffling when reading pre-partitioned data
> from Kafka
>
>
>
> ⚠*EXTERNAL MESSAGE – **CAUTION: Think Before You Click *⚠
>
>
>
> Hi Ken & David,
>
>
>
> Thanks for following up. I've responded to your questions below.
>
>
>
>  If the number of unique keys isn’t huge, I could think of yet another
> helicopter stunt that you could try :)
>
>
>
> Unfortunately the number of keys in our case is huge, they're unique per
> handful of events.
>
>
>
> If your data are already pre-partitioned and the partitioning matches
> (hash partitioning on the JAVA representation of the key yielded by the
> KeySelector), you can use `reinterpretAsKeyedStream` [1] to skip the
> shuffle.
>
>
>
> That comes with the additional constraints that Ken mentioned, correct? It
> could break immediately in cases if a key comes through on a different
> partition, or if the number of partitions happen to change? I'm concerned
> about that for our use case as we don't have 100% control of the upstream
> data source.
>
>
>
> I feel you'd be blocked by the state access downstream (with RocksDB). Are
> you sure it isn't the case?
>
>
>
> Yes, you are right that state access is also a limiting factor and some
> optimizations to limit that have helped quite a bit (both in our
> implementation and in using local SSDs for rocksdb). One other path we
> looked at is using memory-backed volumes for rocksdb, but ran into a
> limitation that we cannot configure Flink's process memory lower than the
> k8s container memory, leading to OOMs. More details at
> https://stackoverflow.com/questions/74118022/flink-pods-ooming-using-memory-backed-volume-with-k8s-operator
> .
>
>
>
> I don't have a dashboard currently to immediately point to data shuffling
> as the primary bottleneck, but I thought it could be a huge optimization if
> we can tell Flink to take advantage of the pre-partitioned datasource,
> given we're shuffling near 1 Gb/sec right now. I can see that the join is
> causing the backpressure on the sources though, and figured that network
> and state acces would be the two primary contributors there. Let me know if
> you have any good debugging tools to narrow in on this more.
>
>
>
> Thanks,
>
> Tommy
>
>
>
>
>
> On Mon, Mar 6, 2023 at 4:42 AM David Morávek <d...@apache.org> wrote:
>
> Using an operator state for a stateful join isn't great because it's meant
> to hold only a minimal state related to the operator (e.g., partition
> tracking).
>
>
>
> If your data are already pre-partitioned and the partitioning matches
> (hash partitioning on the JAVA representation of the key yielded by the
> KeySelector), you can use `reinterpretAsKeyedStream` [1] to skip the
> shuffle.
>
>
>
> > What we see is that the join step causes backpressure on the kafka
> sources and lag slowly starts to accumulate.
>
>
>
> I feel you'd be blocked by the state access downstream (with RocksDB). Are
> you sure it isn't the case?
>
>
>
> [1]
> https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.1/org/apache/flink/streaming/api/datastream/DataStreamUtils.html#reinterpretAsKeyedStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.api.java.functions.KeySelector-
>
>
>
> Best,
>
> D.
>
>
>
> On Sun, Mar 5, 2023 at 5:31 AM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
> Hi Tommy,
>
>
>
> To use stateful timers, you need to have a keyed stream, which gets tricky
> when you’re trying to avoid network traffic caused by the keyBy()
>
>
>
> If the number of unique keys isn’t huge, I could think of yet another
> helicopter stunt that you could try :)
>
>
>
> It’s possible to calculate a composite key, based on the “real” key and a
> synthetic value, that will wind up on in the same slot where you’re doing
> this calculation.
>
>
>
> So that would let you create a keyed stream which would have
> serialization/deserialization cost, but wouldn’t actually go through the
> network stack.
>
>
>
> Since the composite key generation is deterministic, you can do the same
> thing on both streams, and join on the composite key.
>
>
>
> You’d want to cache the mapping from the real key to the synthetic value,
> to avoid doing this calculation for every record.
>
>
>
> If that sounds promising, lmk and I can post some code.
>
>
>
> — Ken
>
>
>
>
>
> On Mar 4, 2023, at 12:37 PM, Tommy May <tvma...@gmail.com> wrote:
>
>
>
> Hello Ken,
>
>
>
> Thanks for the quick response! That is an interesting workaround. In our
> case though we are using a CoProcessFunction with stateful timers. Is there
> a similar workaround path available in that case? The one possible way I
> could find required partitioning data in kafka in a very specific way
> to match what Flink's keyBy is doing, and that it'd have additional
> constraints to the method you described that would be difficult to handle
> in a prod environment where we don't have full control over the producers &
> input topics.
>
>
>
> Regarding the addition of a more flexible way to take advantage of
> pre-partitioned sources like in FLIP-186, would you suggest I forward this
> chain over to the dev Flink mailing list?
>
>
>
> Thanks,
>
> Tommy
>
>
>
>
>
>
>
> On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
> Hi Tommy,
>
>
>
> I believe there is a way to make this work currently, but with lots of
> caveats and constraints.
>
>
>
> This assumes you want to avoid any network shuffle.
>
>
>
> 1. Both topics have names that return the same value for
> ((topicName.hashCode() * 31) & 0x7FFFF) % parallelism.
>
> 2. Both topics have the same number of partitions.
>
> 3. The parallelism of your join function exactly matches the number of
> partitions.
>
> 4. You can’t change any of the above without losing state.
>
> 5. You don’t need stateful timers.
>
>
>
> If the above is true, then you could use a CoFlatMapFunction and operator
> state to implement a stateful join.
>
>
>
> If it’s something like a left outer join without any state TTL or need to
> keep both sides in state, then it’s pretty easy.
>
>
>
> — Ken
>
>
>
> PS - it’s pretty easy to figure out a “-xxx” value to append to a topic
> name to get the hashCode() result you need.
>
>
>
> On Mar 3, 2023, at 4:56 PM, Tommy May <tvma...@gmail.com> wrote:
>
>
>
> Hello,
>
>
>
> My team has a Flink streaming job that does a stateful join across two
> high throughput kafka topics. This results in a large amount of data ser/de
> and shuffling (about 1gb/s for context). We're running into a bottleneck on
> this shuffling step. We've attempted to optimize our flink configuration,
> join logic, scale out the kafka topics & flink job, and speed up state
> access. What we see is that the join step causes backpressure on the kafka
> sources and lag slowly starts to accumulate.
>
>
>
> One idea we had to optimize this is to pre-partition the data in kafka on
> the same key that the join is happening on. This'll effectively reduce data
> shuffling to 0 and remove the bottleneck that we're seeing. I've done some
> research into the topic and from what I understand this is not
> straightforward to take advantage of in Flink. It looks to be a fairly
> commonly requested feature based on the many StackOverflow posts and slack
> questions, and I noticed there is FLIP-186 which attempts to address this
> topic as well.
>
>
>
> Are there any upcoming plans to add this feature to a future Flink
> release? I believe it'd be super impactful for similar large scale jobs out
> there. I'd be interested in helping as well, but admittedly I'm relatively
> new to Flink.  I poked around the code a bit, and it certainly did not seem
> like a straightforward addition, so it may be best handled by someone with
> more internal knowledge.
>
>
>
> Thanks,
>
> Tommy
>
>
>
> --------------------------
>
> Ken Krugler
>
> http://www.scaleunlimited.com
>
> Custom big data solutions
>
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>
>
>
>
>
>
> --------------------------
>
> Ken Krugler
>
> http://www.scaleunlimited.com
>
> Custom big data solutions
>
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>
>
>
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>

Reply via email to