Do you have 7 distinct keys? You get as many result tuples as you have keys, because the window is per key.
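A plausible way to end up with exactly 7 distinct keys here (not stated in the thread, but consistent with the key selector `id.hashcode % parallelism` in the code below): on the JVM, `%` keeps the sign of the dividend, and `hashCode()` can be negative, so with `parallelism = 4` the key expression can take any of the seven values -3 through 3. A small Java sketch of that key space (the method name `keySpace` is mine, for illustration):

```java
import java.util.Set;
import java.util.TreeSet;

public class KeySpace {
    // Distinct values of h % parallelism over hash codes of both signs.
    static Set<Integer> keySpace(int parallelism) {
        Set<Integer> keys = new TreeSet<>();
        // hashCode() can be any int; Java's % keeps the sign of the dividend,
        // so for parallelism = 4 the result ranges over -3..3, not 0..3.
        for (int h = -10; h <= 10; h++) {
            keys.add(h % parallelism);
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(KeySpace.keySpace(4).size()); // prints 7
    }
}
```

Since each distinct key value gets its own window, up to 2 * parallelism - 1 = 7 window results per second is exactly what a negative-capable hash would produce.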
On Wed, Feb 3, 2016 at 12:12 PM, yutao sun <yutao.sun...@gmail.com> wrote:
> Thanks for your help. I retested with object reuse disabled and got the
> same result (please see the attached picture).
>
> 2016-02-03 10:51 GMT+01:00 Stephan Ewen <se...@apache.org>:
>> The definition looks correct.
>> Because the windows are by-key, you should get one window result per key
>> per second.
>>
>> Can you turn off object reuse? That is a pretty experimental thing and
>> works quite well with the batch operations, but not so much with the
>> streaming windows yet.
>> I would only enable object reuse after the program works well and
>> correctly without it.
>>
>> Greetings,
>> Stephan
>>
>> On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yutao.sun...@gmail.com> wrote:
>>> Hi Flink users,
>>>
>>> I have a question about tumbling windows using processing time in Flink
>>> 0.10.1:
>>>
>>> I want to measure the throughput of my application. The idea is that, at
>>> the last operator, I count the messages received using a tumbling
>>> processing-time window with a size of one second.
>>>
>>> The problem is that with a parallelism of 4, the number of windows should
>>> be 4 per second, but I got 7 windows per second. I wonder if there is an
>>> error in how the window is defined?
>>>
>>> I copy my code here; thanks a lot for your help in advance.
>>> [KAFKA partition : 4]
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> val parallelism = 4
>>>
>>> env.setParallelism(parallelism)
>>> env.getConfig.enableObjectReuse()
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>> env.getConfig.setAutoWatermarkInterval(-1L)
>>> env.getConfig.disableTimestamps()
>>> env.addSource(
>>>     new FlinkKafkaConsumer082[String](
>>>       "test_topic",
>>>       new SimpleStringSchema,
>>>       /* properties for connection KAFKA */))
>>>   .rebalance
>>>   .map(/* do some thing */)
>>>   .map(payload => (payload, 1L))
>>>   .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)
>>>   .timeWindow(Time.of(1, TimeUnit.SECONDS))
>>>   // Scala tuples are 1-indexed; keep the payload, sum the two counts.
>>>   .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) =>
>>>     (tuple0._1, tuple0._2 + tuple1._2))
>>>   .addSink(
>>>     new FlinkKafkaProducer[(Payload, Long)](
>>>       KafkaBootstrapServers,
>>>       TARGET_TOPIC,
>>>       new SerializationSchema[(Payload, Long), Array[Byte]] {
>>>         override def serialize(element: (Payload, Long)): Array[Byte] =
>>>           element._2.toString.getBytes
>>>       }))
>>> env.execute("test")
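If the goal is exactly one window result per parallel instance per second, one option (my suggestion, not from the thread; assumes Java 8's `Math.floorMod` is available) is to normalize the key to a non-negative range, since `floorMod` takes the sign of the divisor rather than the dividend:

```java
public class NonNegativeKey {
    // Math.floorMod's result has the sign of the divisor, so the key
    // space collapses to exactly 0..parallelism-1, even for negative
    // hash codes (including Integer.MIN_VALUE, where Math.abs fails).
    static int key(Object id, int parallelism) {
        return Math.floorMod(id.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // "some-id" is a placeholder; any id maps into 0..3.
        System.out.println(NonNegativeKey.key("some-id", 4));
    }
}
```

Note that keying by `hashCode % parallelism` still does not guarantee an even assignment of keys to the 4 parallel window operators; it only bounds the number of distinct keys, and hence window results per second, at `parallelism`.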