Hi, I'm not sure I understand your requirement. However, are you looking for `PreserveWatermarks` in package `org.apache.flink.table.sources.wmstrategies`?
Best, JING ZHANG Arujit Pradhan <arujit.prad...@gojek.com> 于2021年10月25日周一 下午4:02写道: > Hi all, > > > We maintain an Open-sourced project for protobuf data processing using > Flink dagger <http://github.com/odpf/dagger>. But we are currently on > Flink-1.9 and want to migrate to the latest stable 1.14. > > > In the older version, we use `*StreamTableSource` *and ` > *DefinedRowtimeAttributes` *APIs for Table-source definition, similar to > this > <https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/streaming/time_attributes.html#using-a-tablesource-1>. > But since these APIs are deprecated we are now defining via > *APIExpressions*. > > > The issue for us is while defining WatermarkStrategy, more specifically > for the `*PreserveWatermarks*` strategy. We can not find an alternative > to this, though other WatermarkStrategies like ` > *BoundedOutOfOrderTimestamps`*could be found in the newer API definition > in `*org.apache.flink.api.common.eventtime.WatermarkStrategy*` package. > > > Currently, we have logic something like in *DefinedRowtimeAttributes* : > > > > @Override > > public *List<RowtimeAttributeDescriptor> *getRowtimeAttributeDescriptors*() > {* > > * WatermarkStrategy *ws = > > enablePerPartitionWatermark ? new PreserveWatermarks*() *: > new BoundedOutOfOrderTimestamps*(*watermarkDelay*)*; > > return *Collections*.*singletonList**(* > > new RowtimeAttributeDescriptor*(*rowTimeAttributeName, new > ExistingField*(*rowTimeAttributeName*)*, ws*))*; > > *}* > > > > We want to use *PreserveWatermarks *in places since while Backfilling > historical data using flink we want to use underlying Watermark defined in > Kafka Consumer-level instead of Sources as it will prevent us from data > drops. Is there any alternate in the new APIs we can use? Or else what can > we use to get the desired behaviour. > > > > Thanks a lot, in advance! >