Robert,

Here is your example, modified, with the relevant change highlighted.

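final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
// AUTOMATIC lets Flink pick BATCH when all sources are bounded;
// STREAMING would keep streaming execution even with a bounded source.
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers("kafka-headless:9092")
        .setTopics(Arrays.asList("log-input"))
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
        // highlighted change: setBounded instead of setUnbounded
        .setBounded(OffsetsInitializer.latest())
        .build();

// The new KafkaSource is attached with fromSource, not addSource.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");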

You can also explicitly set the runtime mode to BATCH, but that shouldn't be
necessary (and may make things more complicated once you also want to
execute the application in streaming mode).

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
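
To make the mode selection concrete, here is a rough, self-contained sketch of how I understand the decision to work. It is a simplified model and not Flink's actual code: the class RuntimeModeSketch and its resolve method are hypothetical names, and the enums only mirror Flink's RuntimeExecutionMode and Boundedness values. It assumes that AUTOMATIC picks BATCH only when every source is bounded, and that forcing BATCH with an unbounded source is rejected.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of the execution-mode decision (hypothetical, not Flink code).
final class RuntimeModeSketch {

    // Mirrors the two boundedness values a source can report.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    // Mirrors the three values accepted by env.setRuntimeMode(...).
    enum Mode { STREAMING, BATCH, AUTOMATIC }

    // Returns the mode the job would effectively run in under this model.
    static Mode resolve(Mode configured, List<Boundedness> sources) {
        boolean allBounded =
                sources.stream().allMatch(b -> b == Boundedness.BOUNDED);
        switch (configured) {
            case AUTOMATIC:
                // Batch only if every single source is bounded.
                return allBounded ? Mode.BATCH : Mode.STREAMING;
            case BATCH:
                // Assumption: an unbounded source cannot run in batch mode.
                if (!allBounded) {
                    throw new IllegalStateException(
                            "BATCH mode requires all sources to be bounded");
                }
                return Mode.BATCH;
            default:
                // STREAMING is kept as configured, bounded sources or not.
                return Mode.STREAMING;
        }
    }

    public static void main(String[] args) {
        // Kafka source built with setBounded(...) -> BOUNDED
        List<Boundedness> bounded = Arrays.asList(Boundedness.BOUNDED);
        // Kafka source built with setUnbounded(...) -> CONTINUOUS_UNBOUNDED
        List<Boundedness> unbounded =
                Arrays.asList(Boundedness.CONTINUOUS_UNBOUNDED);

        System.out.println(resolve(Mode.AUTOMATIC, bounded));   // prints BATCH
        System.out.println(resolve(Mode.AUTOMATIC, unbounded)); // prints STREAMING
        System.out.println(resolve(Mode.STREAMING, bounded));   // prints STREAMING
    }
}
```

The point of the sketch is that switching from setUnbounded to setBounded is what changes the outcome; the same program can then run as a streaming job again simply by going back to an unbounded source.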


On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen <cinquate...@gmail.com> wrote:

> Arvid,
>
> Thanks. Can you show me an example of how the source is tied to the
> ExecutionEnvironment?
>
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> KafkaSource<String> source = KafkaSource
>         .<String>builder()
>         .setBootstrapServers("kafka-headless:9092")
>         .setTopics(Arrays.asList("log-input"))
>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>         .setUnbounded(OffsetsInitializer.latest())
>         .build();
>
> env.addSource(source);
>
>
> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Robert,
>>
>> you basically just (re)write your application with DataStream API, use
>> the new KafkaSource, and then let the automatic batch detection mode work
>> [1].
>> The most important part is that all your sources need to be bounded.
>> Assuming that you just have a Kafka source, you need to use setBounded
>> with the appropriate end offset/timestamp.
>>
>> Note that the rewritten Kafka source still has a couple of issues that
>> should be addressed by the first bugfix release of 1.12 this month. So
>> while it's safe to use for development, I'd wait for 1.12.1 before rolling
>> it out in production.
>>
>> If you have specific questions on the migration from DataSet to
>> DataStream, please let me know.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>
>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <cinquate...@gmail.com>
>> wrote:
>>
>>> I have a Kafka source that I would like to run a batch job on.  Since
>>> Version 1.12.0 is now soft deprecating the DataSet API in favor of the
>>> DataStream API, can someone show me an example of this? (Using DataStream)
>>>
>>> thanks
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>
> --
> Robert Cullen
> 240-475-4490
>


-- 

Arvid Heise | Senior Java Developer
