After changing the TimeCharacteristic to EventTime, Flink still throws
the same runtime exception. Is
`env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
correct way to set the time characteristic?

Thanks.

java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
timestamp marker). Is the time characteristic set to 'ProcessingTime',
or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?
        at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
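
One thing that may be worth checking, offered as a sketch rather than a
confirmed fix: in the quoted job below only `left` calls
assignTimestampsAndWatermarks, while `right` does not, so records from
`right` would still reach TumblingEventTimeWindows without a timestamp.
Assuming the Flink 1.0.x Scala API and that T2 is an alias for Flink's
Java Tuple2 (the import is not shown in the quoted code, so that alias
is an assumption), an event-time variant with timestamps on both inputs
might look like this. The object name EventTimeApp is made up for
illustration, and Join is simplified to forward both sides unchanged.

```scala
import org.apache.flink.api.common.functions.{CoGroupFunction, MapFunction}
import org.apache.flink.api.java.tuple.{Tuple2 => T2} // assumed alias
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Same idea as the quoted extractor: wall-clock time stands in for event time.
class MyTimestampExtractor
    extends AssignerWithPeriodicWatermarks[T2[Int, String]] {
  override def extractTimestamp(e: T2[Int, String],
                                prevElementTimestamp: Long): Long =
    System.currentTimeMillis

  override def getCurrentWatermark(): Watermark =
    new Watermark(System.currentTimeMillis)
}

// Simplified Join: emits every element of both groups as-is.
class Join extends CoGroupFunction[
  T2[Int, String], T2[Int, String], T2[Int, String]
] {
  override def coGroup(left: java.lang.Iterable[T2[Int, String]],
                       right: java.lang.Iterable[T2[Int, String]],
                       out: Collector[T2[Int, String]]): Unit = {
    left.asScala.foreach(e => out.collect(e))
    right.asScala.foreach(e => out.collect(e))
  }
}

// Wraps an Int into a tuple with a fixed label, as in the quoted job.
class Tag(label: String) extends MapFunction[Int, T2[Int, String]] {
  override def map(value: Int): T2[Int, String] =
    new T2[Int, String](value, label)
}

object EventTimeApp { // hypothetical name
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val left = env.fromElements(1, 2, 3, 4, 5)
      .map(new Tag("data 1"))
      .assignTimestampsAndWatermarks(new MyTimestampExtractor)

    // The difference from the quoted job: the right input also gets
    // timestamps and watermarks before it enters the event-time window.
    val right = env.fromElements(99)
      .map(new Tag("data 2"))
      .assignTimestampsAndWatermarks(new MyTimestampExtractor)

    left.coGroup(right)
      .where { t2 => t2.f0 }
      .equalTo { t2 => t2.f0 }
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .apply(new Join)
      .print()

    env.execute()
  }
}
```

This only addresses the missing-timestamp exception. Whether the
windows then produce the left-join result described in the quoted
message is a separate question, since the keys 1..5 and 99 never match
and wall-clock timestamps may place records in different windows.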


On 06/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Hi David,
>
> You are using Tumbling event time windows, but you set the
> timeCharacteristic to processing time.
> If you want processing time, then you should use
> TumblingProcessingTimeWindows and remove the timestampAssigner.
> If you want event time, then you need to set the timeCharacteristic to
> eventTime and leave the rest of your code as is.
>
> Let me know if this answered your question.
>
> Cheers,
> Kostas
>
>> On Jul 6, 2016, at 3:43 PM, David Olsen <davidolsen4...@gmail.com> wrote:
>>
>> I have two streams. One produces a single record, and the other
>> produces a list of records. I want to do a left join. For example,
>>
>> Stream A:
>> record1
>> record2
>> ...
>>
>> Stream B:
>> single-record
>>
>> After the join:
>>
>> record1, single-record
>> record2, single-record
>> ...
>>
>> However, the following streaming job throws the exception
>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
>> though setStreamTimeCharacteristic is set to ProcessingTime and
>> assignTimestampsAndWatermarks is called.
>>
>> How can I fix this runtime exception?
>>
>> Thanks.
>>
>> object App {
>>  def main(args: Array[String]) {
>>    val env = StreamExecutionEnvironment.getExecutionEnvironment
>>    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>    val left = env.fromElements(1, 2, 3, 4, 5).map(
>>      new MapFunction[Int, T2[Int, String]] {
>>        override def map(value: Int): T2[Int, String] =
>>          new T2[Int, String](value, "data 1")
>>      }
>>    ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
>>
>>    val right = env.fromElements(99).map(
>>      new MapFunction[Int, T2[Int, String]] {
>>        override def map(value: Int): T2[Int, String] =
>>          new T2[Int, String](value, "data 2")
>>      }
>>    )
>>    left.coGroup(right).
>>         where { t2 => t2.f0 }.
>>         equalTo{ t2=> t2.f0 }.
>>         window(TumblingEventTimeWindows.of(Time.seconds(1))).
>>         apply(new Join()).print
>>    env.execute
>>  }
>> }
>>
>> class MyTimestampExtractor extends
>> AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
>>  override def extractTimestamp(e: T2[Int, String],
>>                                prevElementTimestamp: Long) =
>>    System.currentTimeMillis
>>
>>  override def getCurrentWatermark(): Watermark =
>>    new Watermark(System.currentTimeMillis)
>> }
>>
>> class Join extends CoGroupFunction[
>>  T2[Int, String], T2[Int, String], T2[Int, String]
>> ] {
>>  val log = LoggerFactory.getLogger(classOf[Join])
>>  override def coGroup(left: java.lang.Iterable[T2[Int, String]],
>>                       right: java.lang.Iterable[T2[Int, String]],
>>                       out: Collector[T2[Int, String]]) {
>>    var seq = Seq.empty[T2[Int, String]]
>>    left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
>>    right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
>>    seq.foreach { e => out.collect(e) }
>>  }
>>
>> }
>
>
