Arvid,

Thank you. Sorry, my last post was not clear. This line:

env.addSource(source);

does not compile, since addSource expects a SourceFunction, not a KafkaSource.

On Tue, Jan 5, 2021 at 11:16 AM Arvid Heise <ar...@ververica.com> wrote:

> Robert,
>
> Here I modified your example with some highlights.
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> KafkaSource<String> source = KafkaSource
>     .<String>builder()
>     .setBootstrapServers("kafka-headless:9092")
>     .setTopics(Arrays.asList("log-input"))
>     .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>     .*setBounded*(OffsetsInitializer.latest())
>     .build();
>
> env.addSource(source);
>
> You can also set the runtime mode to BATCH explicitly, but that shouldn't
> be necessary (and may make things more complicated once you also want to
> execute the application in streaming mode).
>
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>
> On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen <cinquate...@gmail.com>
> wrote:
>
>> Arvid,
>>
>> Thanks. Can you show me an example of how the source is tied to the
>> ExecutionEnvironment?
>>
>> final StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>> KafkaSource<String> source = KafkaSource
>>     .<String>builder()
>>     .setBootstrapServers("kafka-headless:9092")
>>     .setTopics(Arrays.asList("log-input"))
>>     .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>     .setUnbounded(OffsetsInitializer.latest())
>>     .build();
>>
>> env.addSource(source);
>>
>>
>> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Robert,
>>>
>>> You basically just (re)write your application with the DataStream API,
>>> use the new KafkaSource, and then let the automatic batch detection mode
>>> work [1].
>>> The most important part is that all your sources need to be bounded.
>>> Assuming that you just have a Kafka source, you need to use setBounded
>>> with the appropriate end offset/timestamp.
>>>
>>> Note that the rewritten Kafka source still has a couple of issues that
>>> should be addressed by the first bugfix release of 1.12 this month. So
>>> while it's safe to use for development, I'd wait for 1.12.1 before
>>> rolling it out to production.
>>>
>>> If you have specific questions on the migration from DataSet to
>>> DataStream, please let me know.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>>
>>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <cinquate...@gmail.com>
>>> wrote:
>>>
>>>> I have a Kafka source that I would like to run a batch job on. Since
>>>> version 1.12.0 soft-deprecates the DataSet API in favor of the
>>>> DataStream API, can someone show me an example of this (using
>>>> DataStream)?
>>>>
>>>> thanks
>>>> --
>>>> Robert Cullen
>>>> 240-475-4490
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng

--
Robert Cullen
240-475-4490
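For reference, a minimal sketch of the likely fix for the compile error, assuming Flink 1.12 with the flink-connector-kafka dependency on the classpath. Sources built on the new unified Source interface (FLIP-27), such as KafkaSource, are attached with StreamExecutionEnvironment#fromSource(source, watermarkStrategy, sourceName) rather than addSource, which only accepts a SourceFunction. The builder settings are copied from the thread; the class name KafkaBatchJob, the source name "Kafka Source", the job name and the print() sink are illustrative placeholders. KafkaRecordDeserializer is the 1.12 class name and may differ in later releases. Setting RuntimeExecutionMode.AUTOMATIC is an assumption meant to let the automatic batch detection described in [1] take effect, since the default mode is STREAMING.

```java
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical job class; assumes Flink 1.12 and flink-connector-kafka.
public class KafkaBatchJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC picks BATCH execution when all sources are bounded.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        KafkaSource<String> source = KafkaSource
                .<String>builder()
                .setBootstrapServers("kafka-headless:9092")
                .setTopics(Arrays.asList("log-input"))
                .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                // Bounded: stop at the offsets that are latest when the job starts.
                .setBounded(OffsetsInitializer.latest())
                .build();

        // fromSource (not addSource) accepts the new Source interface.
        // No event-time processing is assumed, so no watermarks are generated.
        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Placeholder sink: prints each record to stdout of the task managers.
        lines.print();

        env.execute("Kafka batch job");
    }
}
```

With a bounded source and AUTOMATIC mode, the job should run as a batch job and finish once the end offsets are reached; switching back to setUnbounded would keep it running as a streaming job.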