One thing I would like to add is that your timestamp extractors are not
really extracting the event time from your events. They are just returning
the current system time, which effectively means you are falling back to
processing time.
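
For illustration, here is a rough sketch of an extractor that reads the timestamp from the record itself. It is only a sketch under assumptions: the `Event` case class, its `eventTimeMillis` field, and the `maxOutOfOrdernessMillis` bound are hypothetical, since the `T2[Int, String]` records in your job carry no timestamp of their own.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical event type: an explicit event-time field (epoch millis)
// is assumed, because T2[Int, String] has nowhere to carry one.
case class Event(id: Int, payload: String, eventTimeMillis: Long)

// Assumption: events arrive at most maxOutOfOrdernessMillis (>= 0) out of order.
class EventTimeExtractor(maxOutOfOrdernessMillis: Long)
    extends AssignerWithPeriodicWatermarks[Event] {

  // Highest event timestamp seen so far. The offset keeps the subtraction
  // in getCurrentWatermark from underflowing before the first element.
  private var maxSeenMillis: Long = Long.MinValue + maxOutOfOrdernessMillis

  // Return the time stored in the event, not the wall clock.
  override def extractTimestamp(e: Event, previousElementTimestamp: Long): Long = {
    maxSeenMillis = math.max(maxSeenMillis, e.eventTimeMillis)
    e.eventTimeMillis
  }

  // The watermark trails the largest timestamp seen by the allowed lateness.
  override def getCurrentWatermark(): Watermark =
    new Watermark(maxSeenMillis - maxOutOfOrdernessMillis)
}
```

With something along these lines, the extractor would be passed to `assignTimestampsAndWatermarks` on both the 'left' and the 'right' stream before the `coGroup`, with the time characteristic set to `TimeCharacteristic.EventTime`.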

On Fri, Jul 8, 2016 at 10:32 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Can it be that when you define the ‘right’ stream, you do not specify a
> timestamp extractor?
> This is done the same way you do it for the ‘left’ stream.
>
> Kostas
>
> > On Jul 8, 2016, at 6:12 AM, David Olsen <davidolsen4...@gmail.com> wrote:
> >
> > After changing the TimeCharacteristic to EventTime, Flink still throws
> > the same runtime exception. Is
> > `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
> > correct way to set that feature?
> >
> > Thanks.
> >
> > java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> > timestamp marker). Is the time characteristic set to 'ProcessingTime',
> > or did you forget to call
> > 'DataStream.assignTimestampsAndWatermarks(...)'?
> >       at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
> >       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
> >       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
> >       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >       at java.lang.Thread.run(Thread.java:745)
> >
> >
> > On 06/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> >> Hi David,
> >>
> >> You are using Tumbling event time windows, but you set the
> >> timeCharacteristic to processing time.
> >> If you want processing time, then you should use
> >> TumblingProcessingTimeWindows and remove the timestampAssigner.
> >> If you want event time, then you need to set the timeCharacteristic to
> >> eventTime and leave the rest of your code as is.
> >>
> >> Let me know if this answered your question.
> >>
> >> Cheers,
> >> Kostas
> >>
> >>> On Jul 6, 2016, at 3:43 PM, David Olsen <davidolsen4...@gmail.com> wrote:
> >>>
> >>> I have two streams. One produces a single record, and the other
> >>> produces a list of records. I want to do a left join. For example,
> >>>
> >>> Stream A:
> >>> record1
> >>> record2
> >>> ...
> >>>
> >>> Stream B:
> >>> single-record
> >>>
> >>> After joining:
> >>>
> >>> record1, single-record
> >>> record2, single-record
> >>> ...
> >>>
> >>> However, the following streaming job throws the exception
> >>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
> >>> though setStreamTimeCharacteristic is set to ProcessingTime and
> >>> assignTimestampsAndWatermarks is called.
> >>>
> >>> How can I fix this runtime exception?
> >>>
> >>> Thanks.
> >>>
> >>> object App {
> >>> def main(args: Array[String]) {
> >>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
> >>>   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> >>>   val left = env.fromElements(1, 2, 3, 4, 5).map(
> >>>     new MapFunction[Int, T2[Int, String]] {
> >>>       override def map(value: Int): T2[Int, String] =
> >>>         new T2[Int, String](value, "data 1")
> >>>     }
> >>>   ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
> >>>
> >>>   val right = env.fromElements(99).map(
> >>>     new MapFunction[Int, T2[Int, String]] {
> >>>       override def map(value: Int): T2[Int, String] =
> >>>         new T2[Int, String](value, "data 2")
> >>>     }
> >>>   )
> >>>   left.coGroup(right).
> >>>        where { t2 => t2.f0 }.
> >>>        equalTo{ t2=> t2.f0 }.
> >>>        window(TumblingEventTimeWindows.of(Time.seconds(1))).
> >>>        apply(new Join()).print
> >>>   env.execute
> >>> }
> >>> }
> >>>
> >>> class MyTimestampExtractor extends
> >>> AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
> >>> override def extractTimestamp(e: T2[Int, String],
> >>>                               prevElementTimestamp: Long) =
> >>>   System.currentTimeMillis
> >>>
> >>> override def getCurrentWatermark(): Watermark =
> >>>   new Watermark(System.currentTimeMillis)
> >>> }
> >>>
> >>> class Join extends CoGroupFunction[
> >>> T2[Int, String], T2[Int, String], T2[Int, String]
> >>> ] {
> >>> val log = LoggerFactory.getLogger(classOf[Join])
> >>> override def coGroup(left: java.lang.Iterable[T2[Int, String]],
> >>>                      right: java.lang.Iterable[T2[Int, String]],
> >>>                      out: Collector[T2[Int, String]]) {
> >>>   var seq = Seq.empty[T2[Int, String]]
> >>>   left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
> >>>   right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
> >>>   seq.foreach { e => out.collect(e) }
> >>> }
> >>>
> >>> }
> >>
> >>
>
>
