Can it be that when you define the ‘right’ steam, you do not specify a 
timestamp extractor?
This is done the same way you do it for the ‘left’ stream.

Kostas

> On Jul 8, 2016, at 6:12 AM, David Olsen <davidolsen4...@gmail.com> wrote:
> 
> Changing TimeCharacteristic to EventTime the flink still throws that
> runtime exception error. Is
> `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
> correct way to set that feature?
> 
> Thanks.
> 
> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> timestamp marker). Is the time characteristic set to 'ProcessingTime',
> or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
>       at 
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
>       at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
>       at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> 
> 
> On 06/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> Hi David,
>> 
>> You are using Tumbling event time windows, but you set the
>> timeCharacteristic to processing time.
>> If you want processing time, then you should use
>> TumblingProcessingTimeWindows and remove the timestampAssigner.
>> If you want event time, then you need to set the timeCharacteristic to
>> eventTime and leave the rest of your code as is.
>> 
>> Let me know if this answered your question.
>> 
>> Cheers,
>> Kostas
>> 
>>> On Jul 6, 2016, at 3:43 PM, David Olsen <davidolsen4...@gmail.com> wrote:
>>> 
>>> I have two streams. One will produce a single record, and the other
>>> have a list of records. And I want to do left join. So for example,
>>> 
>>> Stream A:
>>> record1
>>> record2
>>> ...
>>> 
>>> Stream B:
>>> single-record
>>> 
>>> After joined,
>>> 
>>> record1, single-record
>>> record2, single-record
>>> ...
>>> 
>>> However with the following streaming job, it throws an exception
>>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
>>> setStreamTimeCharacteristic is configured to ProcessingTime and
>>> assignTimestampsAndWatermarks is called.
>>> 
>>> How can I fix this runtime exception?
>>> 
>>> Thanks.
>>> 
>>> object App {
>>> def main(args: Array[String]) {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>>   val left = env.fromElements(1, 2, 3, 4, 5).map(
>>>     new MapFunction[Int, T2[Int, String]] {
>>>       override def map(value: Int): T2[Int, String] =
>>>         new T2[Int, String](value, "data 1")
>>>     }
>>>   ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
>>> 
>>>   val right = env.fromElements(99).map(
>>>     new MapFunction[Int, T2[Int, String]] {
>>>       override def map(value: Int): T2[Int, String] =
>>>         new T2[Int, String](value, "data 2")
>>>     }
>>>   )
>>>   left.coGroup(right).
>>>        where { t2 => t2.f0 }.
>>>        equalTo{ t2=> t2.f0 }.
>>>        window(TumblingEventTimeWindows.of(Time.seconds(1))).
>>>        apply(new Join()).print
>>>   env.execute
>>> }
>>> }
>>> 
>>> class MyTimestampExtractor extends
>>> AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
>>> override def extractTimestamp(e: T2[Int, String],
>>>                               prevElementTimestamp: Long) =
>>>   System.currentTimeMillis
>>> 
>>> override def getCurrentWatermark(): Watermark =
>>>   new Watermark(System.currentTimeMillis)
>>> }
>>> 
>>> class Join extends CoGroupFunction[
>>> T2[Int, String], T2[Int, String], T2[Int, String]
>>> ] {
>>> val log = LoggerFactory.getLogger(classOf[Join])
>>> override def coGroup(left: java.lang.Iterable[T2[Int, String]],
>>>                      right: java.lang.Iterable[T2[Int, String]],
>>>                      out: Collector[T2[Int, String]]) {
>>>   var seq = Seq.empty[T2[Int, String]]
>>>   left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
>>>   right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
>>>   seq.foreach { e => out.collect(e) }
>>> }
>>> 
>>> }
>> 
>> 

Reply via email to