Hi JING,
Thanks for the pointers.

1) I was able to debug why the variable `numOfElements` was getting reset to 0.
The following method of `AbstractMapBundleOperator.java` was being called,
which reset the variable to 0 before it could reach the max count.

    @Override
    public void finishBundle() throws Exception {
        if (!bundle.isEmpty()) {
            numOfElements = 0;
            function.finishBundle(bundle, collector);
            bundle.clear();
        }
        bundleTrigger.reset();
    }
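
Digging a bit further, finishBundle() is not only triggered by the count trigger; in my copy of AbstractMapBundleOperator it is also called on every watermark and before each checkpoint barrier, roughly as below (an approximate excerpt, not verbatim), which explains why the counter can drop back to 0 before maxCount is reached.

    // Approximate excerpt from my copy of AbstractMapBundleOperator (may differ slightly):
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        finishBundle();
        super.processWatermark(mark);
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        finishBundle();
    }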

2) To avoid this problem I created a simple aggregator which accumulates the
elements in a LinkedHashMap and outputs them once the bundle reaches a max
count. I can now see the bundle size reaching the max and `output.collect`
being called, but I still don't see any output downstream. Here is the new
aggregator code.

public class MyAggregator<K, V, IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT> {

    private int count;
    private Map<K, OUT> bundle = new LinkedHashMap<>();
    private MapBundleFunction<K, OUT, IN, OUT> bundleFunction;
    private KeySelector<IN, K> keySelector;

    public MyAggregator(int count,
                        MapBundleFunction<K, OUT, IN, OUT> bundleFunction,
                        KeySelector<IN, K> keySelector) {
        this.count = count;
        this.bundleFunction = bundleFunction;
        this.keySelector = keySelector;
    }

    @Override
    public void open() throws Exception {
        bundle = new LinkedHashMap<>();
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        K key = getKey(element);
        OUT value = bundle.get(key);
        OUT newValue = bundleFunction.addInput(value, element.getValue());
        bundle.put(key, newValue);
        if (bundle.size() > count) {
            for (Map.Entry<K, OUT> entry : bundle.entrySet()) {
                output.collect(new StreamRecord<>(entry.getValue()));
            }
            bundle.clear();
        }
    }

    private K getKey(StreamRecord<IN> element) throws Exception {
        return keySelector.getKey(element.getValue());
    }
}
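
One refinement I am considering (just a sketch, assuming the same field names as above plus an import of org.apache.flink.streaming.api.watermark.Watermark): flush the partial bundle whenever a watermark arrives, the way AbstractMapBundleOperator does, so buffered records are not held back past the watermark.

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // flush the partially filled bundle before forwarding the watermark,
        // mirroring what AbstractMapBundleOperator does
        for (Map.Entry<K, OUT> entry : bundle.entrySet()) {
            output.collect(new StreamRecord<>(entry.getValue()));
        }
        bundle.clear();
        super.processWatermark(mark);
    }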

Following is the driver code:

public class HourlyTipsSolution extends ExerciseBase {

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(ExerciseBase.parallelism);

        // start the data generator
        DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));

        // compute tips per hour for each driver
        MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
                new TaxiFareMapBundleFunction();
        BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
        KeySelector<TaxiFare, Long> taxiFareLongKeySelector =
                new KeySelector<TaxiFare, Long>() {
                    @Override
                    public Long getKey(TaxiFare value) throws Exception {
                        return value.driverId;
                    }
                };
        MyAggregator<Long, TaxiFare, TaxiFare, TaxiFare> aggregator =
                new MyAggregator<>(10, mapBundleFunction, taxiFareLongKeySelector);

        DataStream<Tuple3<Long, Long, Float>> hourlyTips =
                fares.transform("preshuffle", TypeInformation.of(TaxiFare.class), aggregator)
                        .keyBy((TaxiFare fare) -> fare.driverId)
                        .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                        .process(new AddTips());

        DataStream<Tuple3<Long, Long, Float>> hourlyMax =
                hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

        printOrTest(hourlyMax);

        // execute the transformation pipeline
        env.execute("Hourly Tips (java)");
    }

    /*
     * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
     */
    public static class AddTips
            extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

        @Override
        public void process(
                Long key,
                Context context,
                Iterable<TaxiFare> fares,
                Collector<Tuple3<Long, Long, Float>> out) {
            float sumOfTips = 0F;
            for (TaxiFare f : fares) {
                sumOfTips += f.tip;
            }
            out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
        }
    }
}
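
As a quick sanity check (hypothetical, not something I have wired into the job yet), temporarily replacing everything after the transform with a print sink should show whether records leave MyAggregator at all:

        // hypothetical debug step: print the pre-shuffle output directly, bypassing the windows
        fares.transform("preshuffle", TypeInformation.of(TaxiFare.class), aggregator)
                .print();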

However, the following driver code works when I remove the aggregator:

public class HourlyTipsSolution extends ExerciseBase {

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(ExerciseBase.parallelism);

        // start the data generator
        DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));

        // compute tips per hour for each driver
        MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
                new TaxiFareMapBundleFunction();
        BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(2);
        KeySelector<TaxiFare, Long> taxiFareLongKeySelector =
                new KeySelector<TaxiFare, Long>() {
                    @Override
                    public Long getKey(TaxiFare value) throws Exception {
                        return value.driverId;
                    }
                };
        MyAggregator<Long, TaxiFare, TaxiFare, TaxiFare> aggregator =
                new MyAggregator<>(10, mapBundleFunction, taxiFareLongKeySelector);

        DataStream<Tuple3<Long, Long, Float>> hourlyTips =
                fares.keyBy((TaxiFare fare) -> fare.driverId)
                        .window(TumblingEventTimeWindows.of(Time.hours(1)))
                        .process(new AddTips());

        DataStream<Tuple3<Long, Long, Float>> hourlyMax =
                hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.minutes(1))).maxBy(2);

        printOrTest(hourlyMax);

        // execute the transformation pipeline
        env.execute("Hourly Tips (java)");
    }

    /*
     * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
     */
    public static class AddTips
            extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

        @Override
        public void process(
                Long key,
                Context context,
                Iterable<TaxiFare> fares,
                Collector<Tuple3<Long, Long, Float>> out) {
            float sumOfTips = 0F;
            for (TaxiFare f : fares) {
                sumOfTips += f.tip;
            }
            out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
        }
    }
}
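
For comparison, one thing I may try next (just a sketch, assuming startTime is a java.time.Instant as in my earlier TaxiFareStream version quoted below) is to keep the aggregator but re-assign timestamps and watermarks after the transform, so the event-time windows see watermarks the same way the working version does:

        DataStream<Tuple3<Long, Long, Float>> hourlyTips =
                fares.transform("preshuffle", TypeInformation.of(TaxiFare.class), aggregator)
                        // re-assign timestamps/watermarks after the pre-shuffle operator
                        .assignTimestampsAndWatermarks(
                                new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
                                    @Override
                                    public long extractTimestamp(TaxiFare element) {
                                        // Flink expects millisecond timestamps
                                        return element.startTime.toEpochMilli();
                                    }
                                })
                        .keyBy((TaxiFare fare) -> fare.driverId)
                        .window(TumblingEventTimeWindows.of(Time.hours(1)))
                        .process(new AddTips());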

On Fri, Aug 20, 2021 at 4:18 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Suman,
> > But I am always seeing the following code of
> > `AbstractMapBundleOperator.java`: `numOfElements` is always 0.
> It is weird, please set a breakpoint at the line
> `bundleTrigger.onElement(input);` in the `processElement` method to see
> what happens when a record is processed by `processElement`.
>
> > One more question, you mentioned that I need to test with
> > `LinkedHashMap` instead of `HashMap`. Where should I make this change?
> You could copy the class  `AbstractMapBundleOperator`, and update the
> bundle initialization code in the `open` method.
> Besides, MapBundleFunction, MapBundleOperator, and CountBundleTrigger are
> not marked as @Public, so they have no compatibility guarantee.
> You'd better copy them for your own use.
>
> Best,
> JING ZHANG
>
> suman shil <cncf.s...@gmail.com> wrote on Fri, Aug 20, 2021 at 2:18 PM:
>
>> Hi Jing,
>> I tried using `MapBundleOperator` also (I am yet to test with
>> LinkedHashMap). But I am always seeing that in the following code of
>> `AbstractMapBundleOperator.java`, `numOfElements` is always 0. It is
>> never getting incremented. I replaced `TaxiFareStream` with
>> `MapBundleOperator` in the above code. It should increment by 1 each
>> time an element is processed, but that is not happening.
>>
>>     public void processElement(StreamRecord<IN> element) throws Exception {
>>         // get the key and value for the map bundle
>>         final IN input = element.getValue();
>>         final K bundleKey = getKey(input);
>>         final V bundleValue = bundle.get(bundleKey);
>>
>>         // get a new value after adding this element to bundle
>>         final V newBundleValue = function.addInput(bundleValue, input);
>>
>>         // update to map bundle
>>         bundle.put(bundleKey, newBundleValue);
>>
>>         numOfElements++;
>>         bundleTrigger.onElement(input);
>>     }
>>
>> One more question, you mentioned that I need to test with
>> `LinkedHashMap` instead of `HashMap`. Where should I make this change?
>> Do I need to create a class which extends from `MapBundleOperator`
>> and add it there?
>>
>> Thanks
>>
>>
>> On Thu, Aug 19, 2021 at 9:58 PM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Suman,
>>> Please try copying `MapBundleOperator` and updating the `HashMap` to a
>>> `LinkedHashMap` to keep the output sequence consistent with the input sequence.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> suman shil <cncf.s...@gmail.com> wrote on Fri, Aug 20, 2021 at 2:23 AM:
>>>
>>>> Hi Jing,
>>>> Thanks for looking into this. Here is the code of `TaxiFareStream`. I
>>>> was following this link
>>>> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
>>>> . Please let me know if there is any other way of aggregating elements
>>>> locally.
>>>>
>>>> public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>>
>>>>     private KeySelector<TaxiFare, Long> keySelector;
>>>>
>>>>     public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>>>>                           BundleTrigger<TaxiFare> bundleTrigger,
>>>>                           KeySelector<TaxiFare, Long> keySelector) {
>>>>         super(userFunction, bundleTrigger, keySelector);
>>>>         this.keySelector = keySelector;
>>>>     }
>>>>
>>>>     @Override
>>>>     protected Long getKey(TaxiFare input) throws Exception {
>>>>         return keySelector.getKey(input);
>>>>     }
>>>> }
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Suman,
>>>>> Would you please provide the code of `TaxiFareStream`? It seems we
>>>>> could use `MapBundleOperator` directly here.
>>>>> BTW, I have some concerns about using this solution for local
>>>>> aggregation before a window aggregation: `MapBundleOperator` saves
>>>>> the input data in a bundle that is a HashMap, which does not preserve
>>>>> the input order. I'm afraid the records within a bundle (in your
>>>>> case, 10 of them) may be emitted out of order, and I'm not sure
>>>>> whether it is reasonable to assign watermarks based on unordered
>>>>> timestamps.
>>>>>
>>>>> Best,
>>>>> JING ZHANG
>>>>>
>>>>>
>>>>>
>>>>> suman shil <cncf.s...@gmail.com> wrote on Thu, Aug 19, 2021 at 12:43 PM:
>>>>>
>>>>>> I am trying to do pre-shuffle aggregation in Flink. Following is the
>>>>>> MapBundleFunction implementation.
>>>>>>
>>>>>> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>>>>
>>>>>>     @Override
>>>>>>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>>>>         if (value == null) {
>>>>>>             return input;
>>>>>>         }
>>>>>>         value.tip = value.tip + input.tip;
>>>>>>         return value;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>>>>>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>>>>             out.collect(entry.getValue());
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> I am using `CountBundleTrigger.java`, but the pre-shuffle
>>>>>> aggregation is not working: the `count` variable is always 0.
>>>>>> Please let me know if I am missing something.
>>>>>>
>>>>>>     @Override
>>>>>>     public void onElement(T element) throws Exception {
>>>>>>         count++;
>>>>>>         if (count >= maxCount) {
>>>>>>             callback.finishBundle();
>>>>>>             reset();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> Here is the main code.
>>>>>>
>>>>>>         MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
>>>>>>                 new TaxiFareMapBundleFunction();
>>>>>>         BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>>>>>         KeySelector<TaxiFare, Long> taxiFareLongKeySelector =
>>>>>>                 new KeySelector<TaxiFare, Long>() {
>>>>>>                     @Override
>>>>>>                     public Long getKey(TaxiFare value) throws Exception {
>>>>>>                         return value.driverId;
>>>>>>                     }
>>>>>>                 };
>>>>>>
>>>>>>         DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>>>>> //                fares.keyBy((TaxiFare fare) -> fare.driverId)
>>>>>> //                        .window(TumblingEventTimeWindows.of(Time.hours(1)))
>>>>>> //                        .process(new AddTips());
>>>>>>                 fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>>>>                                 new TaxiFareStream(mapBundleFunction, bundleTrigger,
>>>>>>                                         taxiFareLongKeySelector))
>>>>>>                         .assignTimestampsAndWatermarks(
>>>>>>                                 new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>>>>                                     @Override
>>>>>>                                     public long extractTimestamp(TaxiFare element) {
>>>>>>                                         return element.startTime.getEpochSecond();
>>>>>>                                     }
>>>>>>                                 })
>>>>>>                         .keyBy((TaxiFare fare) -> fare.driverId)
>>>>>>                         .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>>>                         .process(new AddTips());
>>>>>>
>>>>>>         DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>>>>                 hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>>>>
>>>>>> Thanks
>>>>>> Suman
>>>>>>
>>>>>
