Hi Flink users,

I have a question about tumbling windows using processing time in Flink
version 0.10.1:

I want to measure the throughput of my application. The idea is that, at
the last operator, I count the messages received using a tumbling
processing-time window with a size of 1 second.

The problem is that with a parallelism of 4, the number of windows should be
4 per second, but I get 7 windows per second. I wonder whether there is an
error in the way the window is defined?

I have copied my code below. Thanks a lot in advance for your help.
[Kafka partitions: 4]


val env = StreamExecutionEnvironment.getExecutionEnvironment
val parallelism = 4

env.setParallelism(parallelism)
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.getConfig.setAutoWatermarkInterval(-1L)
env.getConfig.disableTimestamps()

env.addSource(
    new FlinkKafkaConsumer082[String](
      "test_topic",
      new SimpleStringSchema,
      /* properties for the Kafka connection */
    ))
  .rebalance
  .map(/* do something */)
  // Pair each payload with an initial count of 1
  .map(payload => (payload, 1L))
  .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)
  // Tumbling window of 1 second (processing time)
  .timeWindow(Time.of(1, TimeUnit.SECONDS))
  // Count the messages per key and window
  .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) => (tuple0._1, tuple0._2 + 1L))
  .addSink(
    new FlinkKafkaProducer[(Payload, Long)](
      KafkaBootstrapServers,
      TARGET_TOPIC,
      new SerializationSchema[(Payload, Long), Array[Byte]] {
        // Only the count is written to the target topic
        override def serialize(element: (Payload, Long)): Array[Byte] = {
          element._2.toString().getBytes
        }
      }
    ))

env.execute("test")
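
One thing I am not sure about is the key expression in keyBy. As far as I
understand, hashCode can be negative and % keeps the sign of the left operand,
so the key may take up to 7 distinct values (-3 to 3) instead of 4. If each key
produces its own window result, that might explain the 7 windows per second.
Below is a minimal sketch to check this outside Flink. The object name, the
helper names and the sample hash range are made up for illustration and are
not part of my job.

```scala
object KeyRangeCheck {
  val parallelism = 4

  // Same arithmetic as the keyBy expression, applied to a plain Int hash.
  def key(hash: Int): Int = hash % parallelism

  // Assumed alternative: shift negative remainders into 0 until parallelism.
  def nonNegativeKey(hash: Int): Int =
    ((hash % parallelism) + parallelism) % parallelism

  def main(args: Array[String]): Unit = {
    // Hypothetical hash values covering both negative and positive cases.
    val hashes = -8 to 8

    val keysSeen = hashes.map(key).distinct.sorted
    val nonNegativeKeysSeen = hashes.map(nonNegativeKey).distinct.sorted

    println(s"hash % parallelism: ${keysSeen.size} distinct keys (${keysSeen.mkString(", ")})")
    println(s"non-negative variant: ${nonNegativeKeysSeen.size} distinct keys (${nonNegativeKeysSeen.mkString(", ")})")
  }
}
```

If this is the cause, using the non-negative variant in keyBy should bring the
count back to 4 keys, but I have not verified this in the actual job.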
