Hello everyone,

I need to do some batch processing that uses messages in a Kafka topic. So
I tried the "withMaxReadTime" KafkaIO setting:

---
val properties = new Properties()
properties.setProperty("bootstrap.servers", "...")
properties.setProperty("group.id", "mygroup")
properties.setProperty("sasl.jaas.config", "...")
properties.setProperty("security.protocol", "SASL_PLAINTEXT")
properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
properties.setProperty("enable.auto.commit", "false")

sc.customInput("Read From Kafka",
  KafkaIO
    .read[String, String]()
    .withTopic("mytopic")
    .withKeyDeserializer(classOf[StringDeserializer])
    .withValueDeserializer(classOf[StringDeserializer])
    .updateConsumerProperties(properties)
    .withMaxReadTime(Duration.standardSeconds(20))
    .withMaxNumRecords(1000000)
    .commitOffsetsInFinalize()
    .withoutMetadata()
)
.count.debug() // prints something between 10000 and 20000
---
I can see that it was able to read the messages and process them. But in
the end, no offset was commited:

TOPIC                                        PARTITION  CURRENT-OFFSET
LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mytopic                                         0          0
3094751         3094751         -               -               -

But it is a strange behavior: sometimes it commits the offset, sometimes
not. I'm not sure if it is a bug, or I'm using the wrong configs.

Has anyone used Bounded KafkaIO before? is there anything I can do?

Best Regards,

-- 
*André Badawi Missaglia*
Data Engineer
(16) 3509-5515 *|* www.arquivei.com.br
<https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura>
[image: Arquivei.com.br – Inteligência em Notas Fiscais]
<https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura>
[image: Google seleciona Arquivei para imersão e mentoria no Vale do
Silício]
<https://arquivei.com.br/blog/google-seleciona-arquivei/?utm_campaign=assinatura-email-launchpad&utm_content=assinatura-launchpad>
<https://www.facebook.com/arquivei>
<https://www.linkedin.com/company/arquivei>
<https://www.youtube.com/watch?v=KJFrh8h4Zds&yt%3Acc=on>

Reply via email to