[ 
https://issues.apache.org/jira/browse/FLINK-3264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336896#comment-17336896
 ] 

Flink Jira Bot commented on FLINK-3264:
---------------------------------------

This issue was labeled "stale-major" 7 days ago and has not received any updates, so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Add load shedding policy into Kafka Consumers
> ---------------------------------------------
>
>                 Key: FLINK-3264
>                 URL: https://issues.apache.org/jira/browse/FLINK-3264
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Robert Metzger
>            Priority: Major
>              Labels: stale-major
>
> There are situations when Flink's Kafka Consumer is not able to consume 
> everything produced into a topic, for example when one Flink instance is 
> subscribed to a busy Kafka topic (See user request: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Frequent-exceptions-killing-streaming-job-td4323.html
>  )
> I think we should allow users to control the behavior of the Kafka consumer 
> in those situations.
> I had an offline discussion with [~StephanEwen] about this and we think that 
> allowing users to pass a LoadSheddingPolicy to the KafkaConsumer would be 
> the best solution.
> In the policy, users can define how often the consumer requests the 
> latest offsets of the subscribed partitions. The requests can be based either 
> on time (every n ms) or on record count (every n-th record). The policy 
> can then decide to skip a certain number of offsets (maybe even jump to the 
> latest offset).
> With the "offset skipping" approach, we avoid fetching records we cannot 
> process anyway.
> In the 0.9 consumer, there doesn't seem to be an API for requesting the 
> latest offset of a topicPartition. I'll ask on the Kafka ML what the status 
> is there.
> With {{seek()}} we can fetch from any offset.
> In the 0.8 SimpleConsumer, there is a method for requesting the offsets:
> {code}
> kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
>     requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
> OffsetResponse response = consumer.getOffsetsBefore(request);
> {code}
> The fetch offset is controlled within the {{LegacyFetcher}}.
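
The policy described in the quoted issue could be sketched roughly as follows. This is only an illustration under stated assumptions: the issue names a LoadSheddingPolicy but does not define it, so the method names (shouldCheck, nextOffset), the MaxLagPolicy class, and its parameters are all hypothetical and not part of any Flink or Kafka API.

```java
/**
 * Hypothetical interface: the issue proposes a LoadSheddingPolicy
 * but does not specify its methods. These two are assumptions.
 */
interface LoadSheddingPolicy {
    /** Whether the consumer should request the latest offset now. */
    boolean shouldCheck(long recordsSinceLastCheck, long millisSinceLastCheck);

    /** Offset to continue from; returning currentOffset means "skip nothing". */
    long nextOffset(long currentOffset, long latestOffset);
}

/**
 * Hypothetical example policy: check every n records or every n ms,
 * and never fall more than maxLag offsets behind the latest offset.
 */
final class MaxLagPolicy implements LoadSheddingPolicy {
    private final long everyNRecords;
    private final long everyNMillis;
    private final long maxLag;

    MaxLagPolicy(long everyNRecords, long everyNMillis, long maxLag) {
        this.everyNRecords = everyNRecords;
        this.everyNMillis = everyNMillis;
        this.maxLag = maxLag;
    }

    @Override
    public boolean shouldCheck(long recordsSinceLastCheck, long millisSinceLastCheck) {
        // Either trigger (record count or elapsed time) is enough.
        return recordsSinceLastCheck >= everyNRecords
                || millisSinceLastCheck >= everyNMillis;
    }

    @Override
    public long nextOffset(long currentOffset, long latestOffset) {
        long lag = latestOffset - currentOffset;
        // Skip ahead only when the consumer is too far behind.
        // With maxLag == 0 this jumps straight to the latest offset.
        return lag > maxLag ? latestOffset - maxLag : currentOffset;
    }
}
```

In such a design, the fetcher would call shouldCheck after each record, look up the latest offset when it returns true, and seek to the value returned by nextOffset if it differs from the current offset.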



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
