Hi, So I have been rearranging my architecture to where I only have one input and one output topic, each with 3 partitions and in my flink job I have one consumer and one producer running with parallelism of 3. To run in parallel, I extract the partition from the metadata information per kafka message and keyBy that very partition. The code sample is at the bottom.
Now it seems though, that my tumbling window of 1 second that I run on all partitions and that I use to calculate statistics only gives output on one partition. The reason seems to be that the timestamps of partition A and B are 2 hours ahead of partition C. In the documentation I read that the event time of an operator following a keyBy (my tumbling window) is the minimum of its input streams’ event times. But is that even the case for me? Does my tumbling window have multiple input streams? I thought that each subtask of the window would get only elements from one partition and therefore the watermarks would be calculated independently per stream. I would appreciate any input! Again, my goal is to run the same queries on independent kafka streams. Best regards, Ben import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.streaming.api.windowing.time.Time import org.hpi.esb.flink.datamodel.{SimpleRecord, Statistics} class StatisticsQuery(windowSize: Int) extends Query[(String, SimpleRecord), (String, Statistics)] { override def execute(stream: DataStream[(String, SimpleRecord)]): DataStream[(String, Statistics)] = { stream .keyBy(_._1) .timeWindow(Time.milliseconds(windowSize)) .fold(("", new Statistics())) { (acc, value) => Statistics.fold(acc, value) } } } 2017-04-14 19:22 GMT+02:00 Benjamin Reißaus <benjamin.reiss...@gmail.com>: > Hi everybody, > > > > *I have the following flink/kafka setup:* > > > > I have 3 kafka “input” topics and 3 “output” topics with each 1 partition > (only 1 partition because the order of the messages is important). I also > have 1 master and 2 flink slave nodes with a total of 16 task slots. > > In my flink program I have created 3 consumers - each for one of the input > topics. > > On each of the datastreams I run a query that generates statistics over a > window of 1 second and I write the result to the corresponding output > topic. You can find the execution plan with parallelism set to 2 attached. > > > > This setup with parallelism=2 sometimes seems to give me the wrong order > of the statistics results. I assume it is because of the rebalancing before > the last map which leads to a race condition when writing to kafka. > > > > If I set parallelism to 1 no rebalancing will be done but only one task > slot is used. > > > > *This has led me to the following questions:* > > > > Why is only 1 task slot used with my 3 pipelines when parallelism is set > to 1? As far as I understand, the parallelism refers to the number of > parallel instances a task can be split into. Therefore, I would assume that > I could still run multiple different tasks (e.g. different maps or window > functions on different streams) in different task slots, right? > > > > And to come back to my requirement: Is it not possible to run all 3 > pipelines in parallel and still keep the order of the messages and results? > > > > I also asked these questions on stackoverflow > <http://stackoverflow.com/q/43392793/6289872>. And it seems that I have > similar trouble understanding the terms “task slot”, “subtasks” etc. like > Flavio mentioning in this > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-slots-threads-task-etc-td12534.html> > flink > mail thread. > > > > Thank you and I would appreciate any input! > > > > Best regards, > > Ben >