Hi,
there is also PurgingTrigger, which wraps any Trigger so that the window 
contents are also purged whenever it fires. Use it like this:

.trigger(PurgingTrigger.of(CountTrigger.of(5)))
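
To make the difference concrete, here is a minimal plain-Java sketch of what 
purging changes when the window function keeps running counts across firings. 
It does not use Flink at all: the class PurgeSimulation and its run method are 
hypothetical names, and the loop only imitates "fire every n elements" with and 
without clearing the window buffer, so treat it as an illustration rather than 
a description of Flink internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone simulation; no Flink classes are involved.
class PurgeSimulation {

    /**
     * Feeds the stream into one simulated global window and "fires" every n
     * elements. On each firing the buffered elements are added to the running
     * counts and a snapshot of those counts is recorded. If purge is true,
     * the buffer is cleared after the firing; otherwise it keeps growing.
     */
    static List<Map<Integer, Integer>> run(int[] stream, int n, boolean purge) {
        List<Integer> buffer = new ArrayList<>();               // simulated window contents
        Map<Integer, Integer> running = new LinkedHashMap<>();  // state kept across firings
        List<Map<Integer, Integer>> emitted = new ArrayList<>();
        int sinceLastFire = 0;

        for (int item : stream) {
            buffer.add(item);
            sinceLastFire++;
            if (sinceLastFire == n) {
                sinceLastFire = 0;
                // The "window function": add every buffered element to the counts.
                for (int x : buffer) {
                    running.merge(x, 1, Integer::sum);
                }
                emitted.add(new LinkedHashMap<>(running));      // copy, not the live map
                if (purge) {
                    buffer.clear();                             // fire and purge
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] stream = {1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
        System.out.println("purging:     " + run(stream, 5, true));
        System.out.println("non-purging: " + run(stream, 5, false));
    }
}
```

With purge = true the fourth snapshot contains the counts listed for the last 
window in the original question. With purge = false the second firing already 
reports a count of 6 for item 4 instead of 4, because the first five elements 
are counted a second time.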

Cheers,
Aljoscha
> On 08 Mar 2016, at 17:23, Marcela Charfuelan <[email protected]> 
> wrote:
> 
> Thanks Jun,
> Very useful. I was confusing the parameters because my input is tuples, which 
> I might not need in the end...
> 
> I now have what I wanted (non-parallel and probably not very efficient; any 
> suggestions for improvement are welcome), and I have to modify the trigger to 
> FIRE_AND_PURGE when it reaches N, the maximum number of items per window; 
> otherwise it will count the whole data every time...
> 
> So my example looks like this now:
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);        
> DataStream<String> transactions = env.fromElements(
>                       "1 2 4 3 4",
>                       "3 4 5 4 6",
>                       "7 3 3 6 1",
>                       "1 3 2 4 6"                     
> );    
> DataStream<Hashtable<String, Integer>> counts = transactions
>               .flatMap(new LineSplitter())  // because I am expecting one transaction per line
>               .windowAll(GlobalWindows.create())
>               .trigger(MyCountTrigger.of(5))
>               .apply(new MyWindowFunction());
> 
> counts.print();       
> env.execute("ItemsCount");
> 
> 
> public static class MyWindowFunction implements
>         AllWindowFunction<Tuple2<String, Integer>, Hashtable<String, Integer>, GlobalWindow> {
> 
>     // Running counts per item, kept across window firings.
>     public Hashtable<String, Integer> itemsMap = new Hashtable<String, Integer>();
> 
>     @Override
>     public void apply(GlobalWindow window,
>                       Iterable<Tuple2<String, Integer>> tuples,
>                       Collector<Hashtable<String, Integer>> out) throws Exception {
>         // Add every element currently in the window to the running counts.
>         for (Tuple2<String, Integer> tuple : tuples) {
>             if (itemsMap.containsKey(tuple.f0)) {
>                 itemsMap.put(tuple.f0, itemsMap.get(tuple.f0) + 1);
>             } else {
>                 itemsMap.put(tuple.f0, 1);
>             }
>         }
>         out.collect(itemsMap);
>     }
> }
> 
> Regards,
> Marcela.
> 
> 
> 
> On 08.03.2016 09:34, Wang Yangjun wrote:
>> Hello Marcela,
>> 
>> I am not sure what the “parameters mismatch” is here. From the example you 
>> showed, it seems that you just want to do a windowed word count. Right?
>> 
>> Could you try this code and see whether it is what you want?
>> 
>> Best,
>> Jun
>> -------------------------------------------------
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setParallelism(1);
>> Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 
>> 1, 3, 2, 4, 6};
>> List<Integer> list = Arrays.asList(array);
>> DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
>>         .windowAll(GlobalWindows.create())
>>         .trigger(CountTrigger.of(5)).apply(new AllWindowFunction<Integer, 
>> Tuple2<Integer, Integer>, GlobalWindow>() {
>>             @Override
>>             public void apply(GlobalWindow window, Iterable<Integer> tuples, 
>> Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>                 HashMap<Integer, Integer> map = new HashMap<>();
>>                 for(Integer tuple : tuples){
>>                     Integer value = 0;
>>                     if(map.containsKey(tuple)){
>>                         value = map.get(tuple);
>>                     }
>>                     map.put(tuple, value+1);
>>                 }
>> 
>>                 for(Map.Entry<Integer, Integer> entry : map.entrySet()) {
>>                     out.collect(new Tuple2<>(entry.getKey(), 
>> entry.getValue()));
>>                 }
>>             }
>>         });
>> 
>> counts.print();
>> 
>> env.execute("Stream WordCount");
>> 
>> 
>> 
>> 
>> 
>> On 08/03/16 02:57, "Marcela Charfuelan" <[email protected]> wrote:
>> 
>>> Hello,
>>> 
>>> I want to write a function for counting items (per type) in windows of
>>> size N. For example, for N=5 and the stream:
>>> 1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6
>>> 
>>> I would like to generate the tuples:
>>> w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
>>> w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
>>> w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
>>> w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)
>>> 
>>> I am trying to apply my own function with "Window apply", something like:
>>> 
>>> items
>>> .windowAll(GlobalWindows.create())
>>> .trigger(CountTrigger.of(5))
>>> .apply(new MyWindowfunction())
>>> 
>>> but in this case there is a parameter mismatch between apply and
>>> WindowFunction, so I am not sure whether this is possible here. Any suggestions?
>>> 
>>> Looking at the streaming Java examples, the (commented-out) apply example
>>> shown in GroupedProcessingTimeWindowExample,
>>> which is applied to a timeWindow, does not work either:
>>> 
>>> .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
>>> .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
>>> .apply(new SummingWindowFunction())
>>> 
>>> So what am I missing here? Any help is appreciated.
>>> 
>>> Regards,
>>> Marcela.
>>> 
>>> 
>>> 
> 
> 
> -- 
> Dr. Marcela Charfuelan, Senior Researcher
> TU Berlin, School of Electrical Engineering and Computer Sciences
> Database Systems and Information Management (DIMA)
> EN7, Einsteinufer 17, D-10587 Berlin
> Room: EN 725  Phone: +49 30-314-23556
> URL: http://www.user.tu-berlin.de/charfuelan
