What is the best way to run unit tests on streams that contain
ProcessTimeWindows?

Example:

def bufferDataStreamByProcessId(ds: DataStream[MaalkaRecord]):
DataStream[MaalkaRecord] = {
  ds.map { r =>
    println(s"data in: $r") // Data shows up here
    r
  }.keyBy { mr =>
    val r = mr.asInstanceOf[MaalkaDataRecord]
    r.dataProcessingId -> r.isBefore
  }
    .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(10)))
    .reduce { (_, r2) =>
      println(s"Reducing: r2") // Data does NOT show up here
      r2
    }
    .map { r =>
      println(s"Emitted: $r") // Data does NOT show up here
      r
    }
}


This stream completes about 100ms after the first element is received by
the ProcessingTimeSessionWindow but no data is emitted from the
sessionWindow.

If i change the window to use a
TumblingProcessTimeWindow.of(Time.milliseconds(1)) some of windows do emit,
but the many of the expected values are missing.

Any ideas?

Cheers,
Clay

Reply via email to