Hi,

I have a flink streaming application and I want to count records received
per second (as a way of measuring the throughput of my application).
However, I am using the EventTime time characteristic, as follows:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val s = env.socketTextStream("localhost", 1234)

s.map(line => 
Tuple1(1)).keyBy(0).timeWindow(Time.seconds(1)).sum(0).writeAsCsv("records-per-second-"
+
  System.currentTimeMillis())

val mainStrean = s.map(line => {
  val Array(p1, p2) = line.split(" ")
  (p1, p2.toInt)
})
  .assignAscendingTimestamps(p => System.currentTimeMillis())

which naturally gives me this error:

[error] Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
timestamp (= no timestamp marker). Is the time characteristic set to
'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?

How can I do this?


Thanks.

Reply via email to