Hi David! Have you had a look at the docs for Event Time and Watermark Generation?
There are some examples for some typical cases:

Event Time / Watermark Overview:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html

Typical Watermark Generators:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamp_extractors.html

Writing your own Watermark Generator:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html

Greetings,
Stephan

On Sun, Jul 10, 2016 at 9:43 AM, David Olsen <davidolsen4...@gmail.com> wrote:

> Changing to
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime),
> and removing assignTimestampsAndWatermarks(new MyTimestampExtractor)
> gets the code executing now.
>
> One more question. I read the Javadoc [1], and it seems a watermark is a
> mark telling operators that no more elements will arrive. So how do I
> determine the value of the watermark, e.g. in
> MyTimestampExtractor.getCurrentWatermark()? Is it normal to simply
> keep track of the oldest timestamp? Are there any concrete
> examples I can check? (I guess I have such a newbie question because I
> am not familiar with streaming concepts.)
>
> Many thanks for your kind reply.
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/watermark/Watermark.html
>
> On 08/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> > Yes Robert is right!
> >
> > Although it is set only once and not per-operator,
> > so it looks closer to ingestion time, which is when an
> > operator enters the pipeline.
> >
> > Setting the timeCharacteristic to ingestion time
> > could also be an option, if this is what you want to do.
> >
> > Kostas
> >
> >> On Jul 8, 2016, at 11:56 AM, Robert Metzger <rmetz...@apache.org> wrote:
> >>
> >> One thing I would like to add is that your timestamp extractors are not
> >> really extracting the event time from your events. They are just returning
> >> the current system time, which effectively means you are falling back to
> >> processing time.
> >>
> >> On Fri, Jul 8, 2016 at 10:32 AM, Kostas Kloudas
> >> <k.klou...@data-artisans.com> wrote:
> >> Can it be that when you define the 'right' stream, you do not specify a
> >> timestamp extractor?
> >> This is done the same way you do it for the 'left' stream.
> >>
> >> Kostas
> >>
> >> > On Jul 8, 2016, at 6:12 AM, David Olsen <davidolsen4...@gmail.com> wrote:
> >> >
> >> > After changing the TimeCharacteristic to EventTime, Flink still throws
> >> > that runtime exception. Is
> >> > `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
> >> > correct way to set that feature?
> >> >
> >> > Thanks.
> >> >
> >> > java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> >> > timestamp marker). Is the time characteristic set to 'ProcessingTime',
> >> > or did you forget to call
> >> > 'DataStream.assignTimestampsAndWatermarks(...)'?
> >> >   at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
> >> >   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
> >> >   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
> >> >   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
> >> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> >> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >> >   at java.lang.Thread.run(Thread.java:745)
> >> >
> >> >
> >> > On 06/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> >> >> Hi David,
> >> >>
> >> >> You are using Tumbling event time windows, but you set the
> >> >> timeCharacteristic to processing time.
> >> >> If you want processing time, then you should use
> >> >> TumblingProcessingTimeWindows and remove the timestampAssigner.
> >> >> If you want event time, then you need to set the timeCharacteristic to
> >> >> eventTime and leave the rest of your code as is.
> >> >>
> >> >> Let me know if this answered your question.
> >> >>
> >> >> Cheers,
> >> >> Kostas
> >> >>
> >> >>> On Jul 6, 2016, at 3:43 PM, David Olsen <davidolsen4...@gmail.com> wrote:
> >> >>>
> >> >>> I have two streams. One will produce a single record, and the other
> >> >>> has a list of records. I want to do a left join. So for example,
> >> >>>
> >> >>> Stream A:
> >> >>> record1
> >> >>> record2
> >> >>> ...
> >> >>>
> >> >>> Stream B:
> >> >>> single-record
> >> >>>
> >> >>> After joining,
> >> >>>
> >> >>> record1, single-record
> >> >>> record2, single-record
> >> >>> ...
> >> >>>
> >> >>> However, the following streaming job throws the exception
> >> >>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...'
> >> >>> even though setStreamTimeCharacteristic is configured to
> >> >>> ProcessingTime and assignTimestampsAndWatermarks is called.
> >> >>>
> >> >>> How can I fix this runtime exception?
> >> >>>
> >> >>> Thanks.
> >> >>>
> >> >>> object App {
> >> >>>   def main(args: Array[String]) {
> >> >>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
> >> >>>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> >> >>>     val left = env.fromElements(1, 2, 3, 4, 5).map(
> >> >>>       new MapFunction[Int, T2[Int, String]] {
> >> >>>         override def map(value: Int): T2[Int, String] =
> >> >>>           new T2[Int, String](value, "data 1")
> >> >>>       }
> >> >>>     ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
> >> >>>
> >> >>>     val right = env.fromElements(99).map(
> >> >>>       new MapFunction[Int, T2[Int, String]] {
> >> >>>         override def map(value: Int): T2[Int, String] =
> >> >>>           new T2[Int, String](value, "data 2")
> >> >>>       }
> >> >>>     )
> >> >>>     left.coGroup(right).
> >> >>>       where { t2 => t2.f0 }.
> >> >>>       equalTo { t2 => t2.f0 }.
> >> >>>       window(TumblingEventTimeWindows.of(Time.seconds(1))).
> >> >>>       apply(new Join()).print
> >> >>>     env.execute
> >> >>>   }
> >> >>> }
> >> >>>
> >> >>> class MyTimestampExtractor extends
> >> >>>     AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
> >> >>>   override def extractTimestamp(e: T2[Int, String],
> >> >>>                                 prevElementTimestamp: Long) =
> >> >>>     System.currentTimeMillis
> >> >>>
> >> >>>   override def getCurrentWatermark(): Watermark =
> >> >>>     new Watermark(System.currentTimeMillis)
> >> >>> }
> >> >>>
> >> >>> class Join extends CoGroupFunction[
> >> >>>   T2[Int, String], T2[Int, String], T2[Int, String]
> >> >>> ] {
> >> >>>   val log = LoggerFactory.getLogger(classOf[Join])
> >> >>>   override def coGroup(left: java.lang.Iterable[T2[Int, String]],
> >> >>>                        right: java.lang.Iterable[T2[Int, String]],
> >> >>>                        out: Collector[T2[Int, String]]) {
> >> >>>     var seq = Seq.empty[T2[Int, String]]
> >> >>>     left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
> >> >>>     right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
> >> >>>     seq.foreach { e => out.collect(e) }
> >> >>>   }
> >> >>> }
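
On the question of how to choose the watermark value: a common approach is to track the largest event timestamp seen so far (the newest, not the oldest) and report it minus a fixed allowance for elements that arrive out of order. Below is a minimal sketch of that rule in plain Scala with no Flink dependency, so it can be run on its own. The `Event` type, its `eventTimeMillis` field, the class name `BoundedLatenessWatermarks`, and the 2-second allowance are all assumptions made for illustration; the tuples in the quoted job carry no event time at all, which is why MyTimestampExtractor could only return the system clock.

```scala
// Plain-Scala sketch of the "largest timestamp minus allowed lateness" rule.
// It mirrors the two methods of the MyTimestampExtractor quoted in the thread
// (extractTimestamp / getCurrentWatermark) but deliberately does not use Flink.

// Hypothetical event type: the timestamp travels inside the record.
final case class Event(key: Int, payload: String, eventTimeMillis: Long)

// maxOutOfOrdernessMillis is an assumed tuning knob, not a value from the thread.
final class BoundedLatenessWatermarks(maxOutOfOrdernessMillis: Long) {
  // Largest event timestamp seen so far; Long.MinValue means "nothing seen yet".
  private var maxSeen: Long = Long.MinValue

  // Return the time stored in the record, never the wall clock.
  def extractTimestamp(e: Event): Long = {
    maxSeen = math.max(maxSeen, e.eventTimeMillis)
    e.eventTimeMillis
  }

  // Elements with a timestamp at or below this value are no longer expected.
  def currentWatermark: Long =
    if (maxSeen == Long.MinValue) Long.MinValue
    else maxSeen - maxOutOfOrdernessMillis
}

object WatermarkSketch {
  def main(args: Array[String]): Unit = {
    val wm = new BoundedLatenessWatermarks(maxOutOfOrdernessMillis = 2000L)
    // The third event is older than the second, i.e. it arrives out of order.
    val events = Seq(
      Event(1, "a", 10000L),
      Event(2, "b", 12000L),
      Event(3, "c", 11000L)
    )
    events.foreach { e =>
      val ts = wm.extractTimestamp(e)
      println(s"timestamp=$ts watermark=${wm.currentWatermark}")
    }
  }
}
```

With these inputs the watermark moves from 8000 to 10000 and then stays at 10000 when the late third event arrives, because the largest timestamp seen is still 12000. In an actual job the same logic would presumably live in a class extending AssignerWithPeriodicWatermarks, as MyTimestampExtractor does, with getCurrentWatermark wrapping the value in `new Watermark(...)`.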