Hey JING,

Thanks a lot for replying to the thread!

Yeah, we are looking at `PreserveWatermarks`. The issue is that `DataStream#assignTimestampsAndWatermarks()` takes a `WatermarkStrategy` (from `org.apache.flink.api.common.eventtime`), and that interface has no built-in method for defining a preserve-watermarks strategy, although it does have methods for other strategies, such as `forBoundedOutOfOrderness`. I want to know whether watermarks are preserved there by default, since source-level watermark definitions are deprecated in the newer APIs.

On Mon, Oct 25, 2021 at 2:17 PM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi,
> I'm not sure I understand your requirement.
> However, are you looking for `PreserveWatermarks` in package
> `org.apache.flink.table.sources.wmstrategies`?
>
> Best,
> JING ZHANG
>
>
> Arujit Pradhan <arujit.prad...@gojek.com> wrote on Mon, Oct 25, 2021 at 4:02 PM:
>
>> Hi all,
>>
>> We maintain an open-source project for protobuf data processing using
>> Flink, dagger <http://github.com/odpf/dagger>. But we are currently on
>> Flink 1.9 and want to migrate to the latest stable release, 1.14.
>>
>> In the older version, we use the `StreamTableSource` and
>> `DefinedRowtimeAttributes` APIs for the table-source definition, similar to
>> this
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/streaming/time_attributes.html#using-a-tablesource-1>.
>> But since these APIs are deprecated, we are now defining it via
>> APIExpressions.
>>
>> The issue for us is defining the WatermarkStrategy, more specifically
>> the `PreserveWatermarks` strategy. We cannot find an alternative
>> to this, though other watermark strategies like
>> `BoundedOutOfOrderTimestamps` can be found in the newer API definition
>> in `org.apache.flink.api.common.eventtime.WatermarkStrategy`.
>>
>> Currently, we have logic something like this in `DefinedRowtimeAttributes`:
>>
>>     @Override
>>     public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
>>         // Legacy table-source WatermarkStrategy: either keep the watermarks of the
>>         // underlying stream, or derive them from the rowtime field with a fixed delay.
>>         WatermarkStrategy ws =
>>             enablePerPartitionWatermark
>>                 ? new PreserveWatermarks()
>>                 : new BoundedOutOfOrderTimestamps(watermarkDelay);
>>         return Collections.singletonList(
>>             new RowtimeAttributeDescriptor(
>>                 rowTimeAttributeName, new ExistingField(rowTimeAttributeName), ws));
>>     }
>>
>> We want to use `PreserveWatermarks` in places because, while backfilling
>> historical data using Flink, we want to use the underlying watermark defined
>> at the Kafka consumer level instead of at the source, as it will prevent
>> data drops. Is there any alternative in the new APIs that we can use? Or else,
>> what can we use to get the desired behaviour?
>>
>> Thanks a lot, in advance!
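
To make the difference between the two strategies in the quoted snippet concrete, here is a minimal sketch in plain Java. It is only a conceptual model under simplifying assumptions, not Flink code: `WatermarkSketch`, `Event`, `preserve` and `boundedOutOfOrder` are hypothetical names, and the bounded variant emits a watermark every time the maximum timestamp advances, whereas a real job would typically emit periodically.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Conceptual model only; none of these types are Flink classes. */
public class WatermarkSketch {

    /** One upstream element: either a record timestamp or an upstream watermark. */
    public static final class Event {
        final boolean isWatermark;
        final long timestamp;

        private Event(boolean isWatermark, long timestamp) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
        }

        public static Event record(long ts) { return new Event(false, ts); }
        public static Event watermark(long ts) { return new Event(true, ts); }
    }

    /** "Preserve": forward upstream watermarks unchanged and ignore record timestamps. */
    public static List<Long> preserve(List<Event> input) {
        List<Long> emitted = new ArrayList<>();
        for (Event e : input) {
            if (e.isWatermark) {
                emitted.add(e.timestamp);
            }
        }
        return emitted;
    }

    /** "Bounded out-of-order": ignore upstream watermarks, emit (max record timestamp - delay). */
    public static List<Long> boundedOutOfOrder(List<Event> input, long delayMillis) {
        List<Long> emitted = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (Event e : input) {
            if (e.isWatermark) {
                continue; // upstream watermarks are discarded in this mode
            }
            if (e.timestamp > maxTimestamp) {
                maxTimestamp = e.timestamp;
                emitted.add(maxTimestamp - delayMillis);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // A backfill-like input: a far-ahead record (5000) arrives before an older one (1200).
        List<Event> input = Arrays.asList(
                Event.record(1000), Event.record(5000), Event.watermark(900),
                Event.record(1200), Event.watermark(1100));

        System.out.println(preserve(input));                // [900, 1100]
        System.out.println(boundedOutOfOrder(input, 2000)); // [-1000, 3000]
    }
}
```

In this toy input, the record with timestamp 1200 arrives when the bounded strategy has already advanced the watermark to 3000, so it would count as late, while under the preserved upstream watermark (900 at that point) it is still on time. That is the data-drop concern described in the quoted mail.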