Hi,

What happens if you do not set any boundedness on the KafkaSource?
For a DataStream job in streaming mode, the Kafka source should be unbounded.

From reading the code, it seems that setting unbounded(latest) should not trigger the behavior you mention ... but the Flink docs are not clearly written [1], as they say that you can make a Kafka source bounded by calling "setUnbounded" ... which is weird, because "setUnbounded" should not make something bounded?!

Are there any log messages from the Source that can give us any hints?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness

On Wed, Sep 22, 2021 at 5:37 PM Robert Cullen <cinquate...@gmail.com> wrote:

> I have an unbounded kafka source that has records written to it every
> second. Instead of the job waiting to process the new messages it closes.
> How do I keep the stream open?
>
> KafkaSource<FluentdMessage> dataSource = KafkaSource
>         .<FluentdMessage>builder()
>         .setBootstrapServers(kafkaServer)
>         .setTopics(Arrays.asList("fluentd"))
>         .setGroupId("")
>         .setDeserializer(new FluentdRecordDeserializer())
>         //.setStartingOffsets(OffsetsInitializer.earliest())
>         //.setBounded(OffsetsInitializer.latest())
>         .setUnbounded(OffsetsInitializer.latest())
>         .build();
>
> --
> Robert Cullen
> 240-475-4490