Changing TimeCharacteristic to EventTime the flink still throws that runtime exception error. Is `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the correct way to set that feature?
Thanks. java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'? at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) On 06/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote: > Hi David, > > You are using Tumbling event time windows, but you set the > timeCharacteristic to processing time. > If you want processing time, then you should use > TumblingProcessingTimeWindows and remove the timestampAssigner. > If you want event time, then you need to set the timeCharacteristic to > eventTime and leave the rest of your code as is. > > Let me know if this answered your question. > > Cheers, > Kostas > >> On Jul 6, 2016, at 3:43 PM, David Olsen <davidolsen4...@gmail.com> wrote: >> >> I have two streams. One will produce a single record, and the other >> have a list of records. And I want to do left join. So for example, >> >> Stream A: >> record1 >> record2 >> ... >> >> Stream B: >> single-record >> >> After joined, >> >> record1, single-record >> record2, single-record >> ... >> >> However with the following streaming job, it throws an exception >> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even >> setStreamTimeCharacteristic is configured to ProcessingTime and >> assignTimestampsAndWatermarks is called. >> >> How can I fix this runtime exception? >> >> Thanks. >> >> object App { >> def main(args: Array[String]) { >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) >> val left = env.fromElements(1, 2, 3, 4, 5).map( >> new MapFunction[Int, T2[Int, String]] { >> override def map(value: Int): T2[Int, String] = >> new T2[Int, String](value, "data 1") >> } >> ).assignTimestampsAndWatermarks(new MyTimestampExtractor) >> >> val right = env.fromElements(99).map( >> new MapFunction[Int, T2[Int, String]] { >> override def map(value: Int): T2[Int, String] = >> new T2[Int, String](value, "data 2") >> } >> ) >> left.coGroup(right). >> where { t2 => t2.f0 }. >> equalTo{ t2=> t2.f0 }. >> window(TumblingEventTimeWindows.of(Time.seconds(1))). >> apply(new Join()).print >> env.execute >> } >> } >> >> class MyTimestampExtractor extends >> AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable { >> override def extractTimestamp(e: T2[Int, String], >> prevElementTimestamp: Long) = >> System.currentTimeMillis >> >> override def getCurrentWatermark(): Watermark = >> new Watermark(System.currentTimeMillis) >> } >> >> class Join extends CoGroupFunction[ >> T2[Int, String], T2[Int, String], T2[Int, String] >> ] { >> val log = LoggerFactory.getLogger(classOf[Join]) >> override def coGroup(left: java.lang.Iterable[T2[Int, String]], >> right: java.lang.Iterable[T2[Int, String]], >> out: Collector[T2[Int, String]]) { >> var seq = Seq.empty[T2[Int, String]] >> left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) } >> right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) } >> seq.foreach { e => out.collect(e) } >> } >> >> } > >