Arvid,

Thank you. Sorry, my last post was not clear. This line:

env.addSource(source)

does not compile, since addSource expects a SourceFunction, not a
KafkaSource.
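
For reference, here is a minimal sketch of what I believe the intended call is, assuming Flink 1.12: KafkaSource implements the new Source interface, and such sources are attached with env.fromSource(...) rather than env.addSource(...). The class name, the group id, the source name "Kafka Source", the choice of WatermarkStrategy.noWatermarks(), and the print() sink are my own assumptions for illustration, not something confirmed in this thread.

```java
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical class name, used only for this sketch.
public class BoundedKafkaJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC lets Flink pick batch execution when every source is bounded.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        KafkaSource<String> source = KafkaSource
                .<String>builder()
                .setBootstrapServers("kafka-headless:9092")
                // Hypothetical group id; as far as I know the 1.12 builder requires one.
                .setGroupId("log-input-reader")
                .setTopics(Arrays.asList("log-input"))
                .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                // Stop at the offsets that are latest when the job starts, making the source bounded.
                .setBounded(OffsetsInitializer.latest())
                .build();

        // fromSource accepts the new Source interface; addSource only accepts a SourceFunction.
        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Placeholder sink so the pipeline has something to execute.
        lines.print();

        env.execute("bounded-kafka-job");
    }
}
```

With the only source bounded and the runtime mode left on AUTOMATIC, the job should be executed as a batch job; this needs the Flink Kafka connector on the classpath and a reachable broker, so treat it as untested.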

On Tue, Jan 5, 2021 at 11:16 AM Arvid Heise <ar...@ververica.com> wrote:

> Robert,
>
> here I modified your example with some highlights.
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> KafkaSource<String> source = KafkaSource
>         .<String>builder()
>         .setBootstrapServers("kafka-headless:9092")
>         .setTopics(Arrays.asList("log-input"))
>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>         .*setBounded*(OffsetsInitializer.latest())
>         .build();
>
> env.addSource(source);
>
> You can also set the runtime mode explicitly, but that shouldn't be
> necessary (and may make things more complicated once you also want to
> execute the application in streaming mode).
>
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>
> On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen <cinquate...@gmail.com>
> wrote:
>
>> Arvid,
>>
>> Thanks. Can you show me an example of how the source is tied to the
>> ExecutionEnvironment?
>>
>> final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>> KafkaSource<String> source = KafkaSource
>>         .<String>builder()
>>         .setBootstrapServers("kafka-headless:9092")
>>         .setTopics(Arrays.asList("log-input"))
>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>         .setUnbounded(OffsetsInitializer.latest())
>>         .build();
>>
>> env.addSource(source);
>>
>>
>> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Robert,
>>>
>>> you basically just (re)write your application with the DataStream API,
>>> use the new KafkaSource, and then let the automatic batch detection mode
>>> work [1].
>>> The most important part is that all your sources need to be bounded.
>>> Assuming that you just have a Kafka source, you need to use setBounded
>>> with the appropriate end offset/timestamp.
>>>
>>> Note that the rewritten Kafka source still has a couple of issues that
>>> should be addressed by the first bugfix release of 1.12 this month. So
>>> while it's safe to use for development, I'd wait for 1.12.1 before rolling
>>> it out in production.
>>>
>>> If you have specific questions on the migration from DataSet to
>>> DataStream, please let me know.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>>
>>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <cinquate...@gmail.com>
>>> wrote:
>>>
>>>> I have a Kafka source that I would like to run a batch job on. Since
>>>> version 1.12.0 soft-deprecates the DataSet API in favor of the
>>>> DataStream API, can someone show me an example of this (using DataStream)?
>>>>
>>>> thanks
>>>> --
>>>> Robert Cullen
>>>> 240-475-4490
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>
>


-- 
Robert Cullen
240-475-4490
