Thanks for your help. I retested with object reuse disabled and got the same result (please see the attached picture).
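For reference, a minimal sketch of the retest configuration (assuming the job is otherwise unchanged; object reuse is disabled by default, disableObjectReuse() just makes that explicit):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    // env.getConfig.enableObjectReuse()  // removed for the retest
    env.getConfig.disableObjectReuse()    // explicit; disabled is also the default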
2016-02-03 10:51 GMT+01:00 Stephan Ewen <se...@apache.org>:

> The definition looks correct.
> Because the windows are by-key, you should get one window result per key
> per second.
>
> Can you turn off object reuse? That is a pretty experimental thing and
> works quite well with the batch operations, but not so much with the
> streaming windows yet.
> I would only enable object reuse after the program works well and
> correctly without it.
>
> Greetings,
> Stephan
>
>
> On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yutao.sun...@gmail.com> wrote:
>
>> Hi Flink users,
>>
>> I have a question about tumbling windows using processing time in Flink
>> 0.10.1:
>>
>> I want to measure the throughput of my application. The idea is that at
>> the last operator, using a tumbling processing-time window with a size of
>> 1 second, I count the messages received.
>>
>> The problem is that with a parallelism of 4, the number of windows should
>> be 4/second, but I got 7 windows/second. I wonder whether there is an
>> error in how the window is defined?
>>
>> I copy my code here; thanks a lot for your help in advance.
>> [Kafka partitions: 4]
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val parallelism = 4
>>
>> env.setParallelism(parallelism)
>> env.getConfig.enableObjectReuse()
>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>> env.getConfig.setAutoWatermarkInterval(-1L)
>> env.getConfig.disableTimestamps()
>>
>> env.addSource(
>>     new FlinkKafkaConsumer082[String](
>>       "test_topic",
>>       new SimpleStringSchema,
>>       properties /* for connecting to Kafka */))
>>   .rebalance
>>   .map(/* do something */)
>>   .map(payload => (payload, 1L))
>>   .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)
>>   .timeWindow(Time.of(1, TimeUnit.SECONDS))
>>   .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) =>
>>     (tuple0._1, tuple0._2 + tuple1._2))
>>   .addSink(
>>     new FlinkKafkaProducer[(Payload, Long)](
>>       KafkaBootstrapServers,
>>       TARGET_TOPIC,
>>       new SerializationSchema[(Payload, Long), Array[Byte]] {
>>         override def serialize(element: (Payload, Long)): Array[Byte] =
>>           element._2.toString.getBytes
>>       }))
>>
>> env.execute("test")
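One thing worth checking (an assumption on my side, not confirmed above): keyed windows fire once per distinct key per second, and in Scala (as in Java) `%` keeps the sign of the dividend, so `hashCode % 4` can yield up to 7 distinct keys (-3 through 3), which would match the 7 windows/second observed. A standalone sketch:

    // Standalone Scala sketch: how many distinct keys can hashCode % parallelism
    // produce? Negative hash codes yield negative keys. (Whether this explains
    // the observed 7 windows/second is an assumption, not confirmed in the thread.)
    object KeyCount extends App {
      val parallelism = 4
      val sampleHashes = Seq(-7, -3, -1, 0, 1, 5, 10, 42, -42)
      val keys = sampleHashes.map(_ % parallelism).distinct.sorted
      println(keys)  // List(-3, -2, -1, 0, 1, 2)
      // Possible values of h % 4 span -3..3: up to 7 distinct keys, and each
      // active key gets its own window result per second.
    }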