The definition looks correct.
Because the windows are by-key, you should get one window result per key
per second.

Can you turn off object-reuse? That is a pretty experimental thing and
works with the batch operations quite well, but not so much with the
streaming windows, yet.
I would only enable object reuse after the program works well and correctly
without.

Greetings,
Stephan


On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yutao.sun...@gmail.com> wrote:

> Hi Flink users,
>
> I have a question about Tumbling Windows using Processing Time at Flink
> ver 0.10.1 :
>
> In fact, I want to measure the throughput of my application, the idea is
> at the last operator, by using a Tumbling processing Time windows with a
> size of 1 second, I count the message received.
>
> The problem is in case of 4 parallelisms, the number of windows should be
> 4/second, but I got 7 windows/second,  I wonder if is there any error the
> windows is defined?
>
> I copy my code here and thanks a lot for your help in advance.
> [KAFKA partition : 4]
>
>
> *val env = StreamExecutionEnvironment.getExecutionEnvironment*
>
>
> *val parallelism = 4*
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *env.setParallelism(parallelism)env.getConfig.enableObjectReuse()env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)env.getConfig.setAutoWatermarkInterval(-1L)env.getConfig.disableTimestamps()env.addSource(
>   new FlinkKafkaConsumer082[String](    "test_topic",    new 
> SimpleStringSchema,    properties for connection KAFKA  )) .rebalance .map(do 
> some thing) .map(payload => (payload, 1L)) .keyBy(mappedPayload => 
> mappedPayload._1.id.hashcode % parallelism) .timeWindow(Time.of(1, 
> TimeUnit.SECONDS)) .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) 
> => (tuple._0, tuple._1 + 1L)) .addSink(   new FlinkKafkaProducer[(Payload, 
> Long)](    KafkaBootstrapServers,    TARGET_TOPIC,    new 
> SerializationSchema[(Payload, Long), Array[Byte]] {      override def 
> serialize(element: (Payload, Long)): Array[Byte] = {        
> element._2.toString().getBytes      }    }  ))env.execute("test")*
>
>
>
>

Reply via email to