[ https://issues.apache.org/jira/browse/FLINK-3264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336896#comment-17336896 ]

Flink Jira Bot commented on FLINK-3264:
---------------------------------------

This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.


> Add load shedding policy into Kafka Consumers
> ---------------------------------------------
>
>                 Key: FLINK-3264
>                 URL: https://issues.apache.org/jira/browse/FLINK-3264
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Robert Metzger
>            Priority: Major
>              Labels: stale-major
>
> There are situations when Flink's Kafka Consumer is not able to consume
> everything produced into a topic, for example when one Flink instance is
> subscribed to a busy Kafka topic (see user request:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Frequent-exceptions-killing-streaming-job-td4323.html
> ).
> I think we should allow users to control the behavior of the Kafka consumer
> in those situations.
> I had an offline discussion with [~StephanEwen] about this and we think that
> allowing users to pass a LoadSheddingPolicy to the KafkaConsumer would be
> the best solution.
> In the policy, users can define a frequency for the consumer to request the
> latest offsets in the subscribed partitions. The requests can be based either
> on time (every n ms) or on record count (every n'th record). Then, the policy
> can decide to skip a certain number of offsets (maybe even jump to the latest
> offset).
> With the "offset skipping" approach, we'll avoid fetching records we cannot
> process anyway.
> In the 0.9 consumer, there doesn't seem to be an API for requesting the
> latest offset of a topicPartition. I'll ask on the Kafka ML what the status
> is there.
> With {{seek()}} we can fetch from any offset.
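
To make the proposal above concrete, here is a minimal sketch of what such a policy could look like. Only the name LoadSheddingPolicy comes from the issue text; the method names, their parameters, and the MaxLagPolicy class are assumptions made up for illustration and are not part of Flink or Kafka. The sketch covers both check triggers mentioned in the description (every n ms, every n'th record) and one possible skipping rule (cap the lag behind the latest offset).

```java
/** Hypothetical interface; the issue only proposes the name, not these methods. */
interface LoadSheddingPolicy {

    /** Returns true when the consumer should ask for the latest offset of a partition. */
    boolean shouldCheck(long recordsSinceLastCheck, long millisSinceLastCheck);

    /**
     * Returns the offset to continue reading from. Returning currentOffset means
     * "do not skip"; returning latestOffset means "drop the whole backlog".
     */
    long nextOffset(long currentOffset, long latestOffset);
}

/** Illustrative policy (an assumption): never fall more than maxLag offsets behind. */
final class MaxLagPolicy implements LoadSheddingPolicy {
    private final long checkEveryRecords;
    private final long checkEveryMillis;
    private final long maxLag;

    MaxLagPolicy(long checkEveryRecords, long checkEveryMillis, long maxLag) {
        if (checkEveryRecords <= 0 || checkEveryMillis <= 0 || maxLag < 0) {
            throw new IllegalArgumentException("intervals must be > 0 and maxLag >= 0");
        }
        this.checkEveryRecords = checkEveryRecords;
        this.checkEveryMillis = checkEveryMillis;
        this.maxLag = maxLag;
    }

    @Override
    public boolean shouldCheck(long recordsSinceLastCheck, long millisSinceLastCheck) {
        // Either trigger is enough: record count or elapsed time.
        return recordsSinceLastCheck >= checkEveryRecords
                || millisSinceLastCheck >= checkEveryMillis;
    }

    @Override
    public long nextOffset(long currentOffset, long latestOffset) {
        long lag = latestOffset - currentOffset;
        // Skip ahead only when the backlog exceeds the allowed lag.
        return lag > maxLag ? latestOffset - maxLag : currentOffset;
    }
}
```

Under these assumptions, the consumer would call shouldCheck() as it processes records, look up the latest offset when it returns true, and pass the result of nextOffset() to {{seek()}} whenever it differs from the current offset. A maxLag of 0 corresponds to the "set to the latest offset" case from the description.
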
> In the 0.8 SimpleConsumer, there is a method for requesting the offsets:
> {code}
> // requestInfo: a Map<TopicAndPartition, PartitionOffsetRequestInfo> naming the partitions to query
> kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
>     requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
> OffsetResponse response = consumer.getOffsetsBefore(request);
> {code}
> The fetch offset is controlled within the {{LegacyFetcher}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)