Hi Jing,
Thanks for looking into this. Here is the code for `TaxiFareStream`. I was
following this post:
http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
Please let me know if there is any other way to aggregate elements locally.

public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {

    private KeySelector<TaxiFare, Long> keySelector;

    public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
                          BundleTrigger<TaxiFare> bundleTrigger,
                          KeySelector<TaxiFare, Long> keySelector) {
        super(userFunction, bundleTrigger, keySelector);
        this.keySelector = keySelector;
    }

    @Override
    protected Long getKey(TaxiFare input) throws Exception {
        return keySelector.getKey(input);
    }
}

Thanks

On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Suman,
> Would you please share the code for `TaxiFareStream`? It seems we could
> use `MapBundleOperator` directly here.
> BTW, I have some concerns about using this approach for local aggregation
> before a window aggregation: `MapBundleOperator` stores the input data in
> a bundle backed by a HashMap, which does not preserve the order in which
> elements arrived. I'm afraid the elements within a bundle (10 in your
> case) could come out of order, and I'm not sure whether it is reasonable
> to assign watermarks based on unordered timestamps.
>
> Best,
> JING ZHANG
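To illustrate the ordering concern above, here is a minimal, Flink-free Java sketch. It assumes a simplified, hypothetical `Fare` class (only `driverId`, `timestampMs` and `tip`) and folds each bundle the same way `TaxiFareMapBundleFunction.addInput` does: the first fare seen for a driver is kept and later tips are added to it. `java.util.HashMap` makes no guarantee about iteration order, so the flushed records need not follow arrival order; `LinkedHashMap`, swapped in here only for comparison, iterates in insertion (first-arrival) order. Note also that each aggregated record keeps the timestamp of the first fare for its key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TaxiFare with only the fields this demo needs.
final class Fare {
    final long driverId;
    final long timestampMs;
    float tip;

    Fare(long driverId, long timestampMs, float tip) {
        this.driverId = driverId;
        this.timestampMs = timestampMs;
        this.tip = tip;
    }
}

final class BundleOrderDemo {

    // Folds inputs into the given map like addInput() does, then drains it like finishBundle().
    static List<Fare> bundleAndFlush(List<Fare> inputs, Map<Long, Fare> bundle) {
        for (Fare in : inputs) {
            Fare acc = bundle.get(in.driverId);
            if (acc == null) {
                // Copy so the caller's input objects are not mutated by the tip summing below.
                bundle.put(in.driverId, new Fare(in.driverId, in.timestampMs, in.tip));
            } else {
                acc.tip += in.tip; // same as value.tip = value.tip + input.tip; timestamp stays the first one
            }
        }
        List<Fare> out = new ArrayList<>(bundle.values()); // emission order = map iteration order
        bundle.clear();
        return out;
    }

    // True if the emitted timestamps never go backwards.
    static boolean timestampsNonDecreasing(List<Fare> fares) {
        for (int i = 1; i < fares.size(); i++) {
            if (fares.get(i).timestampMs < fares.get(i - 1).timestampMs) {
                return false;
            }
        }
        return true;
    }

    // Arrival order: driver 5 at t=100, driver 1 at t=200, driver 5 again at t=300.
    static List<Fare> sampleInputs() {
        return List.of(new Fare(5L, 100L, 1.0f), new Fare(1L, 200L, 2.0f), new Fare(5L, 300L, 3.0f));
    }

    public static void main(String[] args) {
        List<Fare> fromHashMap = bundleAndFlush(sampleInputs(), new HashMap<>());
        List<Fare> fromLinkedHashMap = bundleAndFlush(sampleInputs(), new LinkedHashMap<>());
        System.out.println("HashMap output non-decreasing: " + timestampsNonDecreasing(fromHashMap));
        System.out.println("LinkedHashMap output non-decreasing: " + timestampsNonDecreasing(fromLinkedHashMap));
    }
}
```

Whatever order `HashMap` happens to produce on a given JDK is an implementation detail; the point of the sketch is only that the bundle itself does nothing to keep its output in timestamp order.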
>
>
>
On Thu, Aug 19, 2021 at 12:43 PM suman shil <cncf.s...@gmail.com> wrote:
>
>> I am trying to do pre-shuffle aggregation in Flink. The following is my
>> MapBundleFunction implementation.
>>
>> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>
>>     @Override
>>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>         if (value == null) {
>>             return input;
>>         }
>>         value.tip = value.tip + input.tip;
>>         return value;
>>     }
>>
>>     @Override
>>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>             out.collect(entry.getValue());
>>         }
>>     }
>> }
>>
>> I am using `CountBundleTrigger`, but the pre-shuffle aggregation is not
>> working because the `count` variable is always 0. Please let me know if
>> I am missing something. This is its `onElement` method:
>>
>>     @Override
>>     public void onElement(T element) throws Exception {
>>         count++;
>>         if (count >= maxCount) {
>>             callback.finishBundle();
>>             reset();
>>         }
>>     }
>>
>> Here is the main code.
>>
>> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
>> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>> KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>     @Override
>>     public Long getKey(TaxiFare value) throws Exception {
>>         return value.driverId;
>>     }
>> };
>>
>> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>> //      fares.keyBy((TaxiFare fare) -> fare.driverId)
>> //      .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>                 new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>                 .assignTimestampsAndWatermarks(
>>                         new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>                             @Override
>>                             public long extractTimestamp(TaxiFare element) {
>>                                 return element.startTime.getEpochSecond();
>>                             }
>>                         })
>>                 .keyBy((TaxiFare fare) -> fare.driverId)
>>                 .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>                 .process(new AddTips());
>>
>> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
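One detail in the main code that may be worth double-checking: Flink event-time timestamps are milliseconds since the epoch, while `Instant.getEpochSecond()` returns seconds. The plain-Java sketch below uses a hypothetical `windowStart` helper (the start of the tumbling window containing a non-negative timestamp, with zero offset) to show the effect on one-hour windows such as the `windowAll(TumblingEventTimeWindows.of(Time.hours(1)))` step: two fares an hour apart fall into different windows when measured in milliseconds, but into the same window when their epoch seconds are read as milliseconds.

```java
import java.time.Instant;

final class TimestampUnitsDemo {
    static final long ONE_HOUR_MS = 3_600_000L;

    // Hypothetical helper: start of the tumbling window containing ts (non-negative ts, zero offset).
    static long windowStart(long ts, long windowSizeMs) {
        return ts - (ts % windowSizeMs);
    }

    public static void main(String[] args) {
        Instant first = Instant.parse("2021-01-01T00:10:00Z");
        Instant second = Instant.parse("2021-01-01T01:10:00Z"); // one hour later

        // Milliseconds since the epoch, the unit event-time windows expect.
        long msA = windowStart(first.toEpochMilli(), ONE_HOUR_MS);
        long msB = windowStart(second.toEpochMilli(), ONE_HOUR_MS);

        // Epoch seconds mistakenly treated as milliseconds.
        long secA = windowStart(first.getEpochSecond(), ONE_HOUR_MS);
        long secB = windowStart(second.getEpochSecond(), ONE_HOUR_MS);

        System.out.println("millis: same window? " + (msA == msB));   // false
        System.out.println("seconds: same window? " + (secA == secB)); // true
    }
}
```

Assuming `startTime` is a `java.time.Instant` (which `getEpochSecond()` suggests), `element.startTime.toEpochMilli()` would be the usual way to return milliseconds from `extractTimestamp`.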
>>
>> Thanks
>> Suman
>>
>
