Oh, the generic bounded source wrapper over an unbounded source does not
seem to call finalize when it is done with a split. I think it should.

Could you file a bug for the wrapper?
Meanwhile, this check could be added to the sanity checks in
KafkaIO.Read.expand().
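
To make the two points above concrete, here is a toy sketch in Scala. It is
not Beam's code: CheckpointMark, UnboundedReader, FakeKafkaReader,
BoundedWrapper and validate are all hypothetical stand-ins invented for
illustration. It assumes the missing step is a finalize call on the last
checkpoint once the bounded read stops, and it assumes one possible reading
of the sanity check, namely rejecting commitOffsetsInFinalize() together
with a bounded read until the wrapper is fixed.

```scala
// Toy model only: hypothetical stand-ins, not Beam's actual classes.
trait CheckpointMark {
  def finalizeCheckpoint(): Unit
}

trait UnboundedReader[T] {
  def advance(): Boolean            // true if another record is available
  def current: T
  def checkpointMark: CheckpointMark
}

// Reader over an in-memory "partition" that tracks the committed offset.
final class FakeKafkaReader(records: Vector[String]) extends UnboundedReader[String] {
  private var position = -1
  var committedOffset: Long = 0L    // what the consumer group would report

  def advance(): Boolean =
    if (position + 1 < records.length) { position += 1; true } else false

  def current: String = records(position)

  def checkpointMark: CheckpointMark = {
    val next = position + 1L        // offset of the next record to read
    new CheckpointMark {
      def finalizeCheckpoint(): Unit = { committedOffset = next }
    }
  }
}

object BoundedWrapper {
  // Reads at most maxRecords, then finalizes the last checkpoint so the
  // offsets are committed before the split is considered done.
  def readBounded[T](reader: UnboundedReader[T], maxRecords: Int): Vector[T] = {
    val out = Vector.newBuilder[T]
    var n = 0
    while (n < maxRecords && reader.advance()) {
      out += reader.current
      n += 1
    }
    // Assumed fix: the call the generic wrapper does not seem to make today.
    reader.checkpointMark.finalizeCheckpoint()
    out.result()
  }

  // Hypothetical sanity check: fail fast instead of silently not committing.
  def validate(commitOffsetsInFinalize: Boolean, bounded: Boolean): Unit =
    require(
      !(commitOffsetsInFinalize && bounded),
      "commitOffsetsInFinalize() is not reliable with a bounded read")
}
```

With four records and maxRecords = 3, readBounded returns the first three
records and leaves committedOffset at 3. Without the finalize call it would
stay at 0, which matches the CURRENT-OFFSET of 0 in the report below.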



On Wed, Jan 9, 2019 at 10:37 AM André Missaglia <
andre.missag...@arquivei.com.br> wrote:

> Hi Juan,
>
> After researching a bit, I found this issue, which has been open since 2017:
> https://issues.apache.org/jira/browse/BEAM-2185
>
> I guess KafkaIO isn't intended to provide a bounded source. Maybe I should
> write my own code that fetches messages from Kafka, even if it means giving
> up some of the processing guarantees from Beam...
>
>
> On Wed, Jan 9, 2019 at 2:24 PM Juan Carlos Garcia <jcgarc...@gmail.com>
> wrote:
>
>> Just so you can see where this happens:
>>
>>
>> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
>>
>> Cheers
>>
>> On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <jcgarc...@gmail.com>
>> wrote:
>>
>>> I have seen the same behavior. According to the documentation,
>>> **withMaxReadTime** and **withMaxNumRecords** are mainly intended for demo
>>> purposes, so I guess either it is beyond the scope of the current KafkaIO
>>> to behave as a bounded source with offset management, or something is
>>> missing in the current implementation (watermarking).
>>>
>>>
>>>
>>> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <
>>> andre.missag...@arquivei.com.br> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> I need to do some batch processing that uses messages in a Kafka topic.
>>>> So I tried the "withMaxReadTime" KafkaIO setting:
>>>>
>>>> ---
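>>>> // Imports the snippet relies on; `sc` is assumed to be a ScioContext.
>>>> import java.util.Properties
>>>> import org.apache.beam.sdk.io.kafka.KafkaIO
>>>> import org.apache.kafka.common.serialization.StringDeserializer
>>>> import org.joda.time.Duration
>>>>
>>>> val properties = new Properties()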
>>>> properties.setProperty("bootstrap.servers", "...")
>>>> properties.setProperty("group.id", "mygroup")
>>>> properties.setProperty("sasl.jaas.config", "...")
>>>> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
>>>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
>>>> properties.setProperty("enable.auto.commit", "false")
>>>>
>>>> sc.customInput("Read From Kafka",
>>>>   KafkaIO
>>>>     .read[String, String]()
>>>>     .withTopic("mytopic")
>>>>     .withKeyDeserializer(classOf[StringDeserializer])
>>>>     .withValueDeserializer(classOf[StringDeserializer])
>>>>     .updateConsumerProperties(properties)
>>>>     .withMaxReadTime(Duration.standardSeconds(20))
>>>>     .withMaxNumRecords(1000000)
>>>>     .commitOffsetsInFinalize()
>>>>     .withoutMetadata()
>>>> )
>>>> .count.debug() // prints something between 10000 and 20000
>>>> ---
>>>> I can see that it was able to read the messages and process them. But
>>>> in the end, no offset was committed:
>>>>
>>>> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
>>>> mytopic  0          0               3094751         3094751  -            -     -
>>>>
>>>> The behavior is inconsistent, though: sometimes it commits the offset,
>>>> sometimes not. I'm not sure if it is a bug or if I'm using the wrong configs.
>>>>
>>>> Has anyone used bounded KafkaIO before? Is there anything I can do?
>>>>
>>>> Best Regards,
>>>>
>>>> --
>>>> *André Badawi Missaglia*
>>>> Data Engineer
>>>> (16) 3509-5515 *|* www.arquivei.com.br
>>>>
>>>
>>>
>>> --
>>>
>>> JC
>>>
>>>
>>
>> --
>>
>> JC
>>
>>
>
>
