Hi,
You can explicitly request processing-time windows, even when the time
characteristic is set to EventTime, like this:

stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(1))).sum(...)
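
For reference, here is a minimal sketch of how that could look in the counting
part of your job. It assumes the Flink Scala streaming API of that era
(roughly 1.0/1.1) and reuses the socket source from your mail. The object name,
the job name and the output path "records-per-second" are placeholders I made
up, so adjust them as needed:

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object RecordsPerSecond {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The rest of the job can keep using event time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val s = env.socketTextStream("localhost", 1234)

    s.map(_ => Tuple1(1))
      .keyBy(0)
      // Explicit processing-time assigner: these windows fire on the
      // machine's wall clock and do not need record timestamps.
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .sum(0)
      .writeAsCsv("records-per-second") // placeholder path

    env.execute("records-per-second") // placeholder job name
  }
}
```

Because the window assigner is given explicitly, the count no longer depends
on timestamps or watermarks being assigned to the stream.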

Also note that the timestamp you append in
"writeAsCsv("records-per-second-" + System.currentTimeMillis())" is
evaluated only once, at the moment this function is called when your
program starts. It is not re-evaluated per window, so every result is
written under that single file name.
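
To illustrate the evaluation order without Flink, here is a small plain-Scala
sketch. The makeSink helper is hypothetical and only stands in for a call such
as writeAsCsv: its path argument is computed by the caller before the function
runs, so both "records" below see the same file name.

```scala
object FileNameOnce {
  // Hypothetical stand-in for a sink factory. The path string is already
  // fully evaluated by the time this function is entered.
  def makeSink(path: String): Int => String =
    record => s"$path <- $record"

  def main(args: Array[String]): Unit = {
    // currentTimeMillis() runs exactly once, here.
    val sink = makeSink("records-per-second-" + System.currentTimeMillis())

    val first = sink(1)
    Thread.sleep(50) // time passes between records
    val second = sink(2)

    // Compare the path part (everything before the first space).
    println(first.takeWhile(_ != ' ') == second.takeWhile(_ != ' ')) // prints "true"
  }
}
```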

Best,
Aljoscha

On Tue, 28 Jun 2016 at 17:33 Saiph Kappa <saiph.ka...@gmail.com> wrote:

> Hi,
>
> I have a flink streaming application and I want to count records received
> per second (as a way of measuring the throughput of my application).
> However, I am using the EventTime time characteristic, as follows:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val s = env.socketTextStream("localhost", 1234)
>
> s.map(line => Tuple1(1))
>   .keyBy(0)
>   .timeWindow(Time.seconds(1))
>   .sum(0)
>   .writeAsCsv("records-per-second-" + System.currentTimeMillis())
>
> val mainStrean = s.map(line => {
>   val Array(p1, p2) = line.split(" ")
>   (p1, p2.toInt)
> })
>   .assignAscendingTimestamps(p => System.currentTimeMillis())
>
> which naturally gives me this error:
>
> [error] Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
> timestamp (= no timestamp marker). Is the time characteristic set to
> 'ProcessingTime', or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
>
> How can I do this?
>
>
> Thanks.
>
