Hi,

If I remember right, this is actually the intended behaviour:

In batch mode: .setBounded(…)
In streaming mode: source that finishes anyway at set offset: use 
.setUnbounded(…)
In streaming mode: source that never finishes: don’t set a final offset (don’t 
.setUnbounded(…))

I might be mistaken …

Thias


From: Robert Metzger <rmetz...@apache.org>
Sent: Mittwoch, 22. September 2021 17:51
To: Robert Cullen <cinquate...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Unbounded Kafka Source

Hi,

What happens if you do not set any boundedness on the KafkaSource?
For a DataStream job in streaming mode, the Kafka source should be unbounded.

From reading the code, it seems that setting unbounded(latest) should not 
trigger the behavior you mention ... but the Flink docs are not clearly written 
[1], as it says that you can make a Kafka source bounded by calling 
"setUnbounded" ... which is weird, because "setUnbounded" should not make 
something bounded?!

Are there any log messages from the Source that can give us any hints?

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness

On Wed, Sep 22, 2021 at 5:37 PM Robert Cullen 
<cinquate...@gmail.com<mailto:cinquate...@gmail.com>> wrote:
I have an unbounded kafka source that has records written to it every second.  
Instead of the job waiting to process the new messages it closes.  How do I 
keep the stream open?


KafkaSource<FluentdMessage> dataSource = KafkaSource
        .<FluentdMessage>builder()
        .setBootstrapServers(kafkaServer)
        .setTopics(Arrays.asList("fluentd"))
        .setGroupId("")
        .setDeserializer(new FluentdRecordDeserializer())
        //.setStartingOffsets(OffsetsInitializer.earliest())
        //.setBounded(OffsetsInitializer.latest())
        .setUnbounded(OffsetsInitializer.latest())
        .build();



--
Robert Cullen
240-475-4490
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.

Reply via email to