Hi Benjamin,

In your case, the tumbling window subtasks would each have 3 input streams, 1 
for each of the 3 FlinkKafkaConsumer operator subtasks.

I thought that each subtask of the window would get only elements from one 
partition and therefore the watermarks would be calculated independently per 
stream. 
This is a misunderstanding. After the keyBy, the window subtasks can get 
input from any of the consumer subtasks, and therefore need to wait for the 
broadcast watermarks from all of them. It just happens that in your case, 
each consumer subtask will only produce records with exactly one key.
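
To make this concrete, here is a rough sketch of the topology in question 
(broker address, topic name, and schema are placeholders; assuming the Kafka 
0.10 connector with event time enabled):

import java.util.Properties

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object WatermarkFlowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "watermark-flow-sketch")

    val consumer =
      new FlinkKafkaConsumer010[String]("input", new SimpleStringSchema, props)
    // If timestamps/watermarks were assigned on the consumer itself, Flink
    // would track them per Kafka partition inside each source subtask:
    // consumer.assignTimestampsAndWatermarks(...)

    env.addSource(consumer).setParallelism(3)
      .map(s => (s, 1L))
      .keyBy(_._1)                 // hash partitioning: every window subtask
      .timeWindow(Time.seconds(1)) // may receive records (and watermarks)
      .sum(1)                      // from ANY source subtask, so its event
      .print()                     // time is the minimum over all 3 streams

    env.execute("watermark-flow-sketch")
  }
}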

Moving back a bit to your original setup: it seems like what you want to 
achieve is a simple window on each partition independently, and then produce 
the window output to a new partition.
In your original setup, where each input topic and its corresponding output 
topic have 1 partition each, I’d actually just run a separate job for each 
topic-to-topic pipeline instead of bundling them into one job. Was there any 
specific reason for bundling them together?
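
For reference, a per-topic job could look roughly like this (a minimal 
sketch; broker address, topic names, schema, and the 0.10 connector are 
assumptions to adapt to your setup):

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object SingleTopicPipeline {
  def main(args: Array[String]): Unit = {
    // One job instance per topic pair, e.g. "input-a output-a".
    val Array(inTopic, outTopic) = args

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", s"pipeline-$inTopic")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 1 input partition and 1 output partition: parallelism 1 keeps order.
    env.setParallelism(1)

    val source = env.addSource(
      new FlinkKafkaConsumer010[String](inTopic, new SimpleStringSchema, props))

    // ... the 1-second window / statistics logic goes here ...

    source.addSink(
      new FlinkKafkaProducer010[String](outTopic, new SimpleStringSchema, props))

    env.execute(s"$inTopic-to-$outTopic")
  }
}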

Cheers,
Gordon
On 17 April 2017 at 5:04:26 PM, Benjamin Reißaus (benjamin.reiss...@gmail.com) 
wrote:

Hi, 

I have rearranged my architecture so that I now have only one input and one 
output topic, each with 3 partitions, and in my Flink job I run one consumer 
and one producer with a parallelism of 3. To process the partitions in 
parallel, I extract the partition from the Kafka metadata of each message 
and key by that partition. The code sample is at the bottom.
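
For illustration, the partition can be exposed per message via Flink's 
KeyedDeserializationSchema; a simplified sketch (the record value is reduced 
to a plain string here; my real schema produces a SimpleRecord):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

// Attaches the Kafka partition id as the first tuple field, so that the job
// can keyBy(_._1) afterwards.
class PartitionKeyedSchema extends KeyedDeserializationSchema[(String, String)] {

  // Kafka metadata (topic, partition, offset) is handed to this method.
  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                           topic: String, partition: Int, offset: Long): (String, String) =
    (partition.toString, new String(message, "UTF-8"))

  override def isEndOfStream(nextElement: (String, String)): Boolean = false

  override def getProducedType: TypeInformation[(String, String)] =
    createTypeInformation[(String, String)]
}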

Now, however, it seems that the 1-second tumbling window that I run on all 
partitions to calculate the statistics only produces output for one 
partition. The reason seems to be that the timestamps of partitions A and B 
are 2 hours ahead of partition C. In the documentation I read that the event 
time of an operator following a keyBy (my tumbling window) is the minimum of 
its input streams’ event times.

But is that even the case for me? Does my tumbling window have multiple input 
streams? I thought that each subtask of the window would get only elements from 
one partition and therefore the watermarks would be calculated independently 
per stream. 

I would appreciate any input! Again, my goal is to run the same queries on 
independent Kafka streams.

Best regards,
Ben

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
import org.hpi.esb.flink.datamodel.{SimpleRecord, Statistics}

class StatisticsQuery(windowSize: Int)
  extends Query[(String, SimpleRecord), (String, Statistics)] {

  override def execute(stream: DataStream[(String, SimpleRecord)]): DataStream[(String, Statistics)] = {
    stream
      .keyBy(_._1)                                // key = the Kafka partition id
      .timeWindow(Time.milliseconds(windowSize))  // tumbling window of windowSize ms
      .fold(("", new Statistics())) { (acc, value) =>
        Statistics.fold(acc, value)               // accumulate statistics per window
      }
  }
}

2017-04-14 19:22 GMT+02:00 Benjamin Reißaus <benjamin.reiss...@gmail.com>:
Hi everybody,

I have the following Flink/Kafka setup:

I have 3 Kafka “input” topics and 3 “output” topics, each with 1 partition 
(only 1 partition because the order of the messages is important). I also 
have 1 master and 2 Flink slave nodes with a total of 16 task slots.

In my Flink program I have created 3 consumers, one for each of the input 
topics.

On each of the data streams I run a query that generates statistics over a 
window of 1 second, and I write the result to the corresponding output 
topic. You can find the execution plan with parallelism set to 2 attached.

This setup with parallelism=2 sometimes seems to give me the wrong order of 
the statistics results. I assume it is because of the rebalancing before the 
last map, which leads to a race condition when writing to Kafka.

If I set parallelism to 1, no rebalancing is done, but then only one task 
slot is used.

This has led me to the following questions:

Why is only 1 task slot used with my 3 pipelines when parallelism is set to 1? 
As far as I understand, the parallelism refers to the number of parallel 
instances a task can be split into. Therefore, I would assume that I could 
still run multiple different tasks (e.g. different maps or window functions on 
different streams) in different task slots, right?
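
For illustration, here is a toy job for the kind of separation I mean (a 
sketch; my understanding is that assigning distinct slot sharing groups 
forces the chains into separate slots):

import org.apache.flink.streaming.api.scala._

object SlotSharingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // By default, all operators of a job are in one slot sharing group, so a
    // parallelism-1 job fits into a single slot. Setting a distinct group on
    // an operator (inherited by what follows) should force separate slots.
    env.fromElements(1, 2, 3).map(_ * 2).slotSharingGroup("pipeline-a").print()
    env.fromElements(4, 5, 6).map(_ * 2).slotSharingGroup("pipeline-b").print()
    env.fromElements(7, 8, 9).map(_ * 2).slotSharingGroup("pipeline-c").print()

    env.execute("slot-sharing-sketch")
  }
}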

 

And to come back to my requirement: Is it not possible to run all 3 
pipelines in parallel and still keep the order of the messages and results?

I also asked these questions on Stack Overflow. And it seems that I have 
similar trouble understanding the terms “task slot”, “subtask”, etc., as 
Flavio mentioned in this Flink mail thread.

Thank you, and I would appreciate any input!

Best regards,

Ben

