Arvid,

I’m hoping to get your input on a process I’m working on. Originally I was
using a streaming solution, but I noticed that the data in the sliding
windows was getting too large over longer intervals, and the job sometimes
stopped processing altogether. In any case, the total counts should be a
fixed number, so a batch process would be a better fit.
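
A rough sketch of why the sliding windows grow, assuming the usual behavior of a sliding window assigner: each record is placed in size/slide overlapping windows, so a 30-minute window sliding every 30 seconds keeps 60 live windows per record. The class and method names below (SlidingWindowOverlap, windowsPerRecord, windowStarts) are made up for illustration and are not Flink API:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class SlidingWindowOverlap {

    // Number of overlapping windows a single record falls into
    // (hypothetical helper; assumes size is a multiple of slide).
    static long windowsPerRecord(Duration size, Duration slide) {
        return size.toMillis() / slide.toMillis();
    }

    // Start timestamps (epoch millis, window offset 0) of every sliding
    // window that contains the given timestamp, newest window first.
    static List<Long> windowStarts(long timestampMs, Duration size, Duration slide) {
        long sizeMs = size.toMillis();
        long slideMs = slide.toMillis();
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(30);
        Duration slide = Duration.ofSeconds(30);
        // 30 min / 30 s = 60 windows per record
        System.out.println(windowsPerRecord(size, slide));
        System.out.println(windowStarts(1_000_000L, size, slide).size());
    }
}
```

With an incremental reduce, each of those windows holds one aggregate per key, so the state scales with roughly 60 times the number of distinct keys rather than with the raw record count.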

The use case is this: get per-key counts for 30 minutes of data, then take a
30-second time slice (possibly several consecutive slices) of the same data,
and run the totals and the slice counts through one function. Originally my
code looked like this, using sliding time windows in streaming mode:

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<FluentdMessage> stream = env
                .addSource(getConsumer(properties))
                .name("Kafka Source");

        DataStream<Tuple2<String, Long>> keyedCounts  = stream
                .filter(value -> value.getGrokName() != null)
                .map(new MapFunction<FluentdMessage, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(FluentdMessage value) throws Exception {
                        return Tuple2.of(value.getGrokName(), 1L);
                    }
                })
                .keyBy(value -> value.f0)
                .window(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
                .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
                //.sum(2);
                .reduce((ReduceFunction<Tuple2<String, Long>>) (data1, data2) ->
                        Tuple2.of(data1.f0, data1.f1 + data2.f1));

        keyedCounts
                .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
                .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
                .process(new ProcessAllWindowFunction<Tuple2<String, Long>,
                        Tuple5<String, Long, Long, String, Long>, TimeWindow>() {

                    private ValueState<Long> currentCount;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        currentCount = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("count", Long.class));
                    }

                    @Override
                    public void process(Context context,
                                        Iterable<Tuple2<String, Long>> iterable,
                                        Collector<Tuple5<String, Long, Long, String, Long>> out)
                            throws Exception {
                        long count = StreamSupport.stream(iterable.spliterator(), false).count();
                        if(currentCount.value() == null) {
                            currentCount.update(0L);
                        }
                        Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
                        Map<String, Long> map = new HashMap<>();
                        Map<String, List<Long>> keyTotalMap = new HashMap<>();

                        if(currentCount.value() < count) {
                            while (iterator.hasNext()) {
                                Tuple2<String, Long> tuple = iterator.next();
                                map.put(tuple.f0, keyDifference(tuple.f0, iterable));
                                keyTotalMap.computeIfAbsent(tuple.f0, k -> new ArrayList<>()).add(tuple.f1);
                                //out.collect(Tuple3.of(tuple.f0, keyDifference(tuple.f0, iterable), sum(iterable)));
                            }

                            map.forEach((key, value) -> {
                                if(value > 0L) {
                                    out.collect(Tuple5.of(
                                            key,
                                            value,
                                            sum(key, keyTotalMap),
                                            getChiSqrLoggerScore(value, sumKeys(map),
                                                    sum(key, keyTotalMap), sum(keyTotalMap)),
                                            System.currentTimeMillis()));
                                }});

                            //out.collect(Tuple5.of(null, null, null, null, null));
                            currentCount.update(count);
                        } else {
                            //This is currently the only way to force the job to end
                            throw new InterruptedException();
                        }
                    }
                })
               .addSink(new RichChiLoggerInputSink())
               .name("Postgres Sink");

        //globalCounts.writeAsText("s3://argo-workflow-bucket/output.txt");
        env.execute("Flink Kafka Chi Log Runner");

This does not work in batch mode. So I need some guidance. Thanks!
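
For reference, the computation described above can be sketched in plain Java, independent of Flink: per-key totals over the full 30 minutes, next to per-key counts for one 30-second slice of the same data. Everything here (the Event class, countByKey, the sample keys and timestamps) is a hypothetical stand-in for illustration; the scoring function from the job is not shown in this thread, so it is left out:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SliceVersusTotalCounts {

    // Hypothetical stand-in for a parsed log record: a key plus a timestamp in epoch millis.
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Counts events per key whose timestamp lies in the half-open range [fromMs, toMs).
    static Map<String, Long> countByKey(List<Event> events, long fromMs, long toMs) {
        Map<String, Long> counts = new HashMap<>();
        for (Event e : events) {
            if (e.timestampMs >= fromMs && e.timestampMs < toMs) {
                counts.merge(e.key, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long thirtyMinutesMs = 30 * 60 * 1000L;
        long thirtySecondsMs = 30 * 1000L;

        // Made-up sample data covering one 30-minute range starting at 0.
        List<Event> events = Arrays.asList(
                new Event("sshd", 1_000L),
                new Event("sshd", 40_000L),
                new Event("nginx", 5_000L),
                new Event("nginx", 1_700_000L),
                new Event("nginx", 1_790_000L));

        // Fixed totals over the whole bounded input.
        Map<String, Long> totals = countByKey(events, 0L, thirtyMinutesMs);

        // Counts for the last 30-second slice of the same input.
        long sliceStart = thirtyMinutesMs - thirtySecondsMs;
        Map<String, Long> slice = countByKey(events, sliceStart, thirtyMinutesMs);

        // Each (slice count, total count) pair is what would be fed to the scoring function.
        slice.forEach((key, sliceCount) ->
                System.out.println(key + " slice=" + sliceCount + " total=" + totals.get(key)));
    }
}
```

Because the input is bounded, the totals are computed once and every slice is compared against the same fixed numbers, which is the property the streaming version could not guarantee.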

On Tue, Jan 5, 2021 at 11:29 AM Arvid Heise <ar...@ververica.com> wrote:

> Sorry Robert for not checking the complete example. New sources are used
> with fromSource instead of addSource. It's not ideal but hopefully we can
> remove the old way rather soonish to avoid confusion.
>
> On Tue, Jan 5, 2021 at 5:23 PM Robert Cullen <cinquate...@gmail.com>
> wrote:
>
>> Arvid,
>>
>> Thank you. Sorry, my last post was not clear. This line:
>>
>> env.addSource(source)
>>
>> does not compile since addSource is expecting a SourceFunction not a
>> KafkaSource type.
>>
>> On Tue, Jan 5, 2021 at 11:16 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Robert,
>>>
>>> here I modified your example with some highlights.
>>>
>>> final StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>> KafkaSource<String> source = KafkaSource
>>>         .<String>builder()
>>>         .setBootstrapServers("kafka-headless:9092")
>>>         .setTopics(Arrays.asList("log-input"))
>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>         .setBounded(OffsetsInitializer.latest()) // highlighted: setBounded instead of setUnbounded
>>>         .build();
>>>
>>> env.addSource(source);
>>>
>>> You can also explicitly set the runtime mode as shown below, but that
>>> shouldn't be necessary (and may make things more complicated once you also
>>> want to execute the application in streaming mode).
>>>
>>> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>>>
>>>
>>> On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen <cinquate...@gmail.com>
>>> wrote:
>>>
>>>> Arvid,
>>>>
>>>> Thanks. Can you show me an example of how the source is tied to the
>>>> ExecutionEnvironment?
>>>>
>>>> final StreamExecutionEnvironment env = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>>
>>>> KafkaSource<String> source = KafkaSource
>>>>         .<String>builder()
>>>>         .setBootstrapServers("kafka-headless:9092")
>>>>         .setTopics(Arrays.asList("log-input"))
>>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>>         .setUnbounded(OffsetsInitializer.latest())
>>>>         .build();
>>>>
>>>> env.addSource(source);
>>>>
>>>>
>>>> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> you basically just (re)write your application with the DataStream API,
>>>>> use the new KafkaSource, and then let the automatic batch detection mode
>>>>> work [1].
>>>>> The most important part is that all your sources need to be bounded.
>>>>> Assuming that you just have a Kafka source, you need to use setBounded
>>>>> with the appropriate end offset/timestamp.
>>>>>
>>>>> Note that the rewritten Kafka source still has a couple of issues that
>>>>> should be addressed by the first bugfix release of 1.12 this month. So
>>>>> while it's safe to use for development, I'd wait for 1.12.1 before rolling
>>>>> it out to production.
>>>>>
>>>>> If you have specific questions on the migration from DataSet to
>>>>> DataStream, please let me know.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>>>>
>>>>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <cinquate...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have a Kafka source that I would like to run a batch job on.  Since
>>>>>> Version 1.12.0 is now soft deprecating the DataSet API in favor of the
>>>>>> DataStream API, can someone show me an example of this? (Using 
>>>>>> DataStream)
>>>>>>
>>>>>> thanks
>>>>>> --
>>>>>> Robert Cullen
>>>>>> 240-475-4490
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Arvid Heise | Senior Java Developer
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Ververica GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>> Ji (Toni) Cheng
>>>>>


-- 
Robert Cullen
240-475-4490
