Re: Pre-shuffle aggregation in Flink is not working

2021-08-21, suman shil
> …apBundleOperator, and update the bundle initialization code in the `open` method.
> Besides, `MapBundleFunction`, `MapBundleOperator`, and `CountBundleTrigger` are not marked as `@Public`, so they have no compatibility guarantee.
> You'd better copy them for your own use.
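The advice above (keep your own copy of the bundle operator and create the bundle map yourself in `open`) can be illustrated without any Flink dependency. The class below is a minimal, hypothetical stand-in for a copied `MapBundleOperator` combined with a `CountBundleTrigger`. The names `CountBundleSketch`, `processElement`, `finishBundle`, and `output` are illustrative only and are not Flink's API. The sketch buffers per-key partial aggregates in a `LinkedHashMap` created in `open`. It flushes them to a list standing in for the downstream collector once `maxCount` inputs have arrived.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical, Flink-free stand-in for a copied MapBundleOperator + CountBundleTrigger.
class CountBundleSketch<K, V> {
    private final int maxCount;                 // flush after this many inputs
    private final BinaryOperator<V> combiner;   // merges a new input into the buffered value
    private final List<Map.Entry<K, V>> output = new ArrayList<>(); // stands in for the collector
    private Map<K, V> bundle;                   // per-key partial aggregates
    private int count;

    CountBundleSketch(int maxCount, BinaryOperator<V> combiner) {
        this.maxCount = maxCount;
        this.combiner = combiner;
    }

    // Bundle initialization: a LinkedHashMap keeps keys in first-arrival order.
    void open() {
        bundle = new LinkedHashMap<>();
        count = 0;
    }

    void processElement(K key, V value) {
        bundle.merge(key, value, combiner);     // pre-aggregate locally, before any shuffle
        if (++count >= maxCount) {
            finishBundle();
        }
    }

    // Emit one pre-aggregated record per key, then start a fresh bundle.
    void finishBundle() {
        for (Map.Entry<K, V> e : bundle.entrySet()) {
            output.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
        }
        bundle.clear();
        count = 0;
    }

    // Flush whatever is left when the input ends.
    void close() {
        if (!bundle.isEmpty()) {
            finishBundle();
        }
    }

    List<Map.Entry<K, V>> output() {
        return output;
    }

    public static void main(String[] args) {
        CountBundleSketch<String, Integer> op = new CountBundleSketch<>(3, Integer::sum);
        op.open();
        op.processElement("a", 1);
        op.processElement("b", 2);
        op.processElement("a", 3);   // third input: the bundle flushes a=4, b=2
        op.processElement("c", 5);
        op.close();                  // flushes c=5
        System.out.println(op.output()); // [a=4, b=2, c=5]
    }
}
```

The count trigger bounds how many records are held in memory between flushes. In a real copied operator, the flush would also need to happen on checkpoints and at end of input.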

Re: Pre-shuffle aggregation in Flink is not working

2021-08-19, suman shil
…:58 PM JING ZHANG wrote:
> Hi Suman,
> Please try copying `MapBundleOperator` and changing the `HashMap` to a `LinkedHashMap`, so that the output sequence stays consistent with the input sequence.
>
> Best,
> JING ZHANG
>
> suman shil wrote on Fri, 2021-08-20 at 2:23 AM:
>> Hi Jing,
>> …
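The reason the `HashMap` to `LinkedHashMap` swap matters is iteration order. A `HashMap` iterates in an order determined by key hashes, while a `LinkedHashMap` iterates in insertion order. A bundle that is flushed by iterating its map therefore emits keys in the order they first arrived. Here is a minimal, Flink-free illustration. The class name and the ride keys are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BundleOrderDemo {
    // Fill the given map the way a bundle would (one entry per key),
    // then return the keys in the order the map iterates them on flush.
    static List<String> flushOrder(Map<String, Integer> bundle, List<String> arrivals) {
        for (String key : arrivals) {
            bundle.merge(key, 1, Integer::sum); // count occurrences per key
        }
        return new ArrayList<>(bundle.keySet());
    }

    public static void main(String[] args) {
        List<String> arrivals = Arrays.asList("ride-42", "ride-7", "ride-42", "ride-100", "ride-3");
        // HashMap: iteration order depends on hash codes, not on arrival order.
        System.out.println(flushOrder(new HashMap<>(), arrivals));
        // LinkedHashMap: iteration order is first-insertion order.
        System.out.println(flushOrder(new LinkedHashMap<>(), arrivals)); // [ride-42, ride-7, ride-100, ride-3]
    }
}
```

Note that updating an existing key does not move it in a `LinkedHashMap`. The flushed order is therefore the order of each key's *first* arrival in the bundle, not the full input sequence.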

Re: Pre-shuffle aggregation in Flink is not working

2021-08-19, suman shil
> … data input sequence. I'm afraid there is an out-of-order problem within a bundle (10 in your case). I'm not sure whether it is reasonable to assign a watermark based on out-of-order timestamps.
>
> Best,
> JING ZHANG
>
> suman shil wrote on Thu, 2021-08-19 at 12:43 PM:
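A common way to cope with out-of-order timestamps is a bounded-out-of-orderness watermark. Instead of deriving the watermark from each element's own (possibly smaller) timestamp, you track the largest timestamp seen so far and let the watermark trail it by a fixed bound. Flink offers this through `WatermarkStrategy.forBoundedOutOfOrderness(Duration)`. The class below is only a dependency-free sketch of the idea, under several assumptions:

- `BoundedWatermarkSketch` is a hypothetical name.
- The 5 ms bound is arbitrary.
- The watermark is updated per event rather than periodically.
- An element counts as "late" if its timestamp is not greater than the watermark in effect when it arrives. This is a simplification, because window operators judge lateness per window.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch of a bounded-out-of-orderness watermark.
class BoundedWatermarkSketch {
    private final long boundMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedWatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    // Records an event timestamp; returns true if it was behind the watermark
    // that was current before this event arrived.
    boolean onEvent(long timestamp) {
        boolean late = timestamp <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return late;
    }

    // The watermark trails the largest timestamp seen by the bound (minus 1 ms),
    // so an element exactly `bound` behind the maximum is still on time.
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        long[] timestamps = {100, 103, 101, 110, 104, 99};
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(5);
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (wm.onEvent(ts)) {
                late.add(ts);
            }
        }
        System.out.println("late=" + late + " watermark=" + wm.currentWatermark());
        // prints late=[104, 99] watermark=104
    }
}
```

With a bound of 0, which is close to "watermark = the element's own timestamp", the same input would also mark 101 as late. That is the kind of unordering within a bundle the reply above is worried about.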

Pre-shuffle aggregation in Flink is not working

2021-08-18, suman shil
I am trying to do pre-shuffle aggregation in Flink. The following is the MapBundle implementation:

    public class TaxiFareMapBundleFunction extends MapBundleFunction {
        @Override
        public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
            if (value == …
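For context, Flink's internal `MapBundleFunction` is driven through two callbacks. The first is `addInput(value, input)`, where `value` is the partial aggregate currently buffered for the input's key, or `null` for the first element of that key in the bundle. The second is `finishBundle(buffer, out)`, which emits the buffered aggregates. The sketch below models that contract without Flink, so a plain `List` replaces the `Collector`. The original implementation is truncated, so the following details are all hypothetical illustrations, not the author's code: the `TaxiFare` fields `driverId` and `totalFare`, keying by driver, the per-driver sum, and the `BundleFunctionSketch` name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BundleFunctionSketch {
    // Hypothetical TaxiFare with only the fields needed for a per-driver sum.
    static final class TaxiFare {
        final long driverId;
        final double totalFare;

        TaxiFare(long driverId, double totalFare) {
            this.driverId = driverId;
            this.totalFare = totalFare;
        }

        @Override
        public String toString() {
            return driverId + ":" + totalFare;
        }
    }

    // Mirrors the addInput contract: value is null for the first input of a key.
    static TaxiFare addInput(TaxiFare value, TaxiFare input) {
        if (value == null) {
            return input;
        }
        return new TaxiFare(value.driverId, value.totalFare + input.totalFare);
    }

    // Mirrors finishBundle: emit one partial aggregate per buffered key.
    static void finishBundle(Map<Long, TaxiFare> buffer, List<TaxiFare> out) {
        out.addAll(buffer.values());
    }

    public static void main(String[] args) {
        Map<Long, TaxiFare> buffer = new LinkedHashMap<>();
        TaxiFare[] inputs = {
            new TaxiFare(1L, 10.0), new TaxiFare(2L, 7.5), new TaxiFare(1L, 2.5)
        };
        for (TaxiFare fare : inputs) {
            // The key selector (driverId) is an assumption for this sketch.
            buffer.put(fare.driverId, addInput(buffer.get(fare.driverId), fare));
        }
        List<TaxiFare> out = new ArrayList<>();
        finishBundle(buffer, out);
        buffer.clear();
        System.out.println(out); // [1:12.5, 2:7.5]
    }
}
```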

Flink application question

2021-08-09, suman shil
I am writing a Flink application that consumes time-series data from a Kafka topic. Each time-series record has a metric name, tag key-value pairs, a timestamp, and a value. I have created a tumbling window to aggregate data based on a metric key (which is a combination of metric name, key value …
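A tumbling-window aggregation like the one described comes down to two steps. First, assign each point to the window `[start, start + size)` that contains its timestamp. Second, combine the points that share both a metric key and a window start. In a Flink job this would typically be written as `keyBy(...)` followed by `window(TumblingEventTimeWindows.of(...))`. The sketch below shows only the underlying arithmetic, and it makes several assumptions:

- `MetricPoint` and its fields are hypothetical.
- The aggregation is a sum.
- The window size is 60 seconds with no offset.
- The metric key is a plain string.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowSketch {
    // Hypothetical time-series point: metric key (name + tags), timestamp, value.
    static final class MetricPoint {
        final String metricKey;
        final long timestampMillis;
        final double value;

        MetricPoint(String metricKey, long timestampMillis, double value) {
            this.metricKey = metricKey;
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    // Start of the tumbling window [start, start + size) that contains the timestamp.
    // floorMod keeps this correct for negative timestamps as well.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Sums values per (metricKey, windowStart); TreeMap keeps the printed output sorted.
    static Map<String, Double> aggregate(MetricPoint[] points, long sizeMillis) {
        Map<String, Double> sums = new TreeMap<>();
        for (MetricPoint p : points) {
            String windowKey = p.metricKey + "@" + windowStart(p.timestampMillis, sizeMillis);
            sums.merge(windowKey, p.value, Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long size = 60_000L; // 60-second windows
        MetricPoint[] points = {
            new MetricPoint("cpu{host=a}", 1_000L, 1.0),
            new MetricPoint("cpu{host=a}", 59_999L, 2.0),
            new MetricPoint("cpu{host=a}", 60_000L, 4.0),
            new MetricPoint("cpu{host=b}", 30_000L, 8.0)
        };
        System.out.println(aggregate(points, size));
        // {cpu{host=a}@0=3.0, cpu{host=a}@60000=4.0, cpu{host=b}@0=8.0}
    }
}
```

Note that a point at exactly 60 000 ms falls into the second window, because window ends are exclusive.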

Is the FlinkKafkaConsumer setStartFromLatest() method needed when we use the auto.offset.reset=latest Kafka property?

2021-08-04, suman shil
In my Flink streaming application I have a Kafka data source, and I am using the Kafka property auto.offset.reset=latest. I am wondering whether I also need to call FlinkKafkaConsumer.setStartFromLatest(). Are they similar? Can I use either of them? The following is the documentation from the Flink code:

    /**
     * Specifies th…
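As I read the `FlinkKafkaConsumer` documentation, the two settings are not equivalent:

- **Default `setStartFromGroupOffsets()`:** the consumer resumes from the offsets committed for the consumer group. It falls back to `auto.offset.reset` only for partitions that have no committed offset.
- **`setStartFromLatest()`:** the consumer ignores committed offsets and starts at the end of each partition.
- **Restore:** in both cases, offsets restored from a checkpoint or savepoint take precedence over the configured start mode.

The Flink-free model below encodes that precedence for a single partition. `KafkaStartOffsetModel`, `StartMode`, `resolveStartOffset`, and its parameters are hypothetical names, not connector API.

```java
// Hypothetical model of how the starting offset for one Kafka partition is chosen.
class KafkaStartOffsetModel {
    enum StartMode { GROUP_OFFSETS, EARLIEST, LATEST }

    /**
     * @param restoredNextOffset next offset taken from checkpoint/savepoint state, or null
     * @param committedOffset    offset committed for the consumer group, or null
     * @param autoOffsetReset    value of the Kafka property auto.offset.reset, or null
     * @param earliest           first available offset of the partition
     * @param latest             log-end offset of the partition
     */
    static long resolveStartOffset(StartMode mode, Long restoredNextOffset, Long committedOffset,
                                   String autoOffsetReset, long earliest, long latest) {
        // Restored state wins over any configured start mode.
        if (restoredNextOffset != null) {
            return restoredNextOffset;
        }
        if (mode == StartMode.EARLIEST) {
            return earliest;
        }
        if (mode == StartMode.LATEST) {
            return latest;                    // committed offsets are ignored
        }
        // GROUP_OFFSETS (the default): committed offsets come first.
        if (committedOffset != null) {
            return committedOffset;           // auto.offset.reset is not consulted
        }
        // Only partitions without a committed offset fall back to auto.offset.reset,
        // whose Kafka default is "latest".
        String reset = autoOffsetReset == null ? "latest" : autoOffsetReset;
        if (reset.equals("earliest")) {
            return earliest;
        }
        if (reset.equals("latest")) {
            return latest;
        }
        throw new IllegalStateException("no committed offset and auto.offset.reset=" + reset);
    }

    public static void main(String[] args) {
        // The group has committed offset 500; the partition spans offsets 0..1000.
        System.out.println(resolveStartOffset(StartMode.GROUP_OFFSETS, null, 500L, "latest", 0, 1000)); // 500
        System.out.println(resolveStartOffset(StartMode.LATEST, null, 500L, "latest", 0, 1000));        // 1000
    }
}
```

In other words, `auto.offset.reset=latest` on its own starts at the end only for partitions where the group has no committed offset. `setStartFromLatest()` starts at the end whenever the job starts without restored state.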