Hi,

What happens if you do not set any boundedness on the KafkaSource?
For a DataStream job in streaming mode, the Kafka source should be
unbounded.

From reading the code, it seems that calling
setUnbounded(OffsetsInitializer.latest()) should not trigger the behavior
you mention, but the Flink docs are not clearly written [1]: they say that
you can make a Kafka source bounded by calling "setUnbounded", which is
weird, because "setUnbounded" should not make something bounded?!

Are there any log messages from the Source that can give us any hints?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness

On Wed, Sep 22, 2021 at 5:37 PM Robert Cullen <cinquate...@gmail.com> wrote:

> I have an unbounded Kafka source that has records written to it every
> second. Instead of waiting to process the new messages, the job closes.
> How do I keep the stream open?
>
> KafkaSource<FluentdMessage> dataSource = KafkaSource
>         .<FluentdMessage>builder()
>         .setBootstrapServers(kafkaServer)
>         .setTopics(Arrays.asList("fluentd"))
>         .setGroupId("")
>         .setDeserializer(new FluentdRecordDeserializer())
>         //.setStartingOffsets(OffsetsInitializer.earliest())
>         //.setBounded(OffsetsInitializer.latest())
>         .setUnbounded(OffsetsInitializer.latest())
>         .build();
>
> --
> Robert Cullen
> 240-475-4490
>
