Hi David!

Have you had a look at the docs for Event Time and Watermark Generation?

There are some examples for some typical cases:

Event Time / Watermark Overview:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html

Typical Watermark Generators:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamp_extractors.html

Writing you own Watermark Generator:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html


Greetings,
Stephan



On Sun, Jul 10, 2016 at 9:43 AM, David Olsen <davidolsen4...@gmail.com>
wrote:

> Changing to
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime),
> and removing assignTimestampsAndWatermarks(new MyTimestampExtractor)
> get the code executing now.
>
> One more question. I read the java doc[1] it seems watermark is a mark
> telling operators that no more elements will arrive. So how do I
> determine the value of watermark e.g.
> MyTimestampExtractor.getCurrentWatermark()? Is it normal to simply
> keep track of the timestamp which is the oldest one? Any concrete
> examples I can check (I guess because I do not have streaming concepts
> so to have such newbie question)?
>
> Many thanks for kindly reply.
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/watermark/Watermark.html
>
> On 08/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> > Yes Robert is right!
> >
> > Although it is set only once and not per-operator,
> > so it looks closer to ingestion time, which is when an
> > operator enters the pipeline.
> >
> > Setting the timeCharacteristic to ingestion time
> > could also be an option, if this is what you want to do.
> >
> > Kostas
> >
> >> On Jul 8, 2016, at 11:56 AM, Robert Metzger <rmetz...@apache.org>
> wrote:
> >>
> >> One thing I would like to add is that your timestamp extractors are not
> >> really extracting the event time from your events. They are just
> returning
> >> the current system time, which effectively means you are falling back to
> >> processing time.
> >>
> >> On Fri, Jul 8, 2016 at 10:32 AM, Kostas Kloudas
> >> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>>
> wrote:
> >> Can it be that when you define the ‘right’ steam, you do not specify a
> >> timestamp extractor?
> >> This is done the same way you do it for the ‘left’ stream.
> >>
> >> Kostas
> >>
> >> > On Jul 8, 2016, at 6:12 AM, David Olsen <davidolsen4...@gmail.com
> >> > <mailto:davidolsen4...@gmail.com>> wrote:
> >> >
> >> > Changing TimeCharacteristic to EventTime the flink still throws that
> >> > runtime exception error. Is
> >> > `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
> >> > correct way to set that feature?
> >> >
> >> > Thanks.
> >> >
> >> > java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> >> > timestamp marker). Is the time characteristic set to 'ProcessingTime',
> >> > or did you forget to call
> >> > 'DataStream.assignTimestampsAndWatermarks(...)'?
> >> >       at
> >> >
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
> >> >       at
> >> >
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
> >> >       at
> >> >
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
> >> >       at
> >> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
> >> >       at
> >> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> >> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >> >       at java.lang.Thread.run(Thread.java:745)
> >> >
> >> >
> >> > On 06/07/2016, Kostas Kloudas <k.klou...@data-artisans.com
> >> > <mailto:k.klou...@data-artisans.com>> wrote:
> >> >> Hi David,
> >> >>
> >> >> You are using Tumbling event time windows, but you set the
> >> >> timeCharacteristic to processing time.
> >> >> If you want processing time, then you should use
> >> >> TumblingProcessingTimeWindows and remove the timestampAssigner.
> >> >> If you want event time, then you need to set the timeCharacteristic
> to
> >> >> eventTime and leave the rest of your code as is.
> >> >>
> >> >> Let me know if this answered your question.
> >> >>
> >> >> Cheers,
> >> >> Kostas
> >> >>
> >> >>> On Jul 6, 2016, at 3:43 PM, David Olsen <davidolsen4...@gmail.com
> >> >>> <mailto:davidolsen4...@gmail.com>> wrote:
> >> >>>
> >> >>> I have two streams. One will produce a single record, and the other
> >> >>> have a list of records. And I want to do left join. So for example,
> >> >>>
> >> >>> Stream A:
> >> >>> record1
> >> >>> record2
> >> >>> ...
> >> >>>
> >> >>> Stream B:
> >> >>> single-record
> >> >>>
> >> >>> After joined,
> >> >>>
> >> >>> record1, single-record
> >> >>> record2, single-record
> >> >>> ...
> >> >>>
> >> >>> However with the following streaming job, it throws an exception
> >> >>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...'
> >> >>> even
> >> >>> setStreamTimeCharacteristic is configured to ProcessingTime and
> >> >>> assignTimestampsAndWatermarks is called.
> >> >>>
> >> >>> How can I fix this runtime exception?
> >> >>>
> >> >>> Thanks.
> >> >>>
> >> >>> object App {
> >> >>> def main(args: Array[String]) {
> >> >>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
> >> >>>   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> >> >>>   val left = env.fromElements(1, 2, 3, 4, 5).map(
> >> >>>     new MapFunction[Int, T2[Int, String]] {
> >> >>>       override def map(value: Int): T2[Int, String] =
> >> >>>         new T2[Int, String](value, "data 1")
> >> >>>     }
> >> >>>   ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
> >> >>>
> >> >>>   val right = env.fromElements(99).map(
> >> >>>     new MapFunction[Int, T2[Int, String]] {
> >> >>>       override def map(value: Int): T2[Int, String] =
> >> >>>         new T2[Int, String](value, "data 2")
> >> >>>     }
> >> >>>   )
> >> >>>   left.coGroup(right).
> >> >>>        where { t2 => t2.f0 }.
> >> >>>        equalTo{ t2=> t2.f0 }.
> >> >>>        window(TumblingEventTimeWindows.of(Time.seconds(1))).
> >> >>>        apply(new Join()).print
> >> >>>   env.execute
> >> >>> }
> >> >>> }
> >> >>>
> >> >>> class MyTimestampExtractor extends
> >> >>> AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
> >> >>> override def extractTimestamp(e: T2[Int, String],
> >> >>>                               prevElementTimestamp: Long) =
> >> >>>   System.currentTimeMillis
> >> >>>
> >> >>> override def getCurrentWatermark(): Watermark =
> >> >>>   new Watermark(System.currentTimeMillis)
> >> >>> }
> >> >>>
> >> >>> class Join extends CoGroupFunction[
> >> >>> T2[Int, String], T2[Int, String], T2[Int, String]
> >> >>> ] {
> >> >>> val log = LoggerFactory.getLogger(classOf[Join])
> >> >>> override def coGroup(left: java.lang.Iterable[T2[Int, String]],
> >> >>>                      right: java.lang.Iterable[T2[Int, String]],
> >> >>>                      out: Collector[T2[Int, String]]) {
> >> >>>   var seq = Seq.empty[T2[Int, String]]
> >> >>>   left.foreach { e => log.info <http://log.info/>(s"from left:
> $e");
> >> >>> seq ++= Seq(e) }
> >> >>>   right.foreach { e => log.info <http://log.info/>(s"from right:
> $e");
> >> >>> seq ++= Seq(e) }
> >> >>>   seq.foreach { e => out.collect(e) }
> >> >>> }
> >> >>>
> >> >>> }
> >> >>
> >> >>
> >>
> >>
> >
> >
>

Reply via email to