The definition looks correct. Because the windows are by-key, you should get one window result per key per second.
Can you turn off object-reuse? That is a pretty experimental thing and works with the batch operations quite well, but not so much with the streaming windows, yet. I would only enable object reuse after the program works well and correctly without. Greetings, Stephan On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yutao.sun...@gmail.com> wrote: > Hi Flink users, > > I have a question about Tumbling Windows using Processing Time at Flink > ver 0.10.1 : > > In fact, I want to measure the throughput of my application, the idea is > at the last operator, by using a Tumbling processing Time windows with a > size of 1 second, I count the message received. > > The problem is in case of 4 parallelisms, the number of windows should be > 4/second, but I got 7 windows/second, I wonder if is there any error the > windows is defined? > > I copy my code here and thanks a lot for your help in advance. > [KAFKA partition : 4] > > > *val env = StreamExecutionEnvironment.getExecutionEnvironment* > > > *val parallelism = 4* > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > *env.setParallelism(parallelism)env.getConfig.enableObjectReuse()env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)env.getConfig.setAutoWatermarkInterval(-1L)env.getConfig.disableTimestamps()env.addSource( > new FlinkKafkaConsumer082[String]( "test_topic", new > SimpleStringSchema, properties for connection KAFKA )) .rebalance .map(do > some thing) .map(payload => (payload, 1L)) .keyBy(mappedPayload => > mappedPayload._1.id.hashcode % parallelism) .timeWindow(Time.of(1, > TimeUnit.SECONDS)) .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) > => (tuple._0, tuple._1 + 1L)) .addSink( new FlinkKafkaProducer[(Payload, > Long)]( KafkaBootstrapServers, TARGET_TOPIC, new > SerializationSchema[(Payload, Long), Array[Byte]] { override def > serialize(element: (Payload, Long)): Array[Byte] = { > element._2.toString().getBytes } } ))env.execute("test")* > > > >