I am seeing the same behavior. According to the documentation,
**withMaxReadTime** and **withMaxNumRecords** are mainly intended for demo
purposes, so I guess that either behaving as a bounded source with offset
management is beyond the scope of the current KafkaIO, or something is
missing in the current implementation (watermarking).
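
One possible workaround, which I have not tested against KafkaIO, would be to
commit the offsets yourself once the batch job has finished. The sketch below
only shows the bookkeeping part: given the (partition, offset) pairs of the
records that were actually processed, it computes the offset to commit per
partition. The object and method names are hypothetical, not part of Beam or
Scio.

```scala
// Hypothetical helper, not part of KafkaIO or Scio.
object OffsetsToCommit {
  // Kafka stores the offset of the NEXT record to consume, so the value
  // to commit for a partition is its highest processed offset plus one.
  def nextOffsets(processed: Seq[(Int, Long)]): Map[Int, Long] =
    processed
      .groupBy { case (partition, _) => partition }
      .map { case (partition, records) =>
        partition -> (records.map(_._2).max + 1L)
      }

  def main(args: Array[String]): Unit = {
    // Example input: two records from partition 0, one from partition 1.
    val processed = Seq((0, 41L), (0, 42L), (1, 7L))
    println(nextOffsets(processed))
  }
}
```

To get the pairs you would have to read without .withoutMetadata(), so that
each KafkaRecord still exposes getPartition and getOffset. The resulting map
could then be converted to TopicPartition / OffsetAndMetadata entries and
passed to KafkaConsumer.commitSync from a consumer using the same group.id.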



On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <
andre.missag...@arquivei.com.br> wrote:

> Hello everyone,
>
> I need to do some batch processing that uses messages in a Kafka topic. So
> I tried the "withMaxReadTime" KafkaIO setting:
>
> ---
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "...")
> properties.setProperty("group.id", "mygroup")
> properties.setProperty("sasl.jaas.config", "...")
> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
> properties.setProperty("enable.auto.commit", "false")
>
> sc.customInput("Read From Kafka",
>   KafkaIO
>     .read[String, String]()
>     .withTopic("mytopic")
>     .withKeyDeserializer(classOf[StringDeserializer])
>     .withValueDeserializer(classOf[StringDeserializer])
>     .updateConsumerProperties(properties)
>     .withMaxReadTime(Duration.standardSeconds(20))
>     .withMaxNumRecords(1000000)
>     .commitOffsetsInFinalize()
>     .withoutMetadata()
> )
> .count.debug() // prints something between 10000 and 20000
> ---
> I can see that it was able to read the messages and process them. But in
> the end, no offset was committed:
>
> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
> mytopic  0          0               3094751         3094751  -            -     -
>
> But it is a strange behavior: sometimes it commits the offset, sometimes
> not. I'm not sure if it is a bug, or I'm using the wrong configs.
>
> Has anyone used Bounded KafkaIO before? Is there anything I can do?
>
> Best Regards,
>
> --
> *André Badawi Missaglia*
> Data Engineer
> (16) 3509-5515 *|* www.arquivei.com.br
>


-- 

JC
