Don’t you think we could have a race condition there? According to the initial
issue description, the offset was sometimes committed and sometimes not.


> On 9 Jan 2019, at 19:48, Raghu Angadi <ang...@gmail.com> wrote:
> 
> Oh, the generic bounded source wrapper over an unbounded source does not seem 
> to call finalize when it is done with a split. I think it should.
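
Below is a minimal sketch of the behaviour described above. It assumes a simplified model of Beam's `UnboundedSource.CheckpointMark.finalizeCheckpoint()`. A bounded wrapper reads up to a record limit and then finalizes the reader's checkpoint, which gives a source like KafkaIO (with `commitOffsetsInFinalize()`) a chance to commit. `CheckpointMark`, `UnboundedReader`, `InMemoryReader` and `BoundedWrapper` are hypothetical stand-ins, not Beam's classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Beam's CheckpointMark / UnboundedReader (not the real API).
trait CheckpointMark {
  def finalizeCheckpoint(): Unit
}

trait UnboundedReader[T] {
  def advance(): Boolean // true if a new record became available
  def current: T
  def checkpointMark: CheckpointMark
}

// Toy reader over an in-memory "partition". Finalizing a checkpoint records the
// next offset as committed, loosely mimicking commitOffsetsInFinalize().
final class InMemoryReader(log: IndexedSeq[String]) extends UnboundedReader[String] {
  private var next = 0 // offset of the next record to read
  private var cur: String = _
  var committed: Option[Int] = None

  def advance(): Boolean =
    if (next < log.length) { cur = log(next); next += 1; true } else false

  def current: String = cur

  def checkpointMark: CheckpointMark = {
    val offset = next // position captured when the checkpoint is taken
    new CheckpointMark {
      def finalizeCheckpoint(): Unit = { committed = Some(offset) }
    }
  }
}

object BoundedWrapper {
  // Reads at most maxNumRecords, then finalizes the checkpoint once done with the split.
  def readBounded[T](reader: UnboundedReader[T], maxNumRecords: Int): Seq[T] = {
    val out = ArrayBuffer.empty[T]
    // Check the limit before advancing so no extra record is consumed.
    while (out.size < maxNumRecords && reader.advance()) out += reader.current
    reader.checkpointMark.finalizeCheckpoint() // the step the wrapper appears to skip
    out.toList
  }
}
```

Without the `finalizeCheckpoint()` call after the loop, `committed` would stay `None`. That mirrors the uncommitted offsets reported in this thread.
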
> 
> Could you file a bug for the wrapper?
> Meanwhile, this check could be added to the sanity checks in KafkaIO.Read.expand().
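
This sketch assumes "this check" means rejecting `commitOffsetsInFinalize()` when the read is bounded, and shows one way such a validation could look. `ReadConfig` and `ReadValidation` are illustrative names, not part of KafkaIO.

```scala
// Hypothetical, simplified view of the KafkaIO.Read settings involved.
final case class ReadConfig(
    commitOffsetsInFinalize: Boolean,
    maxNumRecords: Option[Long],
    maxReadTimeSeconds: Option[Long])

object ReadValidation {
  // Fail fast when offsets are expected to be committed in finalize but the read
  // is bounded, since the bounded wrapper may never finalize its checkpoints.
  def validate(cfg: ReadConfig): Unit = {
    val bounded = cfg.maxNumRecords.isDefined || cfg.maxReadTimeSeconds.isDefined
    require(
      !(cfg.commitOffsetsInFinalize && bounded),
      "commitOffsetsInFinalize() is not supported with withMaxNumRecords() or withMaxReadTime()")
  }
}
```

`require` throws an `IllegalArgumentException`, so a misconfigured pipeline fails at construction time rather than silently skipping commits.
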
> 
> 
> 
> On Wed, Jan 9, 2019 at 10:37 AM André Missaglia <andre.missag...@arquivei.com.br> wrote:
> Hi Juan,
> 
> After researching a bit, I found this issue, which has been open since 2017:
> https://issues.apache.org/jira/browse/BEAM-2185
> 
> I guess KafkaIO isn't intended to provide a bounded source. Maybe I should
> write my own code that fetches messages from Kafka, even if it means giving
> up some of Beam's processing guarantees...
> 
> 
> On Wed, Jan 9, 2019 at 14:24, Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
> Just so you can see where this happens:
> 
> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
> 
> Cheers
> 
> On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
> I'm experiencing the same thing. According to the documentation, *withMaxReadTime*
> and *withMaxNumRecords* are mainly intended for demo purposes, so I guess either
> bounded reading with offset management is beyond the scope of the current KafkaIO,
> or something is missing from the current implementation (watermarking).
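
As far as we understand it, when both limits are set the bounded read stops at whichever is reached first. That would explain a count far below `withMaxNumRecords(1000000)` after the 20-second `withMaxReadTime` in the original report. The sketch below shows that stop rule. `BoundedStop.readUntilLimit` and its injected clock are hypothetical, not Beam code.

```scala
object BoundedStop {
  // Stop as soon as either the record limit or the time limit is reached.
  // `nowMillis` is an injected clock so the behaviour can be tested deterministically.
  def readUntilLimit[T](
      records: Iterator[T],
      maxNumRecords: Long,
      maxReadTimeMillis: Long,
      nowMillis: () => Long): Vector[T] = {
    val deadline = nowMillis() + maxReadTimeMillis
    val out = Vector.newBuilder[T]
    var n = 0L
    while (n < maxNumRecords && nowMillis() < deadline && records.hasNext) {
      out += records.next()
      n += 1
    }
    out.result()
  }
}
```

With a real clock and a busy topic, the time limit usually wins, so the number of records read varies from run to run.
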
> 
> 
> 
> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <andre.missag...@arquivei.com.br> wrote:
> Hello everyone,
> 
> I need to do some batch processing on messages from a Kafka topic, so I
> tried the "withMaxReadTime" KafkaIO setting:
> 
> ---
> import java.util.Properties
> import scala.collection.JavaConverters._
> import org.apache.beam.sdk.io.kafka.KafkaIO
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.joda.time.Duration
>
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "...")
> properties.setProperty("group.id", "mygroup")
> properties.setProperty("sasl.jaas.config", "...")
> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
> properties.setProperty("enable.auto.commit", "false")
>
> // `sc` is the pipeline's ScioContext
> sc.customInput("Read From Kafka",
>   KafkaIO
>     .read[String, String]()
>     .withTopic("mytopic")
>     .withKeyDeserializer(classOf[StringDeserializer])
>     .withValueDeserializer(classOf[StringDeserializer])
>     // updateConsumerProperties expects a java.util.Map[String, Object],
>     // so convert the Properties (a Map[Object, Object]) first
>     .updateConsumerProperties(properties.asScala.toMap[String, AnyRef].asJava)
>     .withMaxReadTime(Duration.standardSeconds(20))
>     .withMaxNumRecords(1000000)
>     .commitOffsetsInFinalize()
>     .withoutMetadata()
> )
> .count.debug() // prints something between 10000 and 20000
> ---
> I can see that it was able to read and process the messages, but in the
> end no offset was committed:
> 
> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
> mytopic  0          0               3094751         3094751  -            -     -
> 
> Strangely, it sometimes commits the offset and sometimes doesn't. I'm not sure
> whether this is a bug or I'm using the wrong configuration.
> 
> Has anyone used a bounded KafkaIO read before? Is there anything I can do?
> 
> Best Regards,
> 
> -- 
> André Badawi Missaglia
> Data Engineer
> (16) 3509-5515 | www.arquivei.com.br 
> 
> -- 
> 
> JC 
> 
