Robert, here is your example, modified, with the change marked:

    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

    KafkaSource<String> source = KafkaSource
            .<String>builder()
            .setBootstrapServers("kafka-headless:9092")
            .setTopics(Arrays.asList("log-input"))
            .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
            // changed: setBounded instead of setUnbounded
            .setBounded(OffsetsInitializer.latest())
            .build();

    // The new KafkaSource is attached with fromSource;
    // addSource only accepts the legacy SourceFunction.
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

You can also explicitly set batch mode, but that shouldn't be necessary (and
may make things more complicated once you also want to execute the
application in streaming mode):

    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen <cinquate...@gmail.com> wrote:

> Arvid,
>
> Thanks. Can you show me an example of how the source is tied to the
> ExecutionEnvironment?
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> KafkaSource<String> source = KafkaSource
>     .<String>builder()
>     .setBootstrapServers("kafka-headless:9092")
>     .setTopics(Arrays.asList("log-input"))
>     .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>     .setUnbounded(OffsetsInitializer.latest())
>     .build();
>
> env.addSource(source);
>
> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Robert,
>>
>> you basically just (re)write your application with DataStream API, use
>> the new KafkaSource, and then let the automatic batch detection mode work
>> [1].
>> The most important part is that all your sources need to be bounded.
>> Assuming that you just have a Kafka source, you need to use setBounded
>> with the appropriate end offset/timestamp.
>>
>> Note that the rewritten Kafka source still has a couple of issues that
>> should be addressed by the first bugfix release of 1.12 this month.
>> So while it's safe to use for development, I'd wait for 1.12.1 to roll it
>> out on production.
>>
>> If you have specific questions on the migration from DataSet to
>> DataStream, please let me know.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>
>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <cinquate...@gmail.com>
>> wrote:
>>
>>> I have a Kafka source that I would like to run a batch job on. Since
>>> Version 1.12.0 is now soft deprecating the DataSet API in favor of the
>>> DataStream API, can someone show me an example of this? (Using DataStream)
>>>
>>> thanks
>>> --
>>> Robert Cullen
>>> 240-475-4490

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
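
A footnote on the quoted point that all sources need to be bounded. The toy model below is not Flink code. It is a minimal sketch, assuming that automatic mode picks batch execution only when every source is bounded, as the quoted message and the execution-mode page [1] describe. The class, enum, and method names are hypothetical and chosen only for illustration.

```java
import java.util.List;

// Toy model only; none of these names exist in Flink.
public class RuntimeModeSketch {

    public enum Boundedness { BOUNDED, UNBOUNDED }

    public enum Mode { STREAMING, BATCH }

    // Assumed rule: batch execution only if every source is bounded.
    // An empty list counts as "all bounded" here, which is a choice of this sketch.
    public static Mode resolveAutomatic(List<Boundedness> sources) {
        boolean allBounded = sources.stream()
                .allMatch(b -> b == Boundedness.BOUNDED);
        return allBounded ? Mode.BATCH : Mode.STREAMING;
    }

    public static void main(String[] args) {
        // A job whose only source was built with setBounded(...)
        System.out.println(resolveAutomatic(List.of(Boundedness.BOUNDED)));
        // The same job with one additional unbounded source
        System.out.println(resolveAutomatic(
                List.of(Boundedness.BOUNDED, Boundedness.UNBOUNDED)));
    }
}
```

With a single bounded source the sketch prints BATCH; adding one unbounded source prints STREAMING, which mirrors why the switch from setUnbounded to setBounded matters.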