Robert, So removing the setUnbounded(OffsetInitializer.latest) fixed the issue. Thanks!
On Wed, Sep 22, 2021 at 11:51 AM Robert Metzger <rmetz...@apache.org> wrote: > Hi, > > What happens if you do not set any boundedness on the KafkaSource? > For a DataStream job in streaming mode, the Kafka source should be > unbounded. > > From reading the code, it seems that setting unbounded(latest) should not > trigger the behavior you mention ... but the Flink docs are not clearly > written [1], as it says that you can make a Kafka source bounded by calling > "setUnbounded" ... which is weird, because "setUnbounded" should not make > something bounded?! > > Are there any log messages from the Source that can give us any hints? > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness > > On Wed, Sep 22, 2021 at 5:37 PM Robert Cullen <cinquate...@gmail.com> > wrote: > >> I have an unbounded kafka source that has records written to it every >> second. Instead of the job waiting to process the new messages it closes. >> How do I keep the stream open? >> >> KafkaSource<FluentdMessage> dataSource = KafkaSource >> .<FluentdMessage>builder() >> .setBootstrapServers(kafkaServer) >> .setTopics(Arrays.asList("fluentd")) >> .setGroupId("") >> .setDeserializer(new FluentdRecordDeserializer()) >> //.setStartingOffsets(OffsetsInitializer.earliest()) >> //.setBounded(OffsetsInitializer.latest()) >> .setUnbounded(OffsetsInitializer.latest()) >> .build(); >> >> >> >> >> -- >> Robert Cullen >> 240-475-4490 >> > -- Robert Cullen 240-475-4490