I also experience the same, as per the documentation **withMaxReadTime** and **withMaxNumRecords** are mainly used for Demo purposes, so i guess is beyond the scope of the current KafkaIO to behave as Bounded with offset management or just something is missing in the current implementation (Watermarking).
On Wed, Jan 9, 2019 at 2:28 PM André Missaglia < andre.missag...@arquivei.com.br> wrote: > Hello everyone, > > I need to do some batch processing that uses messages in a Kafka topic. So > I tried the "withMaxReadTime" KafkaIO setting: > > --- > val properties = new Properties() > properties.setProperty("bootstrap.servers", "...") > properties.setProperty("group.id", "mygroup") > properties.setProperty("sasl.jaas.config", "...") > properties.setProperty("security.protocol", "SASL_PLAINTEXT") > properties.setProperty("sasl.mechanism", "SCRAM-SHA-256") > properties.setProperty("enable.auto.commit", "false") > > sc.customInput("Read From Kafka", > KafkaIO > .read[String, String]() > .withTopic("mytopic") > .withKeyDeserializer(classOf[StringDeserializer]) > .withValueDeserializer(classOf[StringDeserializer]) > .updateConsumerProperties(properties) > .withMaxReadTime(Duration.standardSeconds(20)) > .withMaxNumRecords(1000000) > .commitOffsetsInFinalize() > .withoutMetadata() > ) > .count.debug() // prints something between 10000 and 20000 > --- > I can see that it was able to read the messages and process them. But in > the end, no offset was commited: > > TOPIC PARTITION CURRENT-OFFSET > LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID > mytopic 0 0 > 3094751 3094751 - - - > > But it is a strange behavior: sometimes it commits the offset, sometimes > not. I'm not sure if it is a bug, or I'm using the wrong configs. > > Has anyone used Bounded KafkaIO before? is there anything I can do? > > Best Regards, > > -- > *André Badawi Missaglia* > Data Engineer > (16) 3509-5515 *|* www.arquivei.com.br > <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura> > [image: Arquivei.com.br – Inteligência em Notas Fiscais] > <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura> > [image: Google seleciona Arquivei para imersão e mentoria no Vale do > Silício] > <https://arquivei.com.br/blog/google-seleciona-arquivei/?utm_campaign=assinatura-email-launchpad&utm_content=assinatura-launchpad> > <https://www.facebook.com/arquivei> > <https://www.linkedin.com/company/arquivei> > <https://www.youtube.com/watch?v=KJFrh8h4Zds&yt%3Acc=on> > -- JC