After changing to env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) and removing assignTimestampsAndWatermarks(new MyTimestampExtractor), the code now executes.

One more question. From the Javadoc [1], it seems a watermark is a marker telling operators that no more elements with an older or equal timestamp will arrive. So how do I determine the value of the watermark, e.g. in MyTimestampExtractor.getCurrentWatermark()? Is it normal to simply keep track of the oldest timestamp seen? Are there any concrete examples I can check? (I suspect this is a newbie question because I lack a background in streaming concepts.)

Many thanks for your kind reply.

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/watermark/Watermark.html

On 08/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Yes Robert is right!
>
> Although it is set only once and not per-operator,
> so it looks closer to ingestion time, which is when an
> operator enters the pipeline.
>
> Setting the timeCharacteristic to ingestion time
> could also be an option, if this is what you want to do.
>
> Kostas
>
>> On Jul 8, 2016, at 11:56 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>
>> One thing I would like to add is that your timestamp extractors are not
>> really extracting the event time from your events. They are just returning
>> the current system time, which effectively means you are falling back to
>> processing time.
>>
>> On Fri, Jul 8, 2016 at 10:32 AM, Kostas Kloudas
>> <k.klou...@data-artisans.com> wrote:
>> Can it be that when you define the ‘right’ stream, you do not specify a
>> timestamp extractor?
>> This is done the same way you do it for the ‘left’ stream.
>>
>> Kostas
>>
>> > On Jul 8, 2016, at 6:12 AM, David Olsen <davidolsen4...@gmail.com> wrote:
>> >
>> > After changing the TimeCharacteristic to EventTime, Flink still throws
>> > that runtime exception. Is
>> > `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
>> > correct way to set that feature?
>> >
>> > Thanks.
>> >
>> > java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>> > timestamp marker). Is the time characteristic set to 'ProcessingTime',
>> > or did you forget to call
>> > 'DataStream.assignTimestampsAndWatermarks(...)'?
>> >     at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
>> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
>> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >     at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > On 06/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> >> Hi David,
>> >>
>> >> You are using Tumbling event time windows, but you set the
>> >> timeCharacteristic to processing time.
>> >> If you want processing time, then you should use
>> >> TumblingProcessingTimeWindows and remove the timestampAssigner.
>> >> If you want event time, then you need to set the timeCharacteristic to
>> >> eventTime and leave the rest of your code as is.
>> >>
>> >> Let me know if this answered your question.
>> >>
>> >> Cheers,
>> >> Kostas
>> >>
>> >>> On Jul 6, 2016, at 3:43 PM, David Olsen <davidolsen4...@gmail.com> wrote:
>> >>>
>> >>> I have two streams. One will produce a single record, and the other
>> >>> have a list of records. And I want to do left join. So for example,
>> >>>
>> >>> Stream A:
>> >>> record1
>> >>> record2
>> >>> ...
>> >>>
>> >>> Stream B:
>> >>> single-record
>> >>>
>> >>> After joined,
>> >>>
>> >>> record1, single-record
>> >>> record2, single-record
>> >>> ...
>> >>>
>> >>> However with the following streaming job, it throws an exception
>> >>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...'
>> >>> even
>> >>> setStreamTimeCharacteristic is configured to ProcessingTime and
>> >>> assignTimestampsAndWatermarks is called.
>> >>>
>> >>> How can I fix this runtime exception?
>> >>>
>> >>> Thanks.
>> >>>
>> >>> object App {
>> >>>   def main(args: Array[String]) {
>> >>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >>>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>> >>>     val left = env.fromElements(1, 2, 3, 4, 5).map(
>> >>>       new MapFunction[Int, T2[Int, String]] {
>> >>>         override def map(value: Int): T2[Int, String] =
>> >>>           new T2[Int, String](value, "data 1")
>> >>>       }
>> >>>     ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
>> >>>
>> >>>     val right = env.fromElements(99).map(
>> >>>       new MapFunction[Int, T2[Int, String]] {
>> >>>         override def map(value: Int): T2[Int, String] =
>> >>>           new T2[Int, String](value, "data 2")
>> >>>       }
>> >>>     )
>> >>>     left.coGroup(right).
>> >>>       where { t2 => t2.f0 }.
>> >>>       equalTo{ t2=> t2.f0 }.
>> >>>       window(TumblingEventTimeWindows.of(Time.seconds(1))).
>> >>>       apply(new Join()).print
>> >>>     env.execute
>> >>>   }
>> >>> }
>> >>>
>> >>> class MyTimestampExtractor extends
>> >>>     AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
>> >>>   override def extractTimestamp(e: T2[Int, String],
>> >>>       prevElementTimestamp: Long) =
>> >>>     System.currentTimeMillis
>> >>>
>> >>>   override def getCurrentWatermark(): Watermark =
>> >>>     new Watermark(System.currentTimeMillis)
>> >>> }
>> >>>
>> >>> class Join extends CoGroupFunction[
>> >>>   T2[Int, String], T2[Int, String], T2[Int, String]
>> >>> ] {
>> >>>   val log = LoggerFactory.getLogger(classOf[Join])
>> >>>   override def coGroup(left: java.lang.Iterable[T2[Int, String]],
>> >>>       right: java.lang.Iterable[T2[Int, String]],
>> >>>       out: Collector[T2[Int, String]]) {
>> >>>     var seq = Seq.empty[T2[Int, String]]
>> >>>     left.foreach { e => log.info(s"from left: $e");
>> >>>       seq ++= Seq(e) }
>> >>>     right.foreach { e => log.info(s"from right: $e");
>> >>>       seq ++= Seq(e) }
>> >>>     seq.foreach { e => out.collect(e) }
>> >>>   }
>> >>>
>> >>> }
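
On the watermark question at the top of this message: one commonly described approach, as far as I understand it, is to track the largest (not the oldest) event timestamp seen so far and report a watermark that lags behind it by a fixed bound. Below is a minimal, Flink-free sketch of that bookkeeping. The class name BoundedLatenessTracker and the 2-second bound are hypothetical and chosen only for illustration; in the quoted job, observe would correspond to extractTimestamp (reading the time from the event rather than System.currentTimeMillis) and currentWatermark to the value wrapped by getCurrentWatermark().

```scala
// Flink-free sketch; BoundedLatenessTracker and its parameter are
// illustrative assumptions, not part of any Flink API.
final class BoundedLatenessTracker(maxLatenessMs: Long) {
  // Largest event timestamp observed so far; Long.MinValue means "nothing seen yet".
  private var maxSeenTs: Long = Long.MinValue

  /** Record an element's event timestamp and return it unchanged. */
  def observe(eventTs: Long): Long = {
    maxSeenTs = math.max(maxSeenTs, eventTs)
    eventTs
  }

  /** Watermark value: no element with a timestamp <= this is expected any more. */
  def currentWatermark: Long =
    if (maxSeenTs == Long.MinValue) Long.MinValue
    else maxSeenTs - maxLatenessMs
}

object WatermarkDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new BoundedLatenessTracker(maxLatenessMs = 2000L)
    // Elements arrive slightly out of order.
    Seq(10000L, 12000L, 11000L).foreach(tracker.observe)
    println(tracker.currentWatermark) // 10000
  }
}
```

The bound is a trade-off: a larger value tolerates more out-of-order elements but delays window results, while a smaller value fires windows sooner at the risk of treating stragglers as late.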