Hi Arujit,
I see now; sorry for misunderstanding your requirements earlier.
Yes. If you already specify a watermark strategy on the source (see [1] for
more details), you no longer need to call
`DataStream.assignTimestampsAndWatermarks` after the source; the source's
watermarks are kept by default.
If the source does not generate watermarks, you can call
`DataStream.assignTimestampsAndWatermarks` to generate them. Note that if the
source already emits watermarks, the ones generated by
`assignTimestampsAndWatermarks` will overwrite them.
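The difference between keeping per-partition watermarks from the source and
overwriting them downstream can be illustrated with a small plain-Java model.
This is a sketch under simplifying assumptions, not Flink code: `Event`,
`lateWithGlobalWatermark` and `lateWithPerPartitionWatermarks` are hypothetical
names, watermarks are updated after every record with a fixed delay, a record
counts as late when its timestamp is at or below the current watermark, and
idle partitions and parallelism are ignored. The per-partition variant
approximates a Kafka-partition-aware strategy as described in [1]; the global
variant approximates an `assignTimestampsAndWatermarks` call after the source.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not Flink's API) of two ways to generate watermarks. */
public class WatermarkModel {

    /** One record read from a hypothetical Kafka partition. */
    static final class Event {
        final int partition;
        final long timestamp;

        Event(int partition, long timestamp) {
            this.partition = partition;
            this.timestamp = timestamp;
        }
    }

    /**
     * Single watermark generated after the source: it sees all partitions interleaved,
     * so it follows the fastest partition. Returns how many records arrive late.
     */
    static int lateWithGlobalWatermark(List<Event> events, long delay) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Event e : events) {
            if (e.timestamp <= watermark) {
                late++;
            }
            watermark = Math.max(watermark, e.timestamp - delay);
        }
        return late;
    }

    /**
     * Per-partition watermarks at the source: each partition tracks its own watermark and
     * the emitted watermark is the minimum, so a lagging partition holds event time back.
     */
    static int lateWithPerPartitionWatermarks(List<Event> events, int numPartitions, long delay) {
        Map<Integer, Long> perPartition = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            perPartition.put(p, Long.MIN_VALUE);
        }
        int late = 0;
        for (Event e : events) {
            long watermark = Collections.min(perPartition.values());
            if (e.timestamp <= watermark) {
                late++;
            }
            perPartition.merge(e.partition, e.timestamp - delay, Long::max);
        }
        return late;
    }

    public static void main(String[] args) {
        // Backfill-like read order: partition 0 is consumed far ahead of partition 1.
        List<Event> events = List.of(
                new Event(0, 100), new Event(0, 200), new Event(1, 10),
                new Event(0, 300), new Event(1, 20));
        System.out.println("late (global watermark):         " + lateWithGlobalWatermark(events, 5));
        System.out.println("late (per-partition watermarks): " + lateWithPerPartitionWatermarks(events, 2, 5));
    }
}
```

With partition 0 read well ahead of partition 1, the global watermark makes
both partition-1 records late (count 2), while the per-partition minimum holds
event time back so none are late (count 0).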

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector

On Mon, Oct 25, 2021 at 5:36 PM, Arujit Pradhan <arujit.prad...@gojek.com> wrote:

> Hey JING,
>
> Thanks a lot for replying to the thread!
>
> Yes, we are looking at `PreserveWatermarks`. The issue is that
> `datastream.assignTimestampsAndWatermarks()` takes a `WatermarkStrategy` (from
> `org.apache.flink.api.common.eventtime`), and that interface has no built-in
> method for defining a preserve-watermarks strategy, although it does offer
> methods for other strategies, such as `forBoundedOutOfOrderness`.
>
> I want to know whether watermarks are preserved there by default, since
> source-level watermark definitions are deprecated in the newer APIs.
>
>
>
> On Mon, Oct 25, 2021 at 2:17 PM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi,
>> I'm not sure I understand your requirement.
>> That said, are you looking for `PreserveWatermarks` in the package
>> `org.apache.flink.table.sources.wmstrategies`?
>>
>> Best,
>> JING ZHANG
>>
>>
>> On Mon, Oct 25, 2021 at 4:02 PM, Arujit Pradhan <arujit.prad...@gojek.com> wrote:
>>
>>> Hi all,
>>>
>>> We maintain Dagger <http://github.com/odpf/dagger>, an open-source project
>>> for processing Protobuf data with Flink. We are currently on Flink 1.9 and
>>> want to migrate to the latest stable release, 1.14.
>>>
>>>
>>> In the older version, we use the `StreamTableSource` and
>>> `DefinedRowtimeAttributes` APIs to define table sources, similar to this
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/streaming/time_attributes.html#using-a-tablesource-1>.
>>> Since these APIs are deprecated, we now define table sources via
>>> APIExpressions.
>>>
>>>
>>> Our problem is defining the WatermarkStrategy, specifically the
>>> `PreserveWatermarks` strategy. We cannot find an equivalent for it,
>>> although counterparts of other strategies such as
>>> `BoundedOutOfOrderTimestamps` exist in the newer
>>> `org.apache.flink.api.common.eventtime.WatermarkStrategy` API.
>>>
>>>
>>> Currently, our `DefinedRowtimeAttributes` implementation has logic like this:
>>>
>>>
>>>
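>>> import java.util.Collections;
>>> import java.util.List;
>>> import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
>>> import org.apache.flink.table.sources.tsextractors.ExistingField;
>>> import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
>>> import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks;
>>> import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
>>>
>>> @Override
>>> public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
>>>     // Either preserve the watermarks of the underlying DataStream, or
>>>     // generate bounded out-of-orderness watermarks lagging by watermarkDelay.
>>>     WatermarkStrategy ws = enablePerPartitionWatermark
>>>             ? new PreserveWatermarks()
>>>             : new BoundedOutOfOrderTimestamps(watermarkDelay);
>>>     return Collections.singletonList(
>>>             new RowtimeAttributeDescriptor(
>>>                     rowTimeAttributeName, new ExistingField(rowTimeAttributeName), ws));
>>> }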
>>>
>>>
>>>
>>> We want to use `PreserveWatermarks` in some places because, when
>>> backfilling historical data with Flink, we want to use the underlying
>>> watermarks defined at the Kafka consumer level instead of those defined in
>>> the sources, which prevents data from being dropped. Is there an
>>> alternative in the new APIs that we can use? If not, what can we use to get
>>> the desired behaviour?
>>>
>>>
>>>
>>> Thanks a lot in advance!
>>>
>>
