Thank you for the clarification! After some discussion, I think we'll be using processing time as an alternative for our use case.
Just for my education, if I really need ingestion-time. It seem like I can get it by either of the below approach? // 1. an ingestion time watermark strategy new WatermarkGenerator[Long] { var currentMaxTimestamp = 0L override def onEvent(event: Long, eventTimestamp: Long, output: WatermarkOutput): Unit = currentMaxTimestamp = System.currentTimeMillis().max(currentMaxTimestamp) override def onPeriodicEmit(output: WatermarkOutput): Unit = output.emitWatermark(new Watermark(currentMaxTimestamp)) } // 2. event-time with ingestion time assigner WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new IngestionTimeAssigner) On Tue, Apr 5, 2022 at 4:39 AM Robert Metzger <rmetz...@apache.org> wrote: > Hi, > > IngestionTime is usually used when the records don't have a proper event > time associated with it, but the job has a long topology, and users want to > persist the (time)order of events as they arrive in the system. > In that sense, you can use the regular event time watermark strategies > also for ingestion time. > Afaik ingestion time is rarely used in practice. > > On Tue, Apr 5, 2022 at 12:10 AM Xinbin Huang <b...@apache.org> wrote: > >> Hi, >> >> Since *TimeCharacteristic,* is deprecated. >> >> AFAIK, >> - TimeCharacteristic*.*ProcessingTime -> WatermarkStrategy.noWatermarks() >> - TimeCharacteristic*.*EventTime -> >> WatermarkStrategy.forBoundedOutOfOrderness(<Duration>) >> - TimeCharacteristic*.*IngestionTime -> ??? >> >> Do we have a built-in *WatermarkStrategy *equivalent for the purpose? >> >> Thanks >> Bin >> >>