> On Mar 20, 2019, at 6:49 PM, Seed Zeng <seed.z...@klaviyo.com> wrote:
>
> Hey Andrey and Ken,
> Sorry about the late reply. I might not have been clear in my question.
> The performance of writing to Cassandra is the same in both cases, only that
> the source rate was higher in the case where the async function is present.
OK, I was confused by what you’d originally written...

>>> Job 1 is backpressured because Cassandra cannot handle all the writes and
>>> eventually slows down the source rate to 6.5k/s.
>>> Job 2 is slightly backpressured but was able to run at 14k/s.

If the source rate is _temporarily_ higher, then that might make sense, as the async function will be able to buffer up to the configured capacity.

E.g. in the documentation example <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api>

AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

The capacity is 100 (which is also the default, if you don’t specify it).

> Something is "buffering" and not propagating backpressure to slow down the
> source speed from Kafka.
>
> In our use case, we prefer the backpressure to slow down the source so that
> the write to Cassandra is not delayed while the source is consuming fast.

You can use a smaller capacity to reduce the impact, but that could obviously hurt the performance of whatever you're using the async function to parallelize.

Regards,

— Ken

> On Wed, Mar 20, 2019 at 9:38 AM Andrey Zagrebin <and...@ververica.com
> <mailto:and...@ververica.com>> wrote:
> Hi Seed,
>
> Sorry for the confusion, I see now it is separate. Back pressure should still be
> created because the internal async queue has a capacity,
> but I'm not sure about the reporting problem; Ken and Till probably have a better idea.
>
> As for the consumption speed-up, the async operator creates another thread to collect
> the results, and the Cassandra sink probably uses that thread to write data.
> This might parallelize and pipeline previous steps like Kafka fetching and
> Cassandra IO, but I am also not sure about this explanation.
>
> Best,
> Andrey
>
>
> On Tue, Mar 19, 2019 at 8:05 PM Ken Krugler <kkrugler_li...@transpac.com
> <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Seed,
>
> I was assuming the Cassandra sink was separate from and after your async
> function.
>
> I was trying to come up with an explanation as to why adding the async
> function would improve your performance.
>
> The only very unlikely reason I thought of was that the async function
> somehow caused data arriving at the sink to be more “batchy”, which (if the
> Cassandra sink had an “every x seconds do a write” batch mode) could improve
> performance.
>
> — Ken
>
>> On Mar 19, 2019, at 11:35 AM, Seed Zeng <seed.z...@klaviyo.com
>> <mailto:seed.z...@klaviyo.com>> wrote:
>>
>> Hi Ken and Andrey,
>>
>> Thanks for the response. I think there is a misunderstanding that the writes to
>> Cassandra are happening within the Async function.
>> In my test, the async function is just a pass-through without doing any work.
>>
>> So any Cassandra-related batching or buffering should not be the cause of
>> this.
>>
>> Thanks,
>>
>> Seed
>>
>> On Tue, Mar 19, 2019 at 12:35 PM Ken Krugler <kkrugler_li...@transpac.com
>> <mailto:kkrugler_li...@transpac.com>> wrote:
>> Hi Seed,
>>
>> It’s a known issue that Flink doesn’t report back pressure properly for
>> AsyncFunctions, due to how it monitors the output collector to gather back
>> pressure statistics.
>>
>> But that wouldn’t explain how you get faster processing with the
>> AsyncFunction inserted into your workflow.
>>
>> I haven’t looked at how the Cassandra sink handles batching, but if the
>> AsyncFunction somehow caused fewer, bigger Cassandra writes to happen then
>> that’s one (serious hand-waving) explanation.
>>
>> — Ken
>>
>>> On Mar 18, 2019, at 7:48 PM, Seed Zeng <seed.z...@klaviyo.com
>>> <mailto:seed.z...@klaviyo.com>> wrote:
>>>
>>> Flink Version - 1.6.1
>>>
>>> In our application, we consume from Kafka and sink to Cassandra in the end.
>>> We are trying to introduce a custom async function in front of the Sink to
>>> carry out some customized operations. In our testing, it appears that the
>>> Async function is not generating backpressure to slow down our Kafka Source
>>> when Cassandra becomes unhappy. Essentially, compared to an almost identical
>>> job where the only difference is the lack of the Async function, Kafka
>>> source consumption speed is much higher under the same settings and
>>> identical Cassandra cluster. The experiment is like this.
>>>
>>> Job 1 - without async function in front of Cassandra
>>> Job 2 - with async function in front of Cassandra
>>>
>>> Job 1 is backpressured because Cassandra cannot handle all the writes and
>>> eventually slows down the source rate to 6.5k/s.
>>> Job 2 is slightly backpressured but was able to run at 14k/s.
>>>
>>> Is the AsyncFunction somehow not reporting the backpressure correctly?
>>>
>>> Thanks,
>>> Seed
>>
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
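To make Ken's point about the async operator's capacity more concrete, here is a toy, single-threaded Java model (stdlib only, not Flink code). Assumptions: the class `BoundedBufferBackpressure`, its `simulate` method and the `Result` type are hypothetical names for illustration; a bounded `ArrayDeque` stands in for the async operator's queue of `capacity` in-flight records; the sink (standing in for Cassandra) writes one record every `sinkPeriod` ticks, and the source (standing in for Kafka) emits at most one record per tick whenever a slot is free. It deliberately ignores Flink's network buffers, threads and timeouts.

```java
import java.util.ArrayDeque;

/**
 * Toy model of backpressure through a bounded buffer.
 * ASSUMPTION: this is NOT Flink's async operator; it only mimics the property
 * discussed in the thread, i.e. an operator with `capacity` in-flight slots
 * lets the source run ahead of the sink by at most `capacity` records.
 */
class BoundedBufferBackpressure {

    /** How many records the source emitted and the sink wrote during a run. */
    static final class Result {
        final long emitted;
        final long written;

        Result(long emitted, long written) {
            this.emitted = emitted;
            this.written = written;
        }
    }

    /**
     * Simulates `ticks` time steps. In each tick the sink first writes one
     * record (only when tick % sinkPeriod == 0 and the buffer is non-empty),
     * then the source emits one record if the buffer has a free slot.
     */
    static Result simulate(int capacity, int sinkPeriod, int ticks) {
        if (capacity < 1 || sinkPeriod < 1 || ticks < 0) {
            throw new IllegalArgumentException("capacity and sinkPeriod must be >= 1, ticks >= 0");
        }
        ArrayDeque<Long> buffer = new ArrayDeque<>();
        long emitted = 0;
        long written = 0;
        for (int tick = 0; tick < ticks; tick++) {
            // Slow sink: drains one record every sinkPeriod ticks.
            if (tick % sinkPeriod == 0 && !buffer.isEmpty()) {
                buffer.poll();
                written++;
            }
            // Fast source: emits only if there is room; otherwise it is
            // backpressured (blocked) for this tick.
            if (buffer.size() < capacity) {
                buffer.add(emitted);
                emitted++;
            }
        }
        return new Result(emitted, written);
    }

    public static void main(String[] args) {
        for (int capacity : new int[] {1, 10, 100}) {
            Result r = simulate(capacity, 2, 1000);
            System.out.printf("capacity=%d emitted=%d written=%d%n",
                    capacity, r.emitted, r.written);
        }
    }
}
```

Running `main` prints `emitted=500`, `509` and `599` for capacities 1, 10 and 100, with `written=499` in every case. In this model the sink's throughput does not change with capacity; a larger buffer only gives the source a one-time head start of `capacity` records before it is throttled to the sink's rate. That matches the "temporarily higher" reading of the source rate, and suggests that lowering the `capacity` argument of `AsyncDataStream.unorderedWait` would shrink the head start, though a sustained 14k/s versus 6.5k/s gap would need some other explanation.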