Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-07-20 Thread Meera
We couldn't put the map phase in between working with stream transformation classes and it created a dangling Mapper - but doing partitioner/tranformation with the window operator worked. WindowOperator operator = ... KeyGroupStreamPartitioner partitioner = new KeyGroupStreamPartitioner(new Dim

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-06-13 Thread Aljoscha Krettek
Hi, As a simple test, you can put your key extraction logic into a MapFunction, i.e. MapFunction, Tuple2> and then simply use that field as the key: input .map(new MyKeyExtractorMapper()) .keyBy(0) If that solves your problem it means that the key extraction is not deterministic. This is

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-06-12 Thread Meera
Did this problem get resolved - I am running into this problem when I parallelize the tasks Unexpected key group index. This indicates a bug. - it runs fine on 1 parallelism. This suggests there is some key grouping issue - I checked my Watermark and KeySelector - they look okay. The snippet

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-08 Thread Aljoscha Krettek
It seems that eventTime is a static field in TopicPojo and the key selector also just gets the static field via TopicPojo.getEventTime(). Why is that? Because with this the event time basically has nothing to do with the data. > On 5. May 2017, at 10:32, G.S.Vijay Raajaa wrote: > > I tried the

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-05 Thread G.S.Vijay Raajaa
I tried the timestamp field as a string datatype as well as a Date object. Getting same error in both the cases; Please find the POJO file: import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-05 Thread Aljoscha Krettek
What’s the KeySelector you’re using? To me, this indicates that the timestamp field is somehow changing after the original keying or in transit. Best. Aljoscha > On 4. May 2017, at 22:01, G.S.Vijay Raajaa wrote: > > I tried to reorder and the window function works fine. but then after > proces

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-04 Thread G.S.Vijay Raajaa
I tried to reorder and the window function works fine. but then after processing few stream of data from Topic A and Topic B, the window function seem to throw the below error. The keyby is on eventTime field. java.lang.RuntimeException: Unexpected key group index. This indicates a bug. at org.ap

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-03 Thread G.S.Vijay Raajaa
Thanks for your input, will try to incorporate them in my implementation. Regards, Vijay Raajaa G S On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek wrote: > The approach could work, but if it can happen that an event from stream A > is not matched by an event in stream B you will have lingerin

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-03 Thread Aljoscha Krettek
The approach could work, but if it can happen that an event from stream A is not matched by an event in stream B you will have lingering state that never goes away. For such cases it might be better to write a custom CoProcessFunction as sketched here: https://ci.apache.org/projects/flink/flink

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-03 Thread G.S.Vijay Raajaa
Sure. Thanks for the pointer, let me reorder the same. Any comments about the approach followed for merging topics and creating a single JSON? Regards, Vijay Raajaa G S On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek wrote: > Hi, > An AllWindow operator requires an AllWindowFunction, instead o

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-03 Thread Aljoscha Krettek
Hi, An AllWindow operator requires an AllWindowFunction, instead of a WindowFunction. In your case, the keyBy() seems to be in the wrong place, to get a keyed window you have to write something akin to: inputStream .keyBy(…) .window(…) .apply(…) // or reduce() In your case, you key the st