Hi Flink users, I have a question about tumbling windows using processing time in Flink 0.10.1:
I want to measure the throughput of my application. The idea is to put a tumbling processing-time window of 1 second on the last operator and count the messages received in each window. The problem is that with a parallelism of 4, I expected 4 windows per second, but I get 7 windows per second. Is there an error in how the window is defined? Here is my code; thanks a lot in advance for your help.

[Kafka partitions: 4]

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val parallelism = 4
    env.setParallelism(parallelism)
    env.getConfig.enableObjectReuse()
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.getConfig.setAutoWatermarkInterval(-1L)
    env.getConfig.disableTimestamps()

    env.addSource(
        new FlinkKafkaConsumer082[String](
          "test_topic",
          new SimpleStringSchema,
          properties for connection KAFKA))
      .rebalance
      .map(do something)
      .map(payload => (payload, 1L))
      .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)
      .timeWindow(Time.of(1, TimeUnit.SECONDS))
      // sum the per-element counts of the two tuples being combined
      .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) =>
        (tuple0._1, tuple0._2 + tuple1._2))
      .addSink(
        new FlinkKafkaProducer[(Payload, Long)](
          KafkaBootstrapServers,
          TARGET_TOPIC,
          new SerializationSchema[(Payload, Long), Array[Byte]] {
            // write only the count of each window result
            override def serialize(element: (Payload, Long)): Array[Byte] =
              element._2.toString().getBytes
          }))

    env.execute("test")
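One thing worth checking, offered as a hypothesis rather than a confirmed diagnosis: a keyed window emits one result per key per window, not one per parallel instance, and `hashCode % parallelism` can be negative because `%` keeps the sign of the dividend. With a parallelism of 4 that allows up to seven keys (-3 to 3), which would match the 7 results per second. The sketch below simulates one window locally without Flink. `WindowKeyCheck`, `rawKey`, `nonNegativeKey`, `windowResults`, and the sample hash values are all hypothetical, and `java.lang.Math.floorMod` is just one possible way to keep keys in `[0, parallelism)`.

```scala
object WindowKeyCheck {
  // Mirrors the job's keyBy expression: % keeps the sign of the dividend,
  // so a negative hashCode produces a negative key.
  def rawKey(parallelism: Int): Int => Int = hash => hash % parallelism

  // Hypothetical alternative: floorMod always returns a value in [0, parallelism).
  def nonNegativeKey(parallelism: Int): Int => Int =
    hash => java.lang.Math.floorMod(hash, parallelism)

  // Local stand-in for one 1-second keyed tumbling window: each element becomes
  // (key, 1L) and counts are summed per key, so there is one result per distinct key.
  def windowResults(hashes: Seq[Int], key: Int => Int): Map[Int, Long] =
    hashes
      .map(h => (key(h), 1L))
      .groupBy(_._1)
      .map { case (k, pairs) => k -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val parallelism = 4
    // Hypothetical stand-in for payload id hash codes; String.hashCode can be any Int.
    val sampleHashes = -8 to 8
    val raw = windowResults(sampleHashes, rawKey(parallelism))
    val fixed = windowResults(sampleHashes, nonNegativeKey(parallelism))
    println(s"hashCode % 4: ${raw.size} results, keys ${raw.keys.toSeq.sorted.mkString(", ")}")
    println(s"floorMod(hashCode, 4): ${fixed.size} results, keys ${fixed.keys.toSeq.sorted.mkString(", ")}")
  }
}
```

With the sample range -8 to 8, the raw expression yields seven distinct keys while `floorMod` yields four. Note that even with four keys, Flink hash-partitions keys across subtasks, so the count of results per second tracks the number of active keys rather than the number of parallel instances.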