Sorry, Robert, for not checking the complete example. Sources built on the new Source interface (such as KafkaSource) are attached with fromSource instead of addSource. It's not ideal, but hopefully we can remove the old way soon to avoid confusion.
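Applied to the example from this thread, a minimal sketch could look like the following. It assumes Flink 1.12 with the flink-connector-kafka dependency on the classpath. The class name, the consumer group id, the operator name "kafka-log-input" and the print() sink are hypothetical placeholders. Unlike addSource, fromSource also takes a WatermarkStrategy and a source name. Because STREAMING is the default runtime mode in 1.12, the sketch sets AUTOMATIC explicitly so that Flink can choose batch execution once all sources are bounded.

```java
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaBatchJob {

    // Builds a bounded Kafka source: reading stops at the offsets that are
    // "latest" when the job starts, so the job can finish.
    static KafkaSource<String> buildSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("kafka-headless:9092")
                .setTopics(Arrays.asList("log-input"))
                // Hypothetical group id; assumption: some connector versions
                // reject a KafkaSource without one.
                .setGroupId("log-input-batch")
                .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                .setBounded(OffsetsInitializer.latest())
                .build();
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC picks BATCH execution when all sources are bounded.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // New-style sources are attached with fromSource, not addSource.
        DataStream<String> lines = env.fromSource(
                buildSource(),
                WatermarkStrategy.noWatermarks(), // no event-time logic in this sketch
                "kafka-log-input");               // placeholder source name

        lines.print();
        env.execute("kafka-batch-sketch");
    }
}
```

As far as the 1.12 builder goes, only setBounded marks the source as BOUNDED, which is what allows AUTOMATIC to switch to batch execution; setUnbounded, as in the earlier snippet, leaves it CONTINUOUS_UNBOUNDED.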
On Tue, Jan 5, 2021 at 5:23 PM Robert Cullen <cinquate...@gmail.com> wrote:

> Arvid,
>
> Thank you. Sorry, my last post was not clear. This line:
>
> env.addSource(source)
>
> does not compile, since addSource expects a SourceFunction, not a
> KafkaSource.
>
> On Tue, Jan 5, 2021 at 11:16 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Robert,
>>
>> here I modified your example with some highlights.
>>
>> final StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>> KafkaSource<String> source = KafkaSource
>>     .<String>builder()
>>     .setBootstrapServers("kafka-headless:9092")
>>     .setTopics(Arrays.asList("log-input"))
>>     .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>     .*setBounded*(OffsetsInitializer.latest())
>>     .build();
>>
>> env.addSource(source);
>>
>> You can also set the runtime mode explicitly, but that shouldn't be
>> necessary (and may make things more complicated once you also want to
>> execute the application in streaming mode).
>>
>> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>>
>>
>> On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen <cinquate...@gmail.com> wrote:
>>
>>> Arvid,
>>>
>>> Thanks. Can you show me an example of how the source is tied to the
>>> ExecutionEnvironment?
>>>
>>> final StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>> KafkaSource<String> source = KafkaSource
>>>     .<String>builder()
>>>     .setBootstrapServers("kafka-headless:9092")
>>>     .setTopics(Arrays.asList("log-input"))
>>>     .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>     .setUnbounded(OffsetsInitializer.latest())
>>>     .build();
>>>
>>> env.addSource(source);
>>>
>>>
>>> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> you basically just (re)write your application with the DataStream API,
>>>> use the new KafkaSource, and then let the automatic batch detection mode
>>>> work [1].
>>>> The most important part is that all your sources need to be bounded.
>>>> Assuming that you just have a Kafka source, you need to use setBounded
>>>> with the appropriate end offset/timestamp.
>>>>
>>>> Note that the rewritten Kafka source still has a couple of issues that
>>>> should be addressed by the first bugfix release of 1.12 this month. So
>>>> while it's safe to use for development, I'd wait for 1.12.1 to roll it
>>>> out in production.
>>>>
>>>> If you have specific questions on the migration from DataSet to
>>>> DataStream, please let me know.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>>>
>>>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <cinquate...@gmail.com> wrote:
>>>>
>>>>> I have a Kafka source that I would like to run a batch job on. Since
>>>>> version 1.12.0 soft-deprecates the DataSet API in favor of the
>>>>> DataStream API, can someone show me an example of this?
>>>>> (Using DataStream)
>>>>>
>>>>> thanks
>>>>> --
>>>>> Robert Cullen
>>>>> 240-475-4490
>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng
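The first reply in the quoted thread mentions bounding the source "with the appropriate end offset/timestamp" and, optionally, setting BATCH mode explicitly. A minimal sketch of both, assuming the same Flink 1.12 setup: it starts from OffsetsInitializer.earliest() and stops at OffsetsInitializer.timestamp(...). The class name, group id, source name and the example end timestamp are hypothetical placeholders.

```java
import java.time.Instant;
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaTimeBoundedJob {

    // Reads each partition from its earliest offset up to (excluding) the
    // first record whose timestamp is >= endMillis.
    static KafkaSource<String> buildSource(long endMillis) {
        return KafkaSource.<String>builder()
                .setBootstrapServers("kafka-headless:9092")
                .setTopics(Arrays.asList("log-input"))
                .setGroupId("log-input-batch") // hypothetical group id
                .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.timestamp(endMillis))
                .build();
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Explicit BATCH mode; the job then fails to start if any source is unbounded.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Example end timestamp only; pick the cut-off your batch run needs.
        long endMillis = Instant.parse("2021-01-05T00:00:00Z").toEpochMilli();

        env.fromSource(buildSource(endMillis), WatermarkStrategy.noWatermarks(), "kafka-log-input")
                .print();
        env.execute("kafka-time-bounded-sketch");
    }
}
```

Kafka record timestamps are not guaranteed to be monotonic within a partition, so a timestamp bound is an approximate cut-off rather than an exact time window.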