Hi,

I’m afraid there is no other solution besides calling keyBy() again if you require a keyed data stream. This has to do with the data model of Flink, where the key is part of the regular data element. This is different from systems that have a (Key, Value) data model. Those systems can provide primitives that only work on the value and therefore they can preserve a key partitioning.
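
To make the point about the data model concrete, here is a minimal plain-Java sketch. It deliberately does not use the Flink API: the class KeyInElementSketch, the HostCount type, and the methods countWindow and keyByHost are all hypothetical names chosen for illustration. It assumes each log line is turned into a (host, 1) element and reduced per host, so the key stays inside the element and grouping by it again afterwards is a one-liner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyInElementSketch {

    /** Hypothetical element type: the key (host) travels inside the element. */
    static final class HostCount {
        final String host;
        final int count;

        HostCount(String host, int count) {
            this.host = host;
            this.count = count;
        }
    }

    /** Reduce step: merges two partial counts for the same host into one element. */
    static HostCount reduce(HostCount a, HostCount b) {
        return new HostCount(a.host, a.count + b.count);
    }

    /** Counts one window of log hosts, holding a single reduced element per host. */
    static List<HostCount> countWindow(List<String> hostsInWindow) {
        Map<String, HostCount> state = new LinkedHashMap<>();
        for (String host : hostsInWindow) {
            // Each log line becomes (host, 1) and is merged into the running element.
            state.merge(host, new HostCount(host, 1), KeyInElementSketch::reduce);
        }
        return new ArrayList<>(state.values());
    }

    /** "keyBy again": regroup the results using the host field stored in each element. */
    static Map<String, Integer> keyByHost(List<HostCount> results) {
        Map<String, Integer> byHost = new LinkedHashMap<>();
        for (HostCount r : results) {
            byHost.put(r.host, r.count);
        }
        return byHost;
    }

    public static void main(String[] args) {
        List<HostCount> counts = countWindow(List.of("a.example", "b.example", "a.example"));
        System.out.println(keyByHost(counts));
    }
}
```

Because the host is a field of every result, the second grouping needs no extra information. In a Flink job the analogous step would be the second keyBy() on the tuple field that holds the host.
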

Cheers,
Aljoscha

> On 03 Nov 2015, at 18:17, Martin Neumann <mneum...@sics.se> wrote:
>
> Here is some code of my current solution, if there is some better way of
> doing it let me know.
>
> KeyedStream<DataPojo,String> hostStream = inStream
>     .keyBy(t -> t.getHost());
> KeyedStream<Tuple2<Integer,String>,String> freqStream = hostStream
>     .timeWindow(Time.of(WINDOW_SIZE, TimeUnit.MILLISECONDS))
>     .fold(new Tuple2<Integer, String>(0, ""), new Count())
>     .keyBy(t -> t.f1);
> ...
>
> I wish I could get rid of that annoying Tuple2 after the fold and just have a
> KeyedStream<Integer,String> but I didn't find anything in the documentation
> that would allow me to do that.
> Another curious thing is that in the 2nd statement .keyBy(t -> t.f1) works
> but .keyBy(1) does not, even though they do the same thing. I'm using Idea at
> the moment so it can be just another type inference problem with that IDE.
>
> cheers
> Martin
>
> On Tue, Nov 3, 2015 at 3:06 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> where are you storing the results of each window computation to? Maybe you
> could also store it from inside a custom WindowFunction where you just count
> the elements and then store the results.
>
> On the other hand, adding a (1) field and doing a window reduce (à la
> WordCount) is going to be way more efficient because we only have to keep one
> element per window (the current reduced tuple) instead of all the tuples, as
> we have to for a fold or WindowFunction. If you want you can also combine a
> reduce and WindowFunction:
> WindowedStream.apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function)
>
> here, the ReduceFunction does the WordCount-like counting while in the
> WindowFunction you get the final result and store it inside your model.
>
> Let me know if you need more information.
>
> Cheers,
> Aljoscha
>
> On 03 Nov 2015, at 11:28, Martin Neumann <mneum...@sics.se> wrote:
> >
> > Hej,
> >
> > I want to do the following thing:
> > 1. Split a Stream of incoming Logs by host address.
> > 2. For each Key, create time based windows
> > 3. Count the number of items in the window
> > 4. Feed it into a statistical model that is maintained for each host
> >
> > Since I don't have fields to sum upon, I use a (window) fold function to
> > count the number of elements in the window. (Maybe there is a better way to
> > do this, or it could be part of the primitives)
> > My problem is now that I get back a DataStream so the distribution by key
> > is lost. Is there a way to preserve the distribution by key? Currently I
> > only store the count of elements in the window so I cannot simply do keyBy
> > again.
> >
> > I could fold into tuples that have the count and also contain the host
> > address but that feels clumsy.
> >
> > Any hints are welcome.
> >
> >
> > cheers Martin