Changing to env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
and removing assignTimestampsAndWatermarks(new MyTimestampExtractor)
gets the code running now.

One more question. I read the Javadoc [1], and it seems a watermark is a marker
telling operators that no more elements with a timestamp older than or equal to
the watermark's timestamp should arrive. So how do I determine the value of the
watermark, e.g. in MyTimestampExtractor.getCurrentWatermark()? Is it normal to
simply keep track of the oldest timestamp seen? Are there any concrete examples
I can look at? (I guess I am asking such a newbie question because I am not yet
familiar with streaming concepts.)
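One common approach, sketched here as plain Scala rather than Flink code, is to track the largest timestamp seen so far and let the watermark trail it by a fixed bound on how out of order events may arrive. The `BoundedOutOfOrdernessTracker` class and its `maxOutOfOrdernessMillis` parameter are hypothetical; in a Flink job the same logic would go into the `extractTimestamp` and `getCurrentWatermark` methods of an `AssignerWithPeriodicWatermarks`.

```scala
// Hypothetical, dependency-free model of a periodic watermark assigner that
// tolerates events arriving up to maxOutOfOrdernessMillis late (an assumed bound).
final class BoundedOutOfOrdernessTracker(maxOutOfOrdernessMillis: Long) {
  // Largest timestamp seen so far; Long.MinValue means "nothing seen yet".
  private var currentMaxTimestamp: Long = Long.MinValue

  // Plays the role of extractTimestamp: use the element's own timestamp and
  // remember the maximum seen so far.
  def extractTimestamp(eventTimestamp: Long): Long = {
    currentMaxTimestamp = math.max(currentMaxTimestamp, eventTimestamp)
    eventTimestamp
  }

  // Plays the role of getCurrentWatermark: trail the maximum by the bound.
  // The guard avoids underflow before the first element arrives.
  def currentWatermark: Long =
    if (currentMaxTimestamp == Long.MinValue) Long.MinValue
    else currentMaxTimestamp - maxOutOfOrdernessMillis
}
```

Tracking the oldest timestamp instead would keep the watermark at the earliest value ever observed, so it would never advance and no event-time window would ever fire.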

Many thanks for your kind reply.

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/watermark/Watermark.html

On 08/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Yes, Robert is right!
>
> Although, since the timestamp is set only once and not per operator,
> it is closer to ingestion time, i.e. the time when an
> element enters the pipeline.
>
> Setting the timeCharacteristic to ingestion time
> could also be an option, if this is what you want to do.
>
> Kostas
>
>> On Jul 8, 2016, at 11:56 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>
>> One thing I would like to add is that your timestamp extractors are not
>> really extracting the event time from your events. They are just returning
>> the current system time, which effectively means you are falling back to
>> processing time.
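To illustrate the difference, here is a hypothetical, dependency-free sketch. The `Event` type and its `eventTimeMillis` field are assumptions: the `T2[Int, String]` records in the job below carry no timestamp of their own.

```scala
// Hypothetical record type carrying its own event time. The T2[Int, String]
// records in the original job have no such field.
final case class Event(key: Int, data: String, eventTimeMillis: Long)

object TimestampExtraction {
  // Event time: read from the record itself, so the result is the same
  // whenever (and however often) the record is processed.
  def eventTime(e: Event): Long = e.eventTimeMillis

  // What the original extractor does: ignore the record and read the wall
  // clock, which effectively means processing time.
  def wallClock(e: Event): Long = System.currentTimeMillis()
}
```

In a Flink job, the event-time variant corresponds to returning the record's own field from `extractTimestamp` instead of `System.currentTimeMillis`.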
>>
>> On Fri, Jul 8, 2016 at 10:32 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> Can it be that when you define the ‘right’ stream, you do not specify a
>> timestamp extractor?
>> This is done the same way you do it for the ‘left’ stream.
>>
>> Kostas
>>
>> > On Jul 8, 2016, at 6:12 AM, David Olsen <davidolsen4...@gmail.com> wrote:
>> >
>> > After changing the TimeCharacteristic to EventTime, Flink still throws
>> > the same runtime exception. Is
>> > `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
>> > correct way to set it?
>> >
>> > Thanks.
>> >
>> > java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>> >       at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
>> >       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
>> >       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>> >       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >       at java.lang.Thread.run(Thread.java:745)
>> >
>> >
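The exception comes from the event-time window assigner. Every record needs a timestamp so that it can be placed in a window, and Long.MIN_VALUE is the "no timestamp" marker. Below is a simplified, dependency-free model of that check. It is not Flink's actual source: `Window` and `TumblingAssignment` are hypothetical names, and timestamps are assumed to be non-negative.

```scala
// Hypothetical stand-in for a time window covering [start, end) in milliseconds.
final case class Window(start: Long, end: Long)

object TumblingAssignment {
  // Long.MinValue is the "no timestamp" marker mentioned in the exception.
  val NoTimestamp: Long = Long.MinValue

  // Places a record into the tumbling window of length sizeMillis that
  // contains its timestamp. Assumes non-negative timestamps, so that
  // timestamp % sizeMillis is the offset into the window.
  def assign(timestamp: Long, sizeMillis: Long): Window = {
    if (timestamp == NoTimestamp)
      throw new RuntimeException(
        "Record has Long.MIN_VALUE timestamp (= no timestamp marker)")
    val start = timestamp - (timestamp % sizeMillis)
    Window(start, start + sizeMillis)
  }
}
```

In a windowed coGroup, records from both inputs are assigned to windows this way, which is why the `right` stream needs timestamps as well.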
>> > On 06/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> >> Hi David,
>> >>
>> >> You are using tumbling event-time windows, but you set the
>> >> timeCharacteristic to processing time.
>> >> If you want processing time, then you should use
>> >> TumblingProcessingTimeWindows and remove the timestamp assigner.
>> >> If you want event time, then you need to set the timeCharacteristic to
>> >> EventTime and leave the rest of your code as is.
>> >>
>> >> Let me know if this answered your question.
>> >>
>> >> Cheers,
>> >> Kostas
>> >>
>> >>> On Jul 6, 2016, at 3:43 PM, David Olsen <davidolsen4...@gmail.com> wrote:
>> >>>
>> >>> I have two streams. One produces a single record, and the other
>> >>> produces a list of records. I want to do a left join. For example,
>> >>>
>> >>> Stream A:
>> >>> record1
>> >>> record2
>> >>> ...
>> >>>
>> >>> Stream B:
>> >>> single-record
>> >>>
>> >>> After the join:
>> >>>
>> >>> record1, single-record
>> >>> record2, single-record
>> >>> ...
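Within one coGroup window, a left join means each left element of a key group is paired with every right element of the same group, or emitted with an empty placeholder when the right side has none. A plain-Scala sketch follows; the `LeftJoin` object and the use of `Option` for the missing side are assumptions, not part of the original job.

```scala
object LeftJoin {
  // Left outer join of the two sides of one co-group: every left element is
  // paired with each right element, or with None when the right side is empty.
  def leftJoinGroup[L, R](left: Iterable[L], right: Iterable[R]): Seq[(L, Option[R])] =
    if (right.isEmpty) left.toSeq.map(l => (l, Option.empty[R]))
    else for (l <- left.toSeq; r <- right.toSeq) yield (l, Some(r))
}
```

Inside a Flink `CoGroupFunction` this could be called on `left.asScala` and `right.asScala`. Note that coGroup only brings together elements with equal keys, so with the keys used in the job below (1 to 5 on the left, 99 on the right) every group would have one empty side.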
>> >>>
>> >>> However, the following streaming job throws the exception
>> >>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...'
>> >>> even though setStreamTimeCharacteristic is set to ProcessingTime and
>> >>> assignTimestampsAndWatermarks is called.
>> >>>
>> >>> How can I fix this runtime exception?
>> >>>
>> >>> Thanks.
>> >>>
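>> >>> import org.apache.flink.api.common.functions.{CoGroupFunction, MapFunction}
>> >>> import org.apache.flink.api.java.tuple.{Tuple2 => T2}
>> >>> import org.apache.flink.streaming.api.TimeCharacteristic
>> >>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
>> >>> import org.apache.flink.streaming.api.scala._
>> >>> import org.apache.flink.streaming.api.watermark.Watermark
>> >>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
>> >>> import org.apache.flink.streaming.api.windowing.time.Time
>> >>> import org.apache.flink.util.Collector
>> >>> import org.slf4j.LoggerFactory
>> >>> import scala.collection.JavaConverters._
>> >>>
>> >>> object App {
>> >>>   def main(args: Array[String]): Unit = {
>> >>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >>>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>> >>>     val left = env.fromElements(1, 2, 3, 4, 5).map(
>> >>>       new MapFunction[Int, T2[Int, String]] {
>> >>>         override def map(value: Int): T2[Int, String] =
>> >>>           new T2[Int, String](value, "data 1")
>> >>>       }
>> >>>     ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
>> >>>
>> >>>     // No timestamp assigner is applied to this stream.
>> >>>     val right = env.fromElements(99).map(
>> >>>       new MapFunction[Int, T2[Int, String]] {
>> >>>         override def map(value: Int): T2[Int, String] =
>> >>>           new T2[Int, String](value, "data 2")
>> >>>       }
>> >>>     )
>> >>>
>> >>>     left.coGroup(right)
>> >>>       .where { t2 => t2.f0 }
>> >>>       .equalTo { t2 => t2.f0 }
>> >>>       .window(TumblingEventTimeWindows.of(Time.seconds(1))) // event-time windows
>> >>>       .apply(new Join())
>> >>>       .print()
>> >>>     env.execute()
>> >>>   }
>> >>> }
>> >>>
>> >>> // Returns the wall-clock time instead of a timestamp taken from the record.
>> >>> class MyTimestampExtractor extends
>> >>>     AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
>> >>>   override def extractTimestamp(e: T2[Int, String],
>> >>>                                 prevElementTimestamp: Long): Long =
>> >>>     System.currentTimeMillis
>> >>>
>> >>>   override def getCurrentWatermark(): Watermark =
>> >>>     new Watermark(System.currentTimeMillis)
>> >>> }
>> >>>
>> >>> class Join extends CoGroupFunction[
>> >>>   T2[Int, String], T2[Int, String], T2[Int, String]
>> >>> ] {
>> >>>   // Not serialized with the function; recreated lazily on each task.
>> >>>   @transient private lazy val log = LoggerFactory.getLogger(classOf[Join])
>> >>>
>> >>>   // Logs and forwards every element from both sides of the co-group.
>> >>>   override def coGroup(left: java.lang.Iterable[T2[Int, String]],
>> >>>                        right: java.lang.Iterable[T2[Int, String]],
>> >>>                        out: Collector[T2[Int, String]]): Unit = {
>> >>>     var seq = Seq.empty[T2[Int, String]]
>> >>>     left.asScala.foreach { e => log.info(s"from left: $e"); seq :+= e }
>> >>>     right.asScala.foreach { e => log.info(s"from right: $e"); seq :+= e }
>> >>>     seq.foreach { e => out.collect(e) }
>> >>>   }
>> >>> }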
>> >>
>> >>
>>
>>
>
>
