Hello everyone, I need to do some batch processing that uses messages in a Kafka topic. So I tried the "withMaxReadTime" KafkaIO setting:
---
import java.util.Properties
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringDeserializer
import org.joda.time.Duration

// Kafka consumer settings; auto-commit is disabled on the client side
val properties = new Properties()
properties.setProperty("bootstrap.servers", "...")
properties.setProperty("group.id", "mygroup")
properties.setProperty("sasl.jaas.config", "...")
properties.setProperty("security.protocol", "SASL_PLAINTEXT")
properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
properties.setProperty("enable.auto.commit", "false")

sc.customInput("Read From Kafka",
    KafkaIO
      .read[String, String]()
      .withTopic("mytopic")
      .withKeyDeserializer(classOf[StringDeserializer])
      .withValueDeserializer(classOf[StringDeserializer])
      .updateConsumerProperties(properties)
      // bound the read by time and by record count
      .withMaxReadTime(Duration.standardSeconds(20))
      .withMaxNumRecords(1000000)
      // ask KafkaIO to commit offsets when a checkpoint is finalized
      .commitOffsetsInFinalize()
      .withoutMetadata()
  )
  .count.debug() // prints something between 10000 and 20000
---

I can see that it was able to read the messages and process them. But in the end, no offset was committed for the consumer group:

TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
mytopic  0          0               3094751         3094751  -            -     -

The behavior is inconsistent, though: with the same pipeline, sometimes the offset is committed and sometimes it is not. I'm not sure whether this is a bug or whether I'm using the wrong configs.

Has anyone used bounded KafkaIO before? Is there anything I can do?

Best Regards,
--
André Badawi Missaglia
Data Engineer | www.arquivei.com.br