I have an unbounded kafka source that has records written to it every second. Instead of the job waiting to process the new messages it closes. How do I keep the stream open?
KafkaSource<FluentdMessage> dataSource = KafkaSource .<FluentdMessage>builder() .setBootstrapServers(kafkaServer) .setTopics(Arrays.asList("fluentd")) .setGroupId("") .setDeserializer(new FluentdRecordDeserializer()) //.setStartingOffsets(OffsetsInitializer.earliest()) //.setBounded(OffsetsInitializer.latest()) .setUnbounded(OffsetsInitializer.latest()) .build(); -- Robert Cullen 240-475-4490