Oh, the generic bounded source wrapper over an unbounded source does not seem to call finalize when it is done with a split. I think it should.
Could you file a bug for the wrapper? Meanwhile, this check could be added to the sanity checks in KafkaIO.Read.expand().

On Wed, Jan 9, 2019 at 10:37 AM André Missaglia <andre.missag...@arquivei.com.br> wrote:

> Hi Juan,
>
> After researching a bit, I found this issue, which has been open since 2017:
> https://issues.apache.org/jira/browse/BEAM-2185
>
> I guess KafkaIO isn't intended to provide a bounded source. Maybe I should
> write my own code that fetches messages from Kafka, even if it means giving
> up on some processing guarantees from Beam...
>
> On Wed, Jan 9, 2019 at 14:24, Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>
>> Just so you can see where this happens:
>>
>> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
>>
>> Cheers
>>
>> On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>>
>>> I also experience the same. As per the documentation, **withMaxReadTime**
>>> and **withMaxNumRecords** are mainly used for demo purposes, so I guess
>>> it is beyond the scope of the current KafkaIO to behave as bounded with offset
>>> management, or something is just missing in the current implementation
>>> (watermarking).
>>>
>>> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <andre.missag...@arquivei.com.br> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> I need to do some batch processing that uses messages in a Kafka topic.
>>>> So I tried the "withMaxReadTime" KafkaIO setting:
>>>>
>>>> ---
>>>> val properties = new Properties()
>>>> properties.setProperty("bootstrap.servers", "...")
>>>> properties.setProperty("group.id", "mygroup")
>>>> properties.setProperty("sasl.jaas.config", "...")
>>>> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
>>>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
>>>> properties.setProperty("enable.auto.commit", "false")
>>>>
>>>> sc.customInput("Read From Kafka",
>>>>   KafkaIO
>>>>     .read[String, String]()
>>>>     .withTopic("mytopic")
>>>>     .withKeyDeserializer(classOf[StringDeserializer])
>>>>     .withValueDeserializer(classOf[StringDeserializer])
>>>>     .updateConsumerProperties(properties)
>>>>     .withMaxReadTime(Duration.standardSeconds(20))
>>>>     .withMaxNumRecords(1000000)
>>>>     .commitOffsetsInFinalize()
>>>>     .withoutMetadata()
>>>> )
>>>>   .count.debug() // prints something between 10000 and 20000
>>>> ---
>>>> I can see that it was able to read the messages and process them. But
>>>> in the end, no offset was committed:
>>>>
>>>> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
>>>> mytopic  0          0               3094751         3094751  -            -     -
>>>>
>>>> But the behavior is strange: sometimes it commits the offset,
>>>> sometimes not. I'm not sure if it is a bug, or if I'm using the wrong configs.
>>>>
>>>> Has anyone used bounded KafkaIO before? Is there anything I can do?
>>>>
>>>> Best Regards,
>>>>
>>>> --
>>>> *André Badawi Missaglia*
>>>> Data Engineer
>>>> (16) 3509-5515 *|* www.arquivei.com.br
>>>
>>> --
>>>
>>> JC
>>
>> --
>>
>> JC
>
> --
> *André Badawi Missaglia*