I just tried playing with the source parallelism setting, and I got a very strange result:
If I specify the source parallelism using env.addSource(kafka).setParallelism(N), results are printed correctly for any number N except for N=4. I guess that's related to the number of task slots, since I have 4 CPU cores, but what is the explanation for that? So I suppose that if I don't specify the source parallelism, it is set automatically to 4. Isn't it supposed to be set to the number of topic partitions (= 2) by default?

On Wed, Jul 27, 2016 at 2:33 PM, Yassin Marzouki <yassmar...@gmail.com> wrote:

> Hi Kostas,
>
> When I remove the window and the apply() and put print() after
> assignTimestampsAndWatermarks, the messages are printed correctly:
>
> 2> Request{ts=2015-01-01, 06:15:34:000}
> 2> Request{ts=2015-01-02, 16:38:10:000}
> 2> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-02, 19:10:00:000}
> 2> Request{ts=2015-01-02, 23:36:51:000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> ...
>
> But strangely using only one task. If I set the source parallelism to 1
> using env.addSource(kafka).setParallelism(1) (the window and the apply()
> still removed), results are printed using all available slots (number of
> CPU cores):
>
> 4> Request{ts=2015-01-01, 06:15:34:000}
> 4> Request{ts=2015-01-02, 16:38:10:000}
> 2> Request{ts=2015-01-02, 19:10:00:000}
> 4> Request{ts=2015-01-02, 23:36:51:000}
> 1> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> 3> Request{ts=2015-01-03, 17:56:42:000}
> ...
>
> Now if I keep the window and apply() without specifying the source
> parallelism, no messages are printed (only regular Kafka consumer and Flink
> logs), and if the source parallelism is set to 1, messages are printed
> correctly:
>
> 1> Window: TimeWindow{start=1420070400000, end=1420156800000}
> 2> Request{ts=2015-01-01, 06:15:34:000}
> 1> Request{ts=2015-01-02, 16:38:10:000}
> 4> Request{ts=2015-01-02, 19:10:00:000}
> 3> Window: TimeWindow{start=1420156800000, end=1420243200000}
> 3> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-02, 23:36:51:000}
> 3> Window: TimeWindow{start=1420416000000, end=1420502400000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> 4> Window: TimeWindow{start=1420243200000, end=1420329600000}
> 1> Request{ts=2015-01-03, 17:56:42:000}
> 1> Request{ts=2015-01-05, 17:13:45:000}
> 4> Request{ts=2015-01-05, 01:25:55:000}
> 2> Request{ts=2015-01-05, 14:27:45:000}
> ...
>
> On Wed, Jul 27, 2016 at 1:41 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Yassine,
>>
>> Could you just remove the window and the apply, and just put a print()
>> after the:
>>
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>>     @Override
>>     public long extractAscendingTimestamp(Request req) {
>>         return req.ts;
>>     }
>> })
>>
>> This at least will tell us if reading from Kafka works as expected.
>>
>> Kostas
>>
>> On Jul 25, 2016, at 3:39 PM, Yassin Marzouki <yassmar...@gmail.com> wrote:
>>
>> Hi everyone,
>>
>> I am reading messages from a Kafka topic with 2 partitions and using
>> event time.
>> This is my code:
>>
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>>     @Override
>>     public long extractAscendingTimestamp(Request req) {
>>         return req.ts;
>>     }
>> })
>> .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
>> .apply((TimeWindow window, Iterable<Request> iterable, Collector<String> collector) -> {
>>     collector.collect("Window: " + window.toString());
>>     for (Request req : iterable) {
>>         collector.collect(req.toString());
>>     }
>> })
>> .print()
>>
>> I could get an output only when setting the Kafka source parallelism to
>> 1. I guess that is because messages from multiple partitions arrive
>> out of order at the timestamp extractor according to this thread
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-td4782.html#a4804>,
>> correct?
>> So I replaced the AscendingTimestampExtractor with a
>> BoundedOutOfOrdernessGenerator as in the documentation example
>> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#tab_java_3>
>> (with a higher delay) in order to handle out-of-order events, but I still
>> can't get any output. Why is that?
>>
>> Best,
>> Yassine
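
A possible reading of the behaviour described in this thread, offered as a sketch rather than a confirmed diagnosis: in Flink, an operator with several input channels advances its event-time clock to the minimum of the watermarks received on those channels, so an upstream subtask that never emits a watermark holds the clock back. The plain-Java model below has no Flink dependency; the class name WatermarkMinSketch, its methods, and the sample watermark value are made up for illustration. It shows how two silent channels out of four would keep the first daily window from firing. Whether this is what actually happens here with parallelism 4 and a 2-partition topic is an assumption.

```java
import java.util.Arrays;

public class WatermarkMinSketch {

    // A channel that has not emitted any watermark yet is modelled as Long.MIN_VALUE.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // The operator's event-time clock is the minimum watermark over all input channels.
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(NO_WATERMARK);
    }

    // An event-time window [start, end) fires once the watermark reaches end - 1.
    static boolean windowFires(long windowEnd, long[] channelWatermarks) {
        return combinedWatermark(channelWatermarks) >= windowEnd - 1;
    }

    public static void main(String[] args) {
        // End of the first daily window reported earlier in this thread.
        long windowEnd = 1420156800000L;
        // Hypothetical watermark reached by the subtasks that do receive records.
        long seen = 1420243200000L;

        // Assumed scenario: 4 upstream subtasks, only 2 of them ever emit watermarks.
        long[] twoSilentChannels = {seen, seen, NO_WATERMARK, NO_WATERMARK};
        // Assumed scenario: every upstream subtask receives records and emits watermarks.
        long[] allChannelsActive = {seen, seen, seen, seen};

        System.out.println("two silent channels, window fires: "
                + windowFires(windowEnd, twoSilentChannels));
        System.out.println("all channels active, window fires: "
                + windowFires(windowEnd, allChannelsActive));
    }
}
```

If this is indeed the cause, it would be consistent with output appearing whenever the source parallelism differs from that of the downstream operators, because records are then redistributed across all downstream subtasks instead of being forwarded one-to-one.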