Hi Benjamin, In your case, the tumbling window subtasks would each have 3 input streams, 1 for each of the 3 FlinkKafkaConsumer operator subtasks.
I thought that each subtask of the window would get only elements from one partition and therefore the watermarks would be calculated independently per stream. This is a misunderstanding. After the keyBy, the window subtasks could get input from any of the consumer subtasks, and would therefore need to wait for broadcasted watermarks from all of them. It just happens to be that in your case, each consumer subtasks will only produce records with exactly one key. Moving back a bit to your original setup: it seems like what you want to achieve is a simple window on each partition independently, and then produce the window output to a new partition. In your original setup where each topic and its corresponding output topic each has 1 partition, I’d actually just have separate jobs for each topic-to-topic pipeline, instead of bundling them into one job. Was there any specific reason for bundling them together? Cheers, Gordon On 17 April 2017 at 5:04:26 PM, Benjamin Reißaus (benjamin.reiss...@gmail.com) wrote: Hi, So I have been rearranging my architecture to where I only have one input and one output topic, each with 3 partitions and in my flink job I have one consumer and one producer running with parallelism of 3. To run in parallel, I extract the partition from the metadata information per kafka message and keyBy that very partition. The code sample is at the bottom. Now it seems though, that my tumbling window of 1 second that I run on all partitions and that I use to calculate statistics only gives output on one partition. The reason seems to be that the timestamps of partition A and B are 2 hours ahead of partition C. In the documentation I read that the event time of an operator following a keyBy (my tumbling window) is the minimum of its input streams’ event times. But is that even the case for me? Does my tumbling window have multiple input streams? I thought that each subtask of the window would get only elements from one partition and therefore the watermarks would be calculated independently per stream. I would appreciate any input! Again, my goal is to run the same queries on independent kafka streams. Best regards, Ben import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.streaming.api.windowing.time.Time import org.hpi.esb.flink.datamodel.{SimpleRecord, Statistics} class StatisticsQuery(windowSize: Int) extends Query[(String, SimpleRecord), (String, Statistics)] { override def execute(stream: DataStream[(String, SimpleRecord)]): DataStream[(String, Statistics)] = { stream .keyBy(_._1) .timeWindow(Time.milliseconds(windowSize)) .fold(("", new Statistics())) { (acc, value) => Statistics.fold(acc, value) } } } 2017-04-14 19:22 GMT+02:00 Benjamin Reißaus <benjamin.reiss...@gmail.com>: Hi everybody, I have the following flink/kafka setup: I have 3 kafka “input” topics and 3 “output” topics with each 1 partition (only 1 partition because the order of the messages is important). I also have 1 master and 2 flink slave nodes with a total of 16 task slots. In my flink program I have created 3 consumers - each for one of the input topics. On each of the datastreams I run a query that generates statistics over a window of 1 second and I write the result to the corresponding output topic. You can find the execution plan with parallelism set to 2 attached. This setup with parallelism=2 sometimes seems to give me the wrong order of the statistics results. I assume it is because of the rebalancing before the last map which leads to a race condition when writing to kafka. If I set parallelism to 1 no rebalancing will be done but only one task slot is used. This has led me to the following questions: Why is only 1 task slot used with my 3 pipelines when parallelism is set to 1? As far as I understand, the parallelism refers to the number of parallel instances a task can be split into. Therefore, I would assume that I could still run multiple different tasks (e.g. different maps or window functions on different streams) in different task slots, right? And to come back to my requirement: Is it not possible to run all 3 pipelines in parallel and still keep the order of the messages and results? I also asked these questions on stackoverflow. And it seems that I have similar trouble understanding the terms “task slot”, “subtasks” etc. like Flavio mentioning in this flink mail thread. Thank you and I would appreciate any input! Best regards, Ben