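Hi Arujit,

I get it now; sorry for misunderstanding your requirements before. Yes, if you already specify a watermark strategy on the source (for example with `FlinkKafkaConsumer#assignTimestampsAndWatermarks(WatermarkStrategy)`, or by passing the `WatermarkStrategy` to `env.fromSource(...)` for the new `KafkaSource`; see [1] for more details), you don't need to call `DataStream#assignTimestampsAndWatermarks` after the source anymore. The watermarks are then generated per Kafka partition inside the consumer, and they are kept and forwarded downstream by default. If the source does not generate watermarks, you can call `DataStream#assignTimestampsAndWatermarks` to generate them. Note that if the source already generates watermarks, calling `assignTimestampsAndWatermarks` afterwards overrides them: the watermarks of the later assigner replace the source's ones.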
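To illustrate why keeping the source-level (per-partition) watermarks matters during a backfill, here is a minimal, Flink-free Java model. It is not Flink's API: `WatermarkModel`, `BoundedGenerator` and the two `lateEvents...` methods are hypothetical names, watermarks are updated after every record instead of periodically, idle partitions are not handled, and an event counts as "late" here simply if its timestamp is at or below the current watermark. It mimics only two behaviours: a strategy set on the source keeps one generator per partition and emits their minimum, while a downstream `assignTimestampsAndWatermarks` applies a single generator to the already-interleaved stream and ignores the source's watermarks.

```java
import java.util.Arrays;

// Simplified model (hypothetical, not Flink code) of per-partition vs. downstream watermarking.
public class WatermarkModel {

    // Mimics a bounded-out-of-orderness generator: watermark = max timestamp seen - delay - 1.
    static final class BoundedGenerator {
        private final long delayMs;
        private long maxTs = Long.MIN_VALUE;

        BoundedGenerator(long delayMs) { this.delayMs = delayMs; }

        void onEvent(long ts) { maxTs = Math.max(maxTs, ts); }

        long watermark() {
            // No events seen yet: the watermark has not advanced.
            return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - delayMs - 1;
        }
    }

    // Strategy on the source: one generator per partition; the emitted watermark
    // is the minimum over all partitions, so a lagging partition holds it back.
    public static int lateEventsPerPartition(int partitions, long delayMs, int[] partitionOf, long[] ts) {
        BoundedGenerator[] gens = new BoundedGenerator[partitions];
        for (int p = 0; p < partitions; p++) {
            gens[p] = new BoundedGenerator(delayMs);
        }
        int late = 0;
        for (int i = 0; i < ts.length; i++) {
            long wm = Arrays.stream(gens).mapToLong(BoundedGenerator::watermark).min().getAsLong();
            if (ts[i] <= wm) {
                late++; // would be treated as late data
            }
            gens[partitionOf[i]].onEvent(ts[i]);
        }
        return late;
    }

    // Assigner after the source: a single generator sees the interleaved stream
    // and does not take the source's per-partition watermarks into account.
    public static int lateEventsDownstream(long delayMs, long[] ts) {
        BoundedGenerator gen = new BoundedGenerator(delayMs);
        int late = 0;
        for (long t : ts) {
            if (t <= gen.watermark()) {
                late++;
            }
            gen.onEvent(t);
        }
        return late;
    }

    public static void main(String[] args) {
        // Backfill: partition 0 is read ahead of partition 1, so its newer records arrive first.
        int[] partitionOf = {0, 0, 1, 1};
        long[] ts = {1000, 2000, 100, 200};
        // prints 0
        System.out.println(lateEventsPerPartition(2, 50, partitionOf, ts));
        // prints 2
        System.out.println(lateEventsDownstream(50, ts));
    }
}
```

In this toy run, the per-partition variant keeps the watermark at the slow partition's progress, so none of its records are late, whereas the single downstream generator has already advanced past them because of partition 0.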
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector

On Mon, Oct 25, 2021 at 5:36 PM Arujit Pradhan <arujit.prad...@gojek.com> wrote:

> Hey JING,
>
> Thanks a lot for replying to the thread!
>
> Yeah, we are looking at `PreserveWatermarks`. But the issue is that
> datastream.assignTimestampsAndWatermarks() takes a `WatermarkStrategy` (from
> org.apache.flink.api.common.eventtime), and there is no default method there
> to define a preserve-watermarks strategy, though there are methods for
> defining other strategies, like `forBoundedOutOfOrderness`.
>
> I want to know whether watermarks are preserved there by default,
> since source-level watermark definitions are deprecated in the newer APIs.
>
> On Mon, Oct 25, 2021 at 2:17 PM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi,
>> I'm not sure I understand your requirement.
>> However, are you looking for `PreserveWatermarks` in package
>> `org.apache.flink.table.sources.wmstrategies`?
>>
>> Best,
>> JING ZHANG
>>
>> On Mon, Oct 25, 2021 at 4:02 PM Arujit Pradhan <arujit.prad...@gojek.com> wrote:
>>
>>> Hi all,
>>>
>>> We maintain an open-source project for protobuf data processing using
>>> Flink, dagger <http://github.com/odpf/dagger>. We are currently on
>>> Flink 1.9 and want to migrate to the latest stable release, 1.14.
>>>
>>> In the older version, we use the `StreamTableSource` and
>>> `DefinedRowtimeAttributes` APIs for the table-source definition, similar
>>> to this
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/streaming/time_attributes.html#using-a-tablesource-1>.
>>> But since these APIs are deprecated, we are now defining it via
>>> API expressions.
>>>
>>> The issue for us is defining the WatermarkStrategy, more specifically
>>> the `PreserveWatermarks` strategy. We cannot find an alternative
>>> to it, though other watermark strategies like
>>> `BoundedOutOfOrderTimestamps` can be found in the newer API in
>>> `org.apache.flink.api.common.eventtime.WatermarkStrategy`.
>>>
>>> Currently, we have logic like this in DefinedRowtimeAttributes:
>>>
>>>     @Override
>>>     public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
>>>         WatermarkStrategy ws = enablePerPartitionWatermark
>>>                 ? new PreserveWatermarks()
>>>                 : new BoundedOutOfOrderTimestamps(watermarkDelay);
>>>         return Collections.singletonList(
>>>                 new RowtimeAttributeDescriptor(
>>>                         rowTimeAttributeName, new ExistingField(rowTimeAttributeName), ws));
>>>     }
>>>
>>> We want to use PreserveWatermarks in some cases: when backfilling
>>> historical data with Flink, we want to use the underlying watermark
>>> defined at the Kafka consumer level rather than at the table source, as
>>> this prevents data from being dropped. Is there any alternative in the new
>>> APIs we can use? Or else, what can we use to get the desired behaviour?
>>>
>>> Thanks a lot in advance!