Re: Batch with Flink Steraming API version 1.12.0

2021-01-06 Arvid Heise
Hi Robert, the most reliable way to use batch mode with the streaming API is to use event time [1]. Processing-time windows or ingestion time do not make much sense if you want to do any kind of reprocessing (nondeterministic results and resource usage because the timestamp of records change wit
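To illustrate the determinism point, here is a minimal plain-Java sketch (not Flink code) of assigning records to tumbling event-time windows. The class and method names are hypothetical; the window-start arithmetic is modeled on Flink's TimeWindow.getWindowStartWithOffset, as far as we know. Because a record's window depends only on its own timestamp, replaying the same records in any order produces the same per-window counts. With processing time, the assignment would instead depend on when the job happens to run.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: count records per tumbling event-time window.
// Window membership depends only on each record's own timestamp,
// so reprocessing the same records always yields the same counts.
final class EventTimeTumblingCounts {

    // Start (ms) of the window of length `size` that contains `timestamp`,
    // using the same modular arithmetic we believe Flink uses.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Counts records per window start, ordered by window start.
    static Map<Long, Long> countPerWindow(List<Long> eventTimestamps, long size) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : eventTimestamps) {
            counts.merge(windowStart(ts, 0L, size), 1L, Long::sum);
        }
        return counts;
    }
}
```

Shuffling the input list does not change the result, which is exactly the property that makes reprocessing with event time reproducible.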

Re: Batch with Flink Steraming API version 1.12.0

2021-01-05 Robert Cullen
Arvid, I'm hoping to get your input on a process I'm working on. Originally I was using a streaming solution, but I noticed that the data in the sliding windows was getting too large over longer intervals, and the job sometimes stopped processing altogether. Anyway, the total counts should be a fixed number s
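One factor that can make sliding windows expensive (the thread does not confirm it was the cause here) is that every element is assigned to size / slide overlapping windows. The plain-Java sketch below returns the [start, end) windows a single timestamp falls into; the names are hypothetical, and the loop is modeled on Flink's sliding event-time window assigner as we understand it. For a one-hour window sliding every minute, each element is added to the state of 60 windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sliding-window assignment (not Flink code).
final class SlidingWindowAssignment {

    // Latest slide-aligned window start at or before `timestamp` (ms).
    static long lastStart(long timestamp, long slide) {
        return timestamp - (timestamp + slide) % slide;
    }

    // Returns every [start, end) window of length `size` that contains `timestamp`.
    // The number of windows is size / slide when size is a multiple of slide.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

Because the window count grows linearly with size / slide, lengthening the window while keeping the same slide multiplies per-element state by the same factor.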

Re: Batch with Flink Steraming API version 1.12.0

2021-01-05 Arvid Heise
Sorry, Robert, for not checking the complete example. New sources such as KafkaSource are added with env.fromSource instead of env.addSource. It's not ideal, but hopefully we can remove the old way soon to avoid confusion.

Re: Batch with Flink Steraming API version 1.12.0

2021-01-05 Robert Cullen
Arvid, thank you. Sorry, my last post was not clear. This line: env.addSource(source) does not compile, because addSource expects a SourceFunction, not a KafkaSource.

Re: Batch with Flink Steraming API version 1.12.0

2021-01-05 Arvid Heise
Robert, here I modified your example with some highlights.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
KafkaSource source = KafkaSource
    .builder()
    .setBootstrapServers("kafka-headl

Re: Batch with Flink Steraming API version 1.12.0

2021-01-05 Robert Cullen
Arvid, thanks. Can you show me an example of how the source is tied to the ExecutionEnvironment?
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
KafkaSource source = KafkaSource
    .builder()

Re: Batch with Flink Steraming API version 1.12.0

2021-01-05 Arvid Heise
Hi Robert, you basically just (re)write your application with the DataStream API, use the new KafkaSource, and then let the automatic batch detection mode do its work [1]. The most important part is that all your sources need to be bounded. Assuming that you just have a Kafka source, you need to use setBound
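The automatic detection mentioned above follows a simple rule: in AUTOMATIC runtime mode the job runs as BATCH only when every source is bounded, and as STREAMING otherwise. The sketch below is a hypothetical plain-Java model of that rule, not Flink's implementation; its enums only mirror the names of Flink's RuntimeExecutionMode and Boundedness. The rejection of an unbounded source under an explicit BATCH mode reflects our understanding of Flink 1.12 and should be checked against the documentation.

```java
import java.util.List;

// Hypothetical model of how a requested runtime mode resolves; this is not Flink code.
final class RuntimeModeResolution {

    // Names mirror Flink's RuntimeExecutionMode values.
    enum Mode { STREAMING, BATCH, AUTOMATIC }

    // Names mirror Flink's source Boundedness values.
    enum SourceBoundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    static Mode resolve(Mode requested, List<SourceBoundedness> sources) {
        boolean allBounded = sources.stream().allMatch(b -> b == SourceBoundedness.BOUNDED);
        switch (requested) {
            case STREAMING:
                return Mode.STREAMING;
            case BATCH:
                // Assumed behavior: explicit batch execution requires bounded sources only.
                if (!allBounded) {
                    throw new IllegalStateException("BATCH mode requires all sources to be bounded");
                }
                return Mode.BATCH;
            default:
                // AUTOMATIC: batch only if every source is bounded.
                return allBounded ? Mode.BATCH : Mode.STREAMING;
        }
    }
}
```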