Hey JING,

Thanks a lot for replying to the thread!

Yeah, we are looking at `PreserveWatermarks`. The issue is that
`DataStream.assignTimestampsAndWatermarks()` takes a `WatermarkStrategy` (from
`org.apache.flink.api.common.eventtime`), and that interface has no built-in
method for a preserve-watermarks strategy, although it does have methods for
other strategies, such as `forBoundedOutOfOrderness`.
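For reference, here is a small stdlib-only Java model of what I understand
`WatermarkStrategy.forBoundedOutOfOrderness` computes: track the largest event
timestamp seen so far and emit that value minus the allowed delay minus 1 ms.
The class and method names are made up for illustration (this is not Flink's
`WatermarkGenerator` interface). `PreserveWatermarks` has no equivalent
computation, because it only forwards the watermarks that already arrive from
the underlying stream.

```java
/**
 * Hypothetical, simplified model of a bounded-out-of-orderness watermark
 * generator. Only the arithmetic is meant to match Flink's behaviour.
 */
public class BoundedOutOfOrdernessModel {

    private final long delayMillis;
    // Initialised so that the first watermark is exactly Long.MIN_VALUE
    // (no overflow when the delay is subtracted).
    private long maxTimestamp;

    public BoundedOutOfOrdernessModel(long delayMillis) {
        this.delayMillis = delayMillis;
        this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
    }

    /** Called for every record: only the largest timestamp seen so far matters. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark that would be emitted on the next periodic emit. */
    public long currentWatermark() {
        return maxTimestamp - delayMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel gen = new BoundedOutOfOrdernessModel(300);
        // An out-of-order record (1100 after 1200) does not move the watermark back.
        for (long ts : new long[] {1000, 1200, 1100}) {
            gen.onEvent(ts);
        }
        System.out.println(gen.currentWatermark()); // prints 899
    }
}
```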

I want to know whether watermarks are preserved there by default, since
source-level watermark definitions are deprecated in the newer APIs.
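To illustrate why preserving the consumer-level watermarks matters to us
during backfills, here is a rough stdlib-only Java simulation. It is a sketch
under simplifying assumptions, not Flink code: two hypothetical Kafka
partitions are read in interleaved order, one caught up and one replaying old
data; each watermark is the max timestamp seen minus a 50 ms delay minus 1;
and per-partition watermarks are merged by taking their minimum, which is how
I understand Flink combines per-partition watermarks in the Kafka consumer. A
record counts as late when its timestamp is at or behind the current
watermark. The names `Event`, `sampleBackfill` and `countLate*` are
hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class WatermarkSimulation {

    /** A record from a hypothetical Kafka partition, with its event time in ms. */
    static final class Event {
        final int partition;
        final long timestamp;

        Event(int partition, long timestamp) {
            this.partition = partition;
            this.timestamp = timestamp;
        }
    }

    /** Backfill scenario: partition 0 is caught up, partition 1 replays old data. */
    static List<Event> sampleBackfill() {
        return Arrays.asList(
                new Event(0, 1000), new Event(1, 100),
                new Event(0, 1010), new Event(1, 110),
                new Event(0, 1020), new Event(1, 120),
                new Event(0, 1030), new Event(1, 130));
    }

    /** One watermark for the whole source, driven by the max timestamp over all partitions. */
    static int countLateSourceLevel(List<Event> events, long delay) {
        long maxTs = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Event e : events) {
            if (e.timestamp <= watermark) {
                late++; // behind the watermark: at risk of being dropped as late
            }
            maxTs = Math.max(maxTs, e.timestamp);
            watermark = maxTs - delay - 1;
        }
        return late;
    }

    /** One watermark per partition; the source emits the minimum of them. */
    static int countLatePerPartition(List<Event> events, int numPartitions, long delay) {
        long[] maxTs = new long[numPartitions];
        boolean[] seen = new boolean[numPartitions];
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Event e : events) {
            if (e.timestamp <= watermark) {
                late++;
            }
            int p = e.partition;
            maxTs[p] = seen[p] ? Math.max(maxTs[p], e.timestamp) : e.timestamp;
            seen[p] = true;
            long combined = Long.MAX_VALUE;
            for (int i = 0; i < numPartitions; i++) {
                // A partition that has produced nothing yet holds the combined watermark back.
                long partitionWm = seen[i] ? maxTs[i] - delay - 1 : Long.MIN_VALUE;
                combined = Math.min(combined, partitionWm);
            }
            watermark = Math.max(watermark, combined); // watermarks never move backwards
        }
        return late;
    }

    public static void main(String[] args) {
        List<Event> events = sampleBackfill();
        System.out.println("source-level late records:  " + countLateSourceLevel(events, 50));
        System.out.println("per-partition late records: " + countLatePerPartition(events, 2, 50));
    }
}
```

Running it should print 4 and 0: with a single source-level watermark, every
record from the lagging partition falls behind the watermark, while with
per-partition watermarks none of them do.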



On Mon, Oct 25, 2021 at 2:17 PM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi,
> I'm not sure I understand your requirement.
> However, are you looking for `PreserveWatermarks` in package
> `org.apache.flink.table.sources.wmstrategies`?
>
> Best,
> JING ZHANG
>
>
> On Mon, Oct 25, 2021 at 4:02 PM Arujit Pradhan <arujit.prad...@gojek.com> wrote:
>
>> Hi all,
>>
>>
>> We maintain an open-source project for protobuf data processing using
>> Flink, Dagger <http://github.com/odpf/dagger>. We are currently on
>> Flink 1.9 and want to migrate to the latest stable release, 1.14.
>>
>>
>> In the older version, we use the `StreamTableSource` and
>> `DefinedRowtimeAttributes` APIs for table-source definitions, similar to
>> this
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/streaming/time_attributes.html#using-a-tablesource-1>.
>> But since these APIs are deprecated, we are now defining table sources via
>> API expressions.
>>
>>
>> The issue for us is defining the WatermarkStrategy, more specifically
>> the `PreserveWatermarks` strategy. We cannot find an alternative to it,
>> although other strategies like `BoundedOutOfOrderTimestamps` have
>> counterparts in the newer
>> `org.apache.flink.api.common.eventtime.WatermarkStrategy` interface.
>>
>>
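>> Currently, we have logic like this in our `DefinedRowtimeAttributes`
>> implementation:
>>
>> import java.util.Collections;
>> import java.util.List;
>>
>> import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
>> import org.apache.flink.table.sources.tsextractors.ExistingField;
>> import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
>> import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks;
>> import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
>>
>> @Override
>> public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
>>     // Per-partition mode keeps the watermarks the Kafka consumer already
>>     // emits; otherwise use a bounded out-of-orderness delay.
>>     WatermarkStrategy ws = enablePerPartitionWatermark
>>             ? new PreserveWatermarks()
>>             : new BoundedOutOfOrderTimestamps(watermarkDelay);
>>     return Collections.singletonList(
>>             new RowtimeAttributeDescriptor(
>>                     rowTimeAttributeName, new ExistingField(rowTimeAttributeName), ws));
>> }
>>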
>> We want to use `PreserveWatermarks` in some places: when backfilling
>> historical data with Flink, we want to use the underlying watermarks
>> defined at the Kafka consumer level instead of at the source, as this
>> prevents data drops. Is there an alternative in the new APIs we can use?
>> If not, what can we use to get the desired behaviour?
>>
>>
>>
>> Thanks a lot, in advance!
>>
>
