Thank you Gordon and Max,

Gordon, your explanation makes the behaviour clearer to me. I am now appending a timestamp to the group ID, which is a good workaround for now. The "smallest" option is unfortunately not available in this version of the FlinkKafkaConsumer class.
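
For anyone finding this thread later, here is a minimal sketch of that workaround. The object name, the helper name and the sample values ("localhost:9092", "collector") are placeholders of mine, not anything taken from Flink or Kafka. It only builds the properties, and it assumes the 0.9 consumer, where "earliest" is the accepted value:

```scala
import java.util.Properties

// Hypothetical helper object, for illustration only.
object GroupIdSketch {

  // Builds consumer properties whose group id is unique per call, so Kafka
  // has no committed offsets for that group yet.
  def freshConsumerProperties(bootstrapServers: String, baseGroupId: String): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    // Appending the current time yields a group id not used by earlier runs.
    properties.setProperty("group.id", s"$baseGroupId-${System.currentTimeMillis()}")
    // With no committed offset for the group, this setting decides where reading starts.
    properties.setProperty("auto.offset.reset", "earliest")
    properties
  }
}

// Placeholder values; substitute your own broker list and base group name.
val properties = GroupIdSketch.freshConsumerProperties("localhost:9092", "collector")
```

The resulting properties are passed to the consumer in the same way as before. Note that every run then registers a new consumer group, so this is only suitable when re-reading the topic from the beginning is really what you want.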

Cheers,
Kevin


On 28.07.2016 10:39, Maximilian Michels wrote:
Hi Kevin,

You need to use properties.setProperty("auto.offset.reset",
"earliest") for Kafka 0.9 to start from the smallest offset. Note that
in Kafka 0.8 you need to use properties.setProperty("auto.offset.reset",
"smallest") to achieve the same behavior.

Kafka keeps track of the offsets per group id. If you have already
read from a topic with a certain group id and want to restart from the
smallest offset available, you need to generate a unique group id.

Cheers,
Max

On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
Hi,

I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I
am using Flink 1.0.3.

These are my properties:

val properties = new Properties()
properties.setProperty("bootstrap.servers", config.urlKafka)
properties.setProperty("group.id", COLLECTOR_NAME)
properties.setProperty("auto.offset.reset", "earliest")

According to the new consumer API of Kafka, this should result in the
following:

"auto.offset.reset: * smallest : automatically reset the offset to the
smallest offset" (source:
https://kafka.apache.org/documentation.html#newconsumerapi)

However, it starts from the latest item in my topic. Is this a bug or am I
doing something wrong?

Regards,
Kevin

