> On Mar 20, 2019, at 6:49 PM, Seed Zeng <seed.z...@klaviyo.com> wrote:
> 
> Hey Andrey and Ken,
> Sorry about the late reply. I might not have been clear in my question.
> The performance of writing to Cassandra is the same in both cases; only 
> the source rate was higher when the async function is present. 

OK, I was confused by what you’d originally written...

>>> Job 1 is backpressured because Cassandra cannot handle all the writes and 
>>> eventually slows down the source rate to 6.5k/s. 
>>> Job 2 is slightly backpressured but was able to run at 14k/s.

If the source rate is _temporarily_ higher, then that might make sense, as the 
async function will be able to buffer up to the configured capacity.

E.g. in the documentation example 
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api>

AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100);

The capacity is 100 (which is also the default if you don’t specify it).
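
To make the buffering effect concrete, here is a minimal sketch in plain Java. 
It is not Flink code: it only assumes that the async operator behaves like a 
bounded queue of in-flight elements, and the class and method names are 
hypothetical. It counts how many elements a source can hand over while the 
downstream side is not draining anything.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class CapacityBufferSketch {

    /**
     * Counts how many of the offered elements a bounded buffer accepts
     * while nothing is draining it (a stalled sink, for illustration).
     */
    static int elementsAcceptedWhileSinkStalled(int capacity, int offered) {
        // Assumption: the in-flight queue is modeled as a plain bounded queue.
        BlockingQueue<Integer> inFlight = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < offered; i++) {
            // offer() returns false instead of blocking once the queue is full;
            // a blocking put() at this point is where back pressure would start.
            if (!inFlight.offer(i)) {
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(elementsAcceptedWhileSinkStalled(100, 1000));
        System.out.println(elementsAcceptedWhileSinkStalled(10, 1000));
    }
}
```

In this model the source runs ahead by at most the capacity, after which it 
has to wait for the downstream side like any other operator.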

> Something is "buffering" and not propagating backpressure to slow down the 
> source speed from Kafka.
> 
> In our use case, we prefer the backpressure to slow down the source so that 
> the write to Cassandra is not delayed while the source is consuming fast.

You can use a smaller capacity to reduce the impact, but that could obviously 
reduce the performance of whatever you’re using the async function to 
parallelize.
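
As a rough back-of-the-envelope check on the capacity trade-off, the sketch 
below computes how long a bounded buffer can absorb a faster source, and how 
long an element waits once the buffer is full. The helper names are 
hypothetical, and plugging in 14k/s as the source rate and 6.5k/s as the drain 
rate is only an assumption for illustration, borrowed from the two job rates 
quoted above.

```java
public class CapacityTradeoffSketch {

    /** Seconds until a buffer of the given capacity fills up. */
    static double secondsUntilFull(int capacity, double sourcePerSec, double sinkPerSec) {
        if (sourcePerSec <= sinkPerSec) {
            // The sink keeps up, so the buffer never fills.
            return Double.POSITIVE_INFINITY;
        }
        return capacity / (sourcePerSec - sinkPerSec);
    }

    /** Seconds an element waits behind a full buffer before it is drained. */
    static double secondsQueuedWhenFull(int capacity, double sinkPerSec) {
        return capacity / sinkPerSec;
    }

    public static void main(String[] args) {
        double sourcePerSec = 14_000; // assumed unthrottled source rate
        double sinkPerSec = 6_500;    // assumed drain rate of the sink
        for (int capacity : new int[] {100, 10}) {
            System.out.printf("capacity=%d fills in %.4f s, adds up to %.4f s of delay%n",
                    capacity,
                    secondsUntilFull(capacity, sourcePerSec, sinkPerSec),
                    secondsQueuedWhenFull(capacity, sinkPerSec));
        }
    }
}
```

Under these assumed numbers a capacity of 100 fills in a small fraction of a 
second, so the buffer alone would only account for a brief burst, not a 
sustained difference in source rate.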

Regards,

— Ken

> On Wed, Mar 20, 2019 at 9:38 AM Andrey Zagrebin <and...@ververica.com 
> <mailto:and...@ververica.com>> wrote:
> Hi Seed,
> 
> Sorry for the confusion, I see now it is separate. Back pressure should still 
> be created because the internal async queue has a bounded capacity, but I am 
> not sure about the reporting problem; Ken and Till probably have a better idea.
> 
> As for the consumption speed-up, the async operator creates another thread to 
> collect the result, and the Cassandra sink probably uses that thread to write 
> data. This might parallelize and pipeline previous steps like Kafka fetching 
> and Cassandra IO, but I am also not sure about this explanation.
> 
> Best,
> Andrey
> 
> 
> On Tue, Mar 19, 2019 at 8:05 PM Ken Krugler <kkrugler_li...@transpac.com 
> <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Seed,
> 
> I was assuming the Cassandra sink was separate from and after your async 
> function.
> 
> I was trying to come up with an explanation as to why adding the async 
> function would improve your performance.
> 
> The only (very unlikely) reason I thought of was that the async function 
> somehow caused data arriving at the sink to be more “batchy”, which (if the 
> Cassandra sink had an “every x seconds do a write” batch mode) could improve 
> performance.
> 
> — Ken
> 
>> On Mar 19, 2019, at 11:35 AM, Seed Zeng <seed.z...@klaviyo.com 
>> <mailto:seed.z...@klaviyo.com>> wrote:
>> 
>> Hi Ken and Andrey,
>> 
>> Thanks for the response. I think there is some confusion here: the writes to 
>> Cassandra do not happen within the async function. 
>> In my test, the async function is just a pass-through without doing any work.
>> 
>> So any Cassandra related batching or buffering should not be the cause for 
>> this.
>> 
>> Thanks,
>> 
>> Seed
>> 
>> On Tue, Mar 19, 2019 at 12:35 PM Ken Krugler <kkrugler_li...@transpac.com 
>> <mailto:kkrugler_li...@transpac.com>> wrote:
>> Hi Seed,
>> 
>> It’s a known issue that Flink doesn’t report back pressure properly for 
>> AsyncFunctions, due to how it monitors the output collector to gather back 
>> pressure statistics.
>> 
>> But that wouldn’t explain how you get faster processing with the 
>> AsyncFunction inserted into your workflow.
>> 
>> I haven’t looked at how the Cassandra sink handles batching, but if the 
>> AsyncFunction somehow caused fewer, bigger Cassandra writes to happen then 
>> that’s one (serious hand waving) explanation.
>> 
>> — Ken
>> 
>>> On Mar 18, 2019, at 7:48 PM, Seed Zeng <seed.z...@klaviyo.com 
>>> <mailto:seed.z...@klaviyo.com>> wrote:
>>> 
>>> Flink Version - 1.6.1
>>> 
>>> In our application, we consume from Kafka and sink to Cassandra in the end. 
>>> We are trying to introduce a custom async function in front of the Sink to 
>>> carry out some customized operations. In our testing, it appears that the 
>>> Async function is not generating backpressure to slow down our Kafka Source 
>>> when Cassandra becomes unhappy. Essentially, compared to an almost identical 
>>> job where the only difference is the lack of the async function, Kafka 
>>> source consumption speed is much higher under the same settings and an 
>>> identical Cassandra cluster. The experiment is like this:
>>> 
>>> Job 1 - without async function in front of Cassandra
>>> Job 2 - with async function in front of Cassandra
>>> 
>>> Job 1 is backpressured because Cassandra cannot handle all the writes and 
>>> eventually slows down the source rate to 6.5k/s. 
>>> Job 2 is slightly backpressured but was able to run at 14k/s.
>>> 
>>> Is the AsyncFunction somehow not reporting the backpressure correctly?
>>> 
>>> Thanks,
>>> Seed
>> 
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
