Thanks Jun,
Very useful. I was confusing the parameters because my input is tuples,
which I might not need in the end...
I now have what I wanted (non-parallel and probably not very efficient;
any suggestions for improvement are welcome). I have to modify the trigger
so that it returns FIRE_AND_PURGE when the window reaches N, the maximum
number of items per window; otherwise it counts the whole data every
time...
So my example looks like this now:
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStream<String> transactions = env.fromElements(
        "1 2 4 3 4",
        "3 4 5 4 6",
        "7 3 3 6 1",
        "1 3 2 4 6"
);

// One transaction per line; LineSplitter (not shown) emits one
// Tuple2<String, Integer> per item.
DataStream<Hashtable<String, Integer>> counts = transactions
        .flatMap(new LineSplitter())
        .windowAll(GlobalWindows.create())
        // MyCountTrigger (not shown) returns FIRE_AND_PURGE every 5 items
        .trigger(MyCountTrigger.of(5))
        .apply(new MyWindowFunction());

counts.print();
env.execute("ItemsCount");
public static class MyWindowFunction implements
        AllWindowFunction<Tuple2<String, Integer>, Hashtable<String, Integer>, GlobalWindow> {

    // Running counts, kept across window firings
    public Hashtable<String, Integer> itemsMap = new Hashtable<String, Integer>();

    @Override
    public void apply(GlobalWindow window,
                      Iterable<Tuple2<String, Integer>> tuples,
                      Collector<Hashtable<String, Integer>> out) throws Exception {
        for (Tuple2<String, Integer> tuple : tuples) {
            if (itemsMap.containsKey(tuple.f0)) {
                itemsMap.put(tuple.f0, itemsMap.get(tuple.f0) + 1);
            } else {
                itemsMap.put(tuple.f0, 1);
            }
        }
        out.collect(itemsMap);
    }
}
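
To illustrate why the purge matters, here is a minimal plain-Java sketch
that only simulates the two set-ups discussed in this thread; it does not
use Flink, and the class and method names (PurgeVsNoPurgeSketch, run,
Result) are made up for the illustration. It assumes a count trigger that
fires every n items. Without purging, the buffer keeps growing and each
firing recounts everything from scratch; with purging, only the last n
items are seen and the running counts must live in a field. As far as I
know, Flink also ships a PurgingTrigger wrapper (PurgingTrigger.of(
CountTrigger.of(5))) that might make a custom trigger unnecessary, but
please check that against the Flink version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation, not Flink code: all names here are hypothetical.
class PurgeVsNoPurgeSketch {

    /** Final counts plus the number of buffered items the "window function" iterated over. */
    static final class Result {
        final Map<String, Integer> counts;
        final int itemsIterated;

        Result(Map<String, Integer> counts, int itemsIterated) {
            this.counts = counts;
            this.itemsIterated = itemsIterated;
        }
    }

    static Result run(String[] items, int n, boolean purge) {
        List<String> buffer = new ArrayList<>();         // stands in for the window contents
        Map<String, Integer> running = new TreeMap<>();  // state kept across firings (purge mode only)
        Map<String, Integer> lastEmitted = new TreeMap<>();
        int itemsIterated = 0;
        int sinceLastFire = 0;

        for (String item : items) {
            buffer.add(item);
            sinceLastFire++;
            if (sinceLastFire < n) {
                continue;                                // trigger does not fire yet
            }
            sinceLastFire = 0;

            // No purge: recount the whole buffer into a fresh map.
            // Purge: add only the newly buffered items to the running map.
            Map<String, Integer> target = purge ? running : new TreeMap<>();
            for (String s : buffer) {
                target.merge(s, 1, Integer::sum);
                itemsIterated++;
            }
            lastEmitted = new TreeMap<>(target);         // emit a copy, not the mutable state
            if (purge) {
                buffer.clear();                          // the PURGE half of FIRE_AND_PURGE
            }
        }
        return new Result(lastEmitted, itemsIterated);
    }

    public static void main(String[] args) {
        String[] items = "1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6".split(" ");
        Result noPurge = run(items, 5, false);
        Result withPurge = run(items, 5, true);
        System.out.println(noPurge.counts + " iterated=" + noPurge.itemsIterated);
        System.out.println(withPurge.counts + " iterated=" + withPurge.itemsIterated);
    }
}
```

Both runs end with the same counts, but for these 20 items the run without
purging iterates 5 + 10 + 15 + 20 = 50 buffered items, while the purging
run iterates only 20.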
Regards,
Marcela.
On 08.03.2016 09:34, Wang Yangjun wrote:
Hello Marcela,
I am not sure what the “parameters mismatch” is here. From the example you
showed, it seems that you just want to do a windowed word count, right?
Could you try this code and see whether it is what you want?
Best,
Jun
-------------------------------------------------
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1,
        3, 2, 4, 6};
List<Integer> list = Arrays.asList(array);

DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(5))
        .apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<Integer> tuples,
                              Collector<Tuple2<Integer, Integer>> out) throws Exception {
                HashMap<Integer, Integer> map = new HashMap<>();
                for (Integer tuple : tuples) {
                    Integer value = 0;
                    if (map.containsKey(tuple)) {
                        value = map.get(tuple);
                    }
                    map.put(tuple, value + 1);
                }
                for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
                    out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
                }
            }
        });

counts.print();
env.execute("Stream WordCount");
On 08/03/16 02:57, "Marcela Charfuelan" <charfuelanol...@tu-berlin.de> wrote:
Hello,
I want to write a function that counts items (per type) in windows of
size N. For example, for N=5 and the stream:
1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6
I would like to generate the following tuples (the counts accumulate
across windows):
w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)
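
The expected output above can be reproduced with a small plain-Java sketch.
It is only an illustration of the counting rule, not Flink code, and the
names CumulativeWindowCounts and countPerWindow are invented for it. It
assumes that counts accumulate across windows and that the tuples are
listed in the order in which each item was first seen, which is what the
four lines above suggest.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the desired output; names are hypothetical.
class CumulativeWindowCounts {

    /** Returns one formatted line per full window of n items, with running counts. */
    static List<String> countPerWindow(int[] stream, int n) {
        Map<Integer, Integer> counts = new LinkedHashMap<>(); // keeps first-seen order
        List<String> lines = new ArrayList<>();
        StringBuilder window = new StringBuilder();

        for (int i = 0; i < stream.length; i++) {
            counts.merge(stream[i], 1, Integer::sum);
            if (window.length() > 0) {
                window.append(' ');
            }
            window.append(stream[i]);

            if ((i + 1) % n == 0) {                           // window is full: emit a line
                StringBuilder line = new StringBuilder("w(" + window + ") -> ");
                for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
                    line.append('(').append(e.getKey()).append(',')
                        .append(e.getValue()).append(')');
                }
                lines.add(line.toString());
                window.setLength(0);                          // start the next window
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        int[] stream = {1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
        countPerWindow(stream, 5).forEach(System.out::println);
    }
}
```

A LinkedHashMap is used here only so that the printed order matches the
lines above; any map would give the same counts.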
I am trying to apply my own function with "Window apply", something like:
    items
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(5))
        .apply(new MyWindowfunction())
but in this case there is a parameter mismatch between apply and
WindowFunction, so I am not sure whether this is possible here. Any
suggestions?
Looking at the streaming Java examples, the (commented-out) apply example
shown in GroupedProcessingTimeWindowExample(), which is applied to a
timeWindow, does not work either:
    .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
    .apply(new SummingWindowFunction())
So what am I missing here? Any help is appreciated.
Regards,
Marcela.
--
Dr. Marcela Charfuelan, Senior Researcher
TU Berlin, School of Electrical Engineering and Computer Sciences
Database Systems and Information Management (DIMA)
EN7, Einsteinufer 17, D-10587 Berlin
Room: EN 725 Phone: +49 30-314-23556
URL: http://www.user.tu-berlin.de/charfuelan