I just tried playing with the source paralleism setting, and I got a very
strange result:

If specify the source parallism
using env.addSource(kafka).setParallelism(N), results are printed correctly
for any number N except for N=4. I guess that's related to the number of
task slots since I have a 4 CPU cores, but what is the explanation of that?
So I suppose that if I don't specify the source parallelism, it is set
automatically to 4. Isn't it supposed to be set to the number of topic
patitions (= 2) by default?


On Wed, Jul 27, 2016 at 2:33 PM, Yassin Marzouki <yassmar...@gmail.com>
wrote:

> Hi Kostas,
>
> When I remove the window and the apply() and put print() after 
> assignTimestampsAndWatermarks,
> the messages are printed correctly:
>
> 2> Request{ts=2015-01-01, 06:15:34:000}
> 2> Request{ts=2015-01-02, 16:38:10:000}
> 2> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-02, 19:10:00:000}
> 2> Request{ts=2015-01-02, 23:36:51:000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> ...
>
> But strangely using only one task. If I set the source parallelism to 1
> using env.addSource(kafka).setParallelism(1) (the window and the apply()
> still removed), results are printed using all available slots (number of
> CPU cores):
>
> 4> Request{ts=2015-01-01, 06:15:34:000}
> 4> Request{ts=2015-01-02, 16:38:10:000}
> 2> Request{ts=2015-01-02, 19:10:00:000}
> 4> Request{ts=2015-01-02, 23:36:51:000}
> 1> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> 3> Request{ts=2015-01-03, 17:56:42:000}
> ...
>
> Now if I keep the window and apply() with without specifying source
> parallelism, no messages are printed (only regular kafka consumer and flink
> logs), and if the source parallelism is set to 1, messages are printed
> correctly:
>
> 1> Window: TimeWindow{start=1420070400000, end=1420156800000}
> 2> Request{ts=2015-01-01, 06:15:34:000}
> 1> Request{ts=2015-01-02, 16:38:10:000}
> 4> Request{ts=2015-01-02, 19:10:00:000}
> 3> Window: TimeWindow{start=1420156800000, end=1420243200000}
> 3> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-02, 23:36:51:000}
> 3> Window: TimeWindow{start=1420416000000, end=1420502400000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> 4> Window: TimeWindow{start=1420243200000, end=1420329600000}
> 1> Request{ts=2015-01-03, 17:56:42:000}
> 1> Request{ts=2015-01-05, 17:13:45:000}
> 4> Request{ts=2015-01-05, 01:25:55:000}
> 2> Request{ts=2015-01-05, 14:27:45:000}
> ...
>
> On Wed, Jul 27, 2016 at 1:41 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Yassine,
>>
>> Could you just remove the window and the apply, and  just put a print()
>> after the:
>>
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>()
>> {
>>     @Override
>>     public long extractAscendingTimestamp(Request req) {
>>         return req.ts;
>>     }
>> })
>>
>>
>> This at least will tell us if reading from Kafka works as expected.
>>
>> Kostas
>>
>> On Jul 25, 2016, at 3:39 PM, Yassin Marzouki <yassmar...@gmail.com>
>> wrote:
>>
>> Hi everyone,
>>
>> I am reading messages from a Kafka topic with 2 partitions and using
>> event time. This is my code:
>>
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>()
>> {
>>     @Override
>>     public long extractAscendingTimestamp(Request req) {
>>         return req.ts;
>>     }
>> })
>> .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
>> .apply((TimeWindow window, Iterable<Request> iterable, Collector<String>
>> collector) -> {
>>     collector.collect("Window: " + window.toString());
>>     for (Request req : iterable) {
>>         collector.collect(req.toString());
>>     }
>> })
>> .print()
>>
>> I could get an output only when setting the kafka source parallelism to
>> 1. I guess that is because messages from multiple partitions arrive
>> out-of-order to the timestamp exctractor according to this thread
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-td4782.html#a4804>,
>> correct?
>> So I replaced the AscendingTimestampExtractor with a
>> BoundedOutOfOrdernessGenerator as in the documentation example
>> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#tab_java_3>
>>  (with
>> a higher delay) in order to handle out-of-order events, but I still can't
>> get any output. Why is that?
>>
>> Best,
>> Yassine
>>
>>
>>
>

Reply via email to