Arvid, I’m hoping to get your input on a process I’m working on. Originally I used a streaming solution, but the state in the sliding windows grew too large over longer intervals and processing sometimes stalled altogether. In any case, the total counts should be a fixed number, so a batch process would be a better fit.
The use case is this: count occurrences per key over 30 minutes of data, then take a 30-second time slice of the same data (possibly several consecutive slices), and run both sets of totals through a single function. Originally my code looked like this, using sliding time windows in streaming mode:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStream<FluentdMessage> stream = env
        .addSource(getConsumer(properties))
        .name("Kafka Source");

// Count occurrences per key over a 30-minute window, sliding every 30 seconds
DataStream<Tuple2<String, Long>> keyedCounts = stream
        .filter(value -> value.getGrokName() != null)
        .map(new MapFunction<FluentdMessage, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(FluentdMessage value) throws Exception {
                return Tuple2.of(value.getGrokName(), 1L);
            }
        })
        .keyBy(value -> value.f0)
        .window(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
        .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
        //.sum(2);
        .reduce((ReduceFunction<Tuple2<String, Long>>) (data1, data2) ->
                Tuple2.of(data1.f0, data1.f1 + data2.f1));

keyedCounts
        .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
        .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
        .process(new ProcessAllWindowFunction<Tuple2<String, Long>, Tuple5<String, Long, Long, String, Long>, TimeWindow>() {

            private ValueState<Long> currentCount;

            @Override
            public void open(Configuration parameters) throws Exception {
                currentCount = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("count", Long.class));
            }

            @Override
            public void process(Context context,
                                Iterable<Tuple2<String, Long>> iterable,
                                Collector<Tuple5<String, Long, Long, String, Long>> out) throws Exception {
                long count = StreamSupport.stream(iterable.spliterator(), false).count();
                if (currentCount.value() == null) {
                    currentCount.update(0L);
                }

                Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
                Map<String, Long> map = new HashMap<>();
                Map<String, List<Long>> keyTotalMap = new HashMap<>();

                if (currentCount.value() < count) {
                    while (iterator.hasNext()) {
                        Tuple2<String, Long> tuple = iterator.next();
                        map.put(tuple.f0, keyDifference(tuple.f0, iterable));
                        keyTotalMap.computeIfAbsent(tuple.f0, k -> new ArrayList<>()).add(tuple.f1);
                        //out.collect(Tuple3.of(tuple.f0, keyDifference(tuple.f0, iterable), sum(iterable)));
                    }
                    map.forEach((key, value) -> {
                        if (value > 0L) {
                            out.collect(Tuple5.of(
                                    key,
                                    value,
                                    sum(key, keyTotalMap),
                                    getChiSqrLoggerScore(value, sumKeys(map), sum(key, keyTotalMap), sum(keyTotalMap)),
                                    System.currentTimeMillis()));
                        }
                    });
                    //out.collect(Tuple5.of(null, null, null, null, null));
                    currentCount.update(count);
                } else {
                    // This is currently the only way to force the job to end
                    throw new InterruptedException();
                }
            }
        })
        .addSink(new RichChiLoggerInputSink())
        .name("Postgres Sink");

//globalCounts.writeAsText("s3://argo-workflow-bucket/output.txt");

env.execute("Flink Kafka Chi Log Runner");

This does not work in batch mode, so I need some guidance. Thanks!

On Tue, Jan 5, 2021 at 11:29 AM Arvid Heise <ar...@ververica.com> wrote:

> Sorry Robert for not checking the complete example. New sources are used
> with fromSource instead of addSource. It's not ideal, but hopefully we can
> remove the old way rather soon to avoid confusion.
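> For reference, a minimal sketch of that wiring (assuming 1.12's
> StreamExecutionEnvironment#fromSource; the no-watermarks strategy below is
> just a placeholder, pick whatever fits your pipeline):
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>
> // fromSource replaces addSource for the new unified sources
> DataStream<String> stream = env.fromSource(
>         source,                           // the KafkaSource built earlier
>         WatermarkStrategy.noWatermarks(), // placeholder watermark strategy
>         "Kafka Source");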
> On Tue, Jan 5, 2021 at 5:23 PM Robert Cullen <cinquate...@gmail.com> wrote:
>
>> Arvid,
>>
>> Thank you. Sorry, my last post was not clear. This line:
>>
>> env.addSource(source)
>>
>> does not compile, since addSource expects a SourceFunction, not a
>> KafkaSource type.
>>
>> On Tue, Jan 5, 2021 at 11:16 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Robert,
>>>
>>> here I modified your example with some highlights.
>>>
>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>> KafkaSource<String> source = KafkaSource
>>>         .<String>builder()
>>>         .setBootstrapServers("kafka-headless:9092")
>>>         .setTopics(Arrays.asList("log-input"))
>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>         .*setBounded*(OffsetsInitializer.latest())
>>>         .build();
>>>
>>> env.addSource(source);
>>>
>>> You can also set the runtime mode explicitly, but that shouldn't be
>>> necessary (and may make things more complicated once you also want to
>>> execute the application in streaming mode):
>>>
>>> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>>>
>>> On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen <cinquate...@gmail.com> wrote:
>>>
>>>> Arvid,
>>>>
>>>> Thanks. Can you show me an example of how the source is tied to the
>>>> ExecutionEnvironment?
>>>>
>>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>>
>>>> KafkaSource<String> source = KafkaSource
>>>>         .<String>builder()
>>>>         .setBootstrapServers("kafka-headless:9092")
>>>>         .setTopics(Arrays.asList("log-input"))
>>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>>         .setUnbounded(OffsetsInitializer.latest())
>>>>         .build();
>>>>
>>>> env.addSource(source);
>>>>
>>>> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> you basically just (re)write your application with the DataStream API,
>>>>> use the new KafkaSource, and then let the automatic batch detection
>>>>> mode do its work [1]. The most important part is that all of your
>>>>> sources need to be bounded. Assuming that you just have a Kafka source,
>>>>> you need to use setBounded with the appropriate end offset/timestamp.
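>>>>> For instance, a minimal sketch that bounds the source at a fixed end
>>>>> timestamp (the starting offsets and the timestamp below are
>>>>> illustrative assumptions, not something your job requires):
>>>>>
>>>>> KafkaSource<String> source = KafkaSource
>>>>>         .<String>builder()
>>>>>         .setBootstrapServers("kafka-headless:9092")
>>>>>         .setTopics(Arrays.asList("log-input"))
>>>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>>>         .setStartingOffsets(OffsetsInitializer.earliest())        // illustrative: read from the start
>>>>>         .setBounded(OffsetsInitializer.timestamp(1609891200000L)) // illustrative: stop at a fixed timestamp
>>>>>         .build();
>>>>>
>>>>> With all sources bounded like this, the automatic batch detection
>>>>> described in [1] can take over.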
>>>>> Note that the rewritten Kafka source still has a couple of issues that
>>>>> should be addressed by the first bugfix release of 1.12 this month. So
>>>>> while it's safe to use for development, I'd wait for 1.12.1 to roll it
>>>>> out in production.
>>>>>
>>>>> If you have specific questions on the migration from DataSet to
>>>>> DataStream, please let me know.
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>>>>
>>>>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <cinquate...@gmail.com> wrote:
>>>>>
>>>>>> I have a Kafka source that I would like to run a batch job on. Since
>>>>>> version 1.12.0 is now soft-deprecating the DataSet API in favor of the
>>>>>> DataStream API, can someone show me an example of this (using
>>>>>> DataStream)?
>>>>>>
>>>>>> thanks

--
Robert Cullen
240-475-4490