Re: Ratelimiting in the Flink Kafka connector

2019-02-11 Thread Lakshmi Gururaja Rao
I created a PR with the implementation described above — https://github.com/apache/flink/pull/7679. Please provide feedback :) Thanks Lakshmi

Re: Ratelimiting in the Flink Kafka connector

2019-02-07 Thread Lakshmi Gururaja Rao
Apologies for the delay in responding here. The idea of making the Ratelimiter config/creation logic generic across connectors makes sense to me. In the approach that we used and tested internally, we essentially created a Guava RateLimiter
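The preview above is truncated; as a rough sketch of the Guava-based approach it describes (not the code from the linked PR — class, method, and parameter names below, and the byte-based acquire, are illustrative assumptions), a per-subtask RateLimiter can be acquired before each fetched record is handed downstream:

```java
import com.google.common.util.concurrent.RateLimiter;

// Illustrative sketch only -- not the code from FLINK-11501 / PR #7679.
// A Guava RateLimiter held per subtask, acquired before each record
// (here sized in bytes) is emitted to the downstream collector.
public class ThrottledEmitter<T> {

    private final RateLimiter rateLimiter;

    public ThrottledEmitter(long globalBytesPerSecond, int parallelism) {
        // Split the job-wide byte budget evenly across parallel subtasks.
        this.rateLimiter = RateLimiter.create((double) globalBytesPerSecond / parallelism);
    }

    public void emit(T record, int serializedSizeInBytes) {
        // Blocks until enough "byte permits" are available at the configured rate.
        rateLimiter.acquire(serializedSizeInBytes);
        // ... hand the record to the actual collector/output here ...
    }
}
```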

Re: Ratelimiting in the Flink Kafka connector

2019-02-01 Thread Becket Qin
Hi Thomas, Yes, adding a rate limiting operator in front of the sink would work for record rate limiting. Another thing I am thinking is that for local throttling, it seems that throttling in sources and sinks has some subtle differences. For example, consider both source and sink as HDFS. For so…
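A record-rate throttling operator of the kind discussed in these messages could look roughly like the following sketch, chained immediately in front of the sink (plain Guava RateLimiter; class and parameter names are assumptions, not an existing Flink API):

```java
import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch of a standalone throttling operator: it limits the record rate of
// whatever flows through it, so placing it directly in front of a sink
// effectively throttles that sink. Names are illustrative.
public class RecordRateLimiter<T> extends RichMapFunction<T, T> {

    private final double recordsPerSecondPerSubtask;
    private transient RateLimiter rateLimiter;

    public RecordRateLimiter(double recordsPerSecondPerSubtask) {
        this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        // RateLimiter is not serializable, so create it per subtask in open().
        rateLimiter = RateLimiter.create(recordsPerSecondPerSubtask);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire(); // block until a permit is available
        return value;
    }
}

// Usage (illustrative): stream.map(new RecordRateLimiter<>(1000)).addSink(sink);
```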

Re: Ratelimiting in the Flink Kafka connector

2019-02-01 Thread Thomas Weise
Hi Becket, The throttling operator would suffer from the same issue of not being able to accurately count bytes. On the other hand, it can be used by composition w/o modifying existing operators. As for sinks, wouldn't an operator that adjusts the rate in front of the sink suffice? Thomas

Re: Ratelimiting in the Flink Kafka connector

2019-01-31 Thread Becket Qin
Hi Thomas, Good point about counting bytes. It would be difficult to throttle the byte rate with the existing API. And it seems that for sinks we have to do that rate limiting in the sink implementation anyways. There are a few ways to do some abstraction, but maybe adding a RateLimiter is trivial…

Re: Ratelimiting in the Flink Kafka connector

2019-01-31 Thread Thomas Weise
I initially thought of an approach similar to the collector idea, by overriding emitRecord in the fetcher. That makes counting the bytes difficult, because it's downstream of decoding. Another idea of solving this in a reusable way was to have a separate rate limiting operator chained downstream…

Re: Ratelimiting in the Flink Kafka connector

2019-01-31 Thread Ken Krugler
+1, and something I was planning to comment on in the Jira issue. Also, if rate limiting could effectively stop the stream, then this could be used to solve a common data enrichment issue. Logically you want to pause one stream (typically the time series data being processed) while another stream…

Re: Ratelimiting in the Flink Kafka connector

2019-01-31 Thread Becket Qin
Hi Jamie, Thanks for the explanation. That makes sense to me. I am wondering if there is a more general way to add a rate limiter to all the connectors rather than doing that for each individual one. For example, maybe we can have the rate limiting logic in the Collector / Output, thus all the…
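A minimal sketch of the Collector-based idea mentioned here (assumed names; not an actual Flink API addition) is a wrapper around whatever Collector a connector already emits into, so every connector could share the same throttling logic:

```java
import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.util.Collector;

// Sketch of "put the rate limiting in the Collector": a wrapper any connector
// could emit through without changes to its own code. Names are assumptions.
public class RateLimitedCollector<T> implements Collector<T> {

    private final Collector<T> delegate;
    private final RateLimiter rateLimiter;

    public RateLimitedCollector(Collector<T> delegate, double recordsPerSecond) {
        this.delegate = delegate;
        this.rateLimiter = RateLimiter.create(recordsPerSecond);
    }

    @Override
    public void collect(T record) {
        rateLimiter.acquire(); // throttle every record passed downstream
        delegate.collect(record);
    }

    @Override
    public void close() {
        delegate.close();
    }
}
```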

Re: Ratelimiting in the Flink Kafka connector

2019-01-31 Thread Lakshmi Gururaja Rao
Thanks for adding more context @Jamie Grier. JIRA for this feature: https://issues.apache.org/jira/browse/FLINK-11501. Thanks Lakshmi

Re: Ratelimiting in the Flink Kafka connector

2019-01-31 Thread Thomas Weise
I think it would be reasonable to have a rate limiter option in the consumer, given that others have also looked to solve this. I think for this and other optional features, it would be good to implement in a way that overrides are possible. Someone else may want to do the limiting differently…

Re: Ratelimiting in the Flink Kafka connector

2019-01-31 Thread Jamie Grier
I had the same reaction initially as some of the others on this thread -- which is "Use Kafka quotas". I agree that in general a service should protect itself with its own rate limiting rather than building it into clients like the FlinkKafkaConsumer. However, there are a few reasons we need to…

Re: Ratelimiting in the Flink Kafka connector

2019-01-29 Thread Thomas Weise
It is preferred for the service to rate limit. The problem is that not all Kafka setups have that control enabled / support for it. Even when rate limiting was enabled, it may still be *nice* for the client to gracefully handle it. There was discussion in the past that we should not bloat the Kaf…

Re: Ratelimiting in the Flink Kafka connector

2019-01-28 Thread Becket Qin
Hi Lakshmi, As Nagarjun mentioned, you might want to configure a quota on the Kafka broker side for your Flink connector client. Thanks, Jiangjie (Becket) Qin

Re: Ratelimiting in the Flink Kafka connector

2019-01-25 Thread Ning Shi
> We have a Flink job reading from Kafka (specifically it uses FlinkKafkaConsumer011). There are instances when the job is processing a backlog and it ends up reading at a significantly high throughput and degrades the underlying Kafka cluster. If there was a way to rate limit the calls to…

Re: Ratelimiting in the Flink Kafka connector

2019-01-25 Thread Nagarjun Guraja
Have you looked at Kafka quotas (https://kafka.apache.org/0110/documentation.html#design_quotas) to achieve rate limiting on the consumer side? In your Flink app, you should be able to set the client.id and configure Kafka to rate limit you. Regards, Nagarjun
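For completeness, a minimal sketch of the quota-based setup described above: give the Flink consumer a fixed client.id and throttle that id on the broker. The topic, ids, addresses, and byte rate below are illustrative values, and the kafka-configs.sh invocation follows the Kafka 0.11 documentation linked in the message:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

// Sketch of the broker-quota approach: the quota is keyed on the client.id
// that the Flink Kafka consumer uses. All values are placeholders.
public class QuotaLimitedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-flink-job");
        props.setProperty("client.id", "my-flink-consumer"); // the id the quota is keyed on

        env.addSource(new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props))
           .print();

        // Broker side (run once, outside the Flink job):
        //   kafka-configs.sh --zookeeper zk:2181 --alter \
        //     --add-config 'consumer_byte_rate=10485760' \
        //     --entity-type clients --entity-name my-flink-consumer
        env.execute("quota limited kafka job");
    }
}
```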