Hi, I have a flink streaming application and I want to count records received per second (as a way of measuring the throughput of my application). However, I am using the EventTime time characteristic, as follows:
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val s = env.socketTextStream("localhost", 1234) s.map(line => Tuple1(1)).keyBy(0).timeWindow(Time.seconds(1)).sum(0).writeAsCsv("records-per-second-" + System.currentTimeMillis()) val mainStrean = s.map(line => { val Array(p1, p2) = line.split(" ") (p1, p2.toInt) }) .assignAscendingTimestamps(p => System.currentTimeMillis()) which naturally gives me this error: [error] Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'? How can I do this? Thanks.