Thank you for the clarification! After some discussion, I think we'll be
using processing time as an alternative for our use case.

Just for my education, if I really need ingestion-time. It seem like I can
get it by either of the below approach?

// 1. an ingestion time watermark strategy

new WatermarkGenerator[Long] {
  var currentMaxTimestamp = 0L
  override def onEvent(event: Long, eventTimestamp: Long, output:
WatermarkOutput): Unit =
    currentMaxTimestamp = System.currentTimeMillis().max(currentMaxTimestamp)

  override def onPeriodicEmit(output: WatermarkOutput): Unit =
    output.emitWatermark(new Watermark(currentMaxTimestamp))
}

// 2. event-time with ingestion time assigner
WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new
IngestionTimeAssigner)


On Tue, Apr 5, 2022 at 4:39 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
>
> IngestionTime is usually used when the records don't have a proper event
> time associated with it, but the job has a long topology, and users want to
> persist the (time)order of events as they arrive in the system.
> In that sense, you can use the regular event time watermark strategies
> also for ingestion time.
> Afaik ingestion time is rarely used in practice.
>
> On Tue, Apr 5, 2022 at 12:10 AM Xinbin Huang <b...@apache.org> wrote:
>
>> Hi,
>>
>> Since *TimeCharacteristic,* is deprecated.
>>
>> AFAIK,
>> - TimeCharacteristic*.*ProcessingTime -> WatermarkStrategy.noWatermarks()
>> - TimeCharacteristic*.*EventTime ->
>> WatermarkStrategy.forBoundedOutOfOrderness(<Duration>)
>> - TimeCharacteristic*.*IngestionTime -> ???
>>
>> Do we have a built-in *WatermarkStrategy *equivalent for the purpose?
>>
>> Thanks
>> Bin
>>
>>

Reply via email to