Don’t you think that we could have a race condition there, since, according to the initial issue description, the offset was sometimes committed and sometimes not?
> On 9 Jan 2019, at 19:48, Raghu Angadi <ang...@gmail.com> wrote:
>
> Oh, the generic bounded-source wrapper over an unbounded source does not seem to call finalize when it is done with a split. I think it should.
>
> Could you file a bug for the wrapper? Meanwhile, this check could be added to the sanity checks in KafkaIO.Read.expand().

> On Wed, Jan 9, 2019 at 10:37 AM André Missaglia <andre.missag...@arquivei.com.br> wrote:
>
> Hi Juan,
>
> After researching a bit, I found this issue, which has been open since 2017:
> https://issues.apache.org/jira/browse/BEAM-2185
>
> I guess KafkaIO isn't intended to provide a bounded source. Maybe I should write my own code that fetches messages from Kafka, even if it means giving up on some processing guarantees from Beam...

> On Wed, 9 Jan 2019 at 14:24, Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>
> Just so you can have a look at where this happens:
>
> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
>
> Cheers

> On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>
> I also experience the same. As per the documentation, *withMaxReadTime* and *withMaxNumRecords* are mainly intended for demo purposes, so I guess it is beyond the scope of the current KafkaIO to behave as a bounded source with offset management, or something is missing in the current implementation (watermarking).
> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <andre.missag...@arquivei.com.br> wrote:
>
> Hello everyone,
>
> I need to do some batch processing that uses messages in a Kafka topic. So I tried the "withMaxReadTime" KafkaIO setting:
>
> ---
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "...")
> properties.setProperty("group.id", "mygroup")
> properties.setProperty("sasl.jaas.config", "...")
> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
> properties.setProperty("enable.auto.commit", "false")
>
> sc.customInput("Read From Kafka",
>   KafkaIO
>     .read[String, String]()
>     .withTopic("mytopic")
>     .withKeyDeserializer(classOf[StringDeserializer])
>     .withValueDeserializer(classOf[StringDeserializer])
>     .updateConsumerProperties(properties)
>     .withMaxReadTime(Duration.standardSeconds(20))
>     .withMaxNumRecords(1000000)
>     .commitOffsetsInFinalize()
>     .withoutMetadata()
> )
> .count.debug() // prints something between 10000 and 20000
> ---
>
> I can see that it was able to read and process the messages. But in the end, no offset was committed:
>
> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
> mytopic  0          0               3094751         3094751  -            -     -
>
> It is strange behavior: sometimes it commits the offset, sometimes not. I'm not sure if it is a bug or if I'm using the wrong configs.
>
> Has anyone used bounded KafkaIO before? Is there anything I can do?
> Best Regards,
>
> André Badawi Missaglia
> Data Engineer
> www.arquivei.com.br
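The bounded-read semantics the snippet above relies on can be sketched in plain Java (the loop and names are illustrative, not KafkaIO internals): the unbounded read is cut off at whichever of `withMaxNumRecords` or `withMaxReadTime` is exhausted first, which is why the count lands anywhere between 10000 and 20000 depending on throughput during the 20-second window.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch of withMaxNumRecords / withMaxReadTime semantics:
// stop at whichever limit is reached first.
public class BoundedReadSketch {
    static List<Integer> boundedRead(Iterator<Integer> source,
                                     long maxNumRecords,
                                     Duration maxReadTime) {
        List<Integer> out = new ArrayList<>();
        Instant deadline = Instant.now().plus(maxReadTime);
        while (source.hasNext()
                && out.size() < maxNumRecords
                && Instant.now().isBefore(deadline)) {
            out.add(source.next());
        }
        return out;
    }

    public static void main(String[] args) {
        // A fast "topic" of 5000 messages: the record cap is hit long
        // before the time budget, so exactly 1000 records come back.
        List<Integer> topic = new ArrayList<>();
        for (int i = 0; i < 5000; i++) topic.add(i);

        List<Integer> batch = boundedRead(topic.iterator(), 1000, Duration.ofSeconds(20));
        System.out.println(batch.size());   // 1000
    }
}
```

Note that nothing in this cutoff logic commits offsets: under `commitOffsetsInFinalize()` the commit is deferred to checkpoint finalization, which is exactly the step the bounded wrapper may skip.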