Thanks for your help. I retested with object reuse disabled and got the
same result (please see the attached picture).

2016-02-03 10:51 GMT+01:00 Stephan Ewen <se...@apache.org>:

> The definition looks correct.
> Because the windows are by-key, you should get one window result per key
> per second.
>
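(To make Stephan's point concrete, here is a minimal sketch in plain Scala with hypothetical ids: the number of results per window equals the number of distinct key values, not the operator parallelism.)

    val parallelism = 4
    val ids = Seq("a", "b", "c", "d", "e")          // hypothetical message ids
    val keys = ids.map(_.hashCode % parallelism)    // same key expression as in the job below
    println(keys.distinct.size)                     // one window result per distinct key, per second
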
> Can you turn off object reuse? That is a fairly experimental feature; it
> works quite well with the batch operations, but not so well with the
> streaming windows yet.
> I would only enable object reuse once the program works correctly without
> it.
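(For reference: object reuse is off by default, so it is enough to delete the enableObjectReuse() call; ExecutionConfig also has an explicit off switch. A minimal sketch:)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableObjectReuse()  // explicit off switch; off is also the default
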
>
> Greetings,
> Stephan
>
>
> On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yutao.sun...@gmail.com> wrote:
>
>> Hi Flink users,
>>
>> I have a question about tumbling windows with processing time in Flink
>> 0.10.1:
>>
>> I want to measure the throughput of my application. The idea is to count,
>> at the last operator, the messages received in a tumbling processing-time
>> window with a size of 1 second.
>>
>> The problem is that with a parallelism of 4 the number of window results
>> should be 4 per second, but I get 7 per second. I wonder whether there is
>> an error in how the windows are defined?
>>
>> I copy my code below; thanks a lot in advance for your help.
>> [Kafka topic with 4 partitions]
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val parallelism = 4
>>
>> env.setParallelism(parallelism)
>> env.getConfig.enableObjectReuse()
>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>> env.getConfig.setAutoWatermarkInterval(-1L)
>> env.getConfig.disableTimestamps()
>>
>> env.addSource(
>>   new FlinkKafkaConsumer082[String](
>>     "test_topic",
>>     new SimpleStringSchema,
>>     properties  // connection properties for Kafka
>>   ))
>>   .rebalance
>>   .map(doSomething)  // placeholder: "do some thing" in the original, produces a Payload
>>   .map(payload => (payload, 1L))
>>   .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)
>>   .timeWindow(Time.of(1, TimeUnit.SECONDS))
>>   // sum the per-message counts within each key's window
>>   .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) =>
>>     (tuple0._1, tuple0._2 + tuple1._2))
>>   .addSink(
>>     new FlinkKafkaProducer[(Payload, Long)](
>>       KafkaBootstrapServers,
>>       TARGET_TOPIC,
>>       new SerializationSchema[(Payload, Long), Array[Byte]] {
>>         override def serialize(element: (Payload, Long)): Array[Byte] = {
>>           element._2.toString.getBytes
>>         }
>>       }
>>     ))
>>
>> env.execute("test")
>>
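(A side observation on the keyBy above, not raised in the thread: Scala's % keeps the sign of its left operand, so id.hashCode % 4 can take any value in -3..3, i.e. up to 7 distinct keys, and therefore up to 7 window results per second. A minimal sketch with hypothetical hash values:)

    val sampleHashes = Seq(-9, -6, -3, -1, 0, 2, 5, 7)  // hypothetical hashCode values
    println(sampleHashes.map(_ % 4).distinct.sorted)    // List(-3, -2, -1, 0, 1, 2, 3): 7 distinct keys
    // ((h % 4) + 4) % 4 would fold every hash into 0..3, i.e. at most 4 keys
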
>
