Do you have 7 distinct keys? You get as many result tuples as you have
keys, because the window is per key.
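A side note (my own illustration, not from the thread): in Java and Scala, `%` keeps the sign of the dividend, so a key selector like `id.hashCode % 4` can produce up to seven distinct values, -3 through 3. That would match 7 per-key window results per second. A minimal sketch:

```scala
// Illustration (assumption, not the poster's code): keying on
// hashCode % parallelism with parallelism = 4 yields up to 7 distinct
// keys, because `%` keeps the sign of a negative dividend.
object ModuloKeyDemo {
  def main(args: Array[String]): Unit = {
    val parallelism = 4
    // sample hash codes, positive and negative
    val possibleKeys = (-10 to 10).map(_ % parallelism).distinct.sorted
    println(possibleKeys.mkString(", "))  // -3, -2, -1, 0, 1, 2, 3
  }
}
```

If that is the cause, keying on the raw id (or normalizing the remainder to be non-negative) would change the number of distinct keys.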

On Wed, Feb 3, 2016 at 12:12 PM, yutao sun <yutao.sun...@gmail.com> wrote:

> Thanks for your help. I re-tested with object reuse disabled and got the
> same result (please see the attached picture).
>
>
>
> 2016-02-03 10:51 GMT+01:00 Stephan Ewen <se...@apache.org>:
>
>> The definition looks correct.
>> Because the windows are by-key, you should get one window result per key
>> per second.
>>
>> Can you turn off object reuse? That is a pretty experimental feature: it
>> works quite well with the batch operations, but not yet with the
>> streaming windows.
>> I would enable object reuse only after the program works well and
>> correctly without it.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yutao.sun...@gmail.com> wrote:
>>
>>> Hi Flink users,
>>>
>>> I have a question about tumbling windows using processing time in Flink
>>> 0.10.1:
>>>
>>> I want to measure the throughput of my application. The idea is to
>>> count the messages received at the last operator, using a tumbling
>>> processing-time window with a size of 1 second.
>>>
>>> The problem: with a parallelism of 4, the number of windows should be
>>> 4 per second, but I get 7 windows per second. I wonder if there is an
>>> error in how the window is defined?
>>>
>>> I copy my code below; thanks a lot in advance for your help.
>>> [Kafka partitions: 4]
>>>
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> val parallelism = 4
>>>
>>> env.setParallelism(parallelism)
>>> env.getConfig.enableObjectReuse()
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>> env.getConfig.setAutoWatermarkInterval(-1L)
>>> env.getConfig.disableTimestamps()
>>>
>>> env.addSource(
>>>     new FlinkKafkaConsumer082[String](
>>>       "test_topic",
>>>       new SimpleStringSchema,
>>>       properties /* properties for connecting to Kafka */
>>>     ))
>>>   .rebalance
>>>   .map(/* do some thing */)
>>>   .map(payload => (payload, 1L))
>>>   .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)
>>>   .timeWindow(Time.of(1, TimeUnit.SECONDS))
>>>   .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) =>
>>>     (tuple0._1, tuple0._2 + tuple1._2))
>>>   .addSink(
>>>     new FlinkKafkaProducer[(Payload, Long)](
>>>       KafkaBootstrapServers,
>>>       TARGET_TOPIC,
>>>       new SerializationSchema[(Payload, Long), Array[Byte]] {
>>>         override def serialize(element: (Payload, Long)): Array[Byte] = {
>>>           element._2.toString().getBytes
>>>         }
>>>       }))
>>>
>>> env.execute("test")
>>>
>>>
>>>
>>>
>>
>
