I have two streams. One will produce a single record, and the other
have a list of records. And I want to do left join. So for example,

Stream A:
record1
record2
...

Stream B:
single-record

After joined,

record1, single-record
record2, single-record
...

However with the following streaming job, it throws an exception
'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
setStreamTimeCharacteristic is configured to ProcessingTime and
assignTimestampsAndWatermarks is called.

How can I fix this runtime exception?

Thanks.

object App {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val left = env.fromElements(1, 2, 3, 4, 5).map(
      new MapFunction[Int, T2[Int, String]] {
        override def map(value: Int): T2[Int, String] =
          new T2[Int, String](value, "data 1")
      }
    ).assignTimestampsAndWatermarks(new MyTimestampExtractor)

    val right = env.fromElements(99).map(
      new MapFunction[Int, T2[Int, String]] {
        override def map(value: Int): T2[Int, String] =
          new T2[Int, String](value, "data 2")
      }
    )
    left.coGroup(right).
         where { t2 => t2.f0 }.
         equalTo{ t2=> t2.f0 }.
         window(TumblingEventTimeWindows.of(Time.seconds(1))).
         apply(new Join()).print
    env.execute
  }
}

class MyTimestampExtractor extends
AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
  override def extractTimestamp(e: T2[Int, String],
                                prevElementTimestamp: Long) =
    System.currentTimeMillis

  override def getCurrentWatermark(): Watermark =
    new Watermark(System.currentTimeMillis)
}

class Join extends CoGroupFunction[
  T2[Int, String], T2[Int, String], T2[Int, String]
] {
  val log = LoggerFactory.getLogger(classOf[Join])
  override def coGroup(left: java.lang.Iterable[T2[Int, String]],
                       right: java.lang.Iterable[T2[Int, String]],
                       out: Collector[T2[Int, String]]) {
    var seq = Seq.empty[T2[Int, String]]
    left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
    right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
    seq.foreach { e => out.collect(e) }
  }

}

Reply via email to