My thoughts are similar to @Stephan's.

Ad 1: I tried something like this on your branch:

    /**
     * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
     * Intended for top-level classes that implement both {@link Serializable}
     * and {@link TimestampAssigner}.
     */
    public <TA extends TimestampAssigner<T> & Serializable>
            WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        this.timestampAssigner = timestampAssigner;
        return this;
    }

    @FunctionalInterface
    public interface SerializableTimestampAssigner<T>
            extends TimestampAssigner<T>, Serializable {
    }

    /**
     * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
     * Helper method for serializable lambdas.
     */
    public WatermarkStrategies<T> withTimestampAssigner(
            SerializableTimestampAssigner<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        this.timestampAssigner = timestampAssigner;
        return this;
    }

But I understand if that's too hacky. It's just a pity that we have to
impose restrictions on the interface that are not strictly necessary.
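For illustration, here is a minimal, self-contained sketch of the SerializableTimestampAssigner variant. TimestampAssigner, SerializableTimestampAssigner and WatermarkStrategies below are hypothetical stand-ins (not Flink's actual classes), and the extractTimestamp(element, recordTimestamp) signature is an assumption. Only the lambda-friendly overload is kept: because the parameter type is a single functional interface that extends Serializable, a plain lambda is accepted and the compiler makes it serializable.

```java
import java.io.Serializable;

// Hypothetical stand-in for the TimestampAssigner discussed in this thread;
// not Flink's actual interface.
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// Mixes java.io.Serializable into the functional interface, so a lambda whose
// target type is this interface is compiled as a serializable lambda.
@FunctionalInterface
interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {
}

// Hypothetical, trimmed-down builder that keeps only the lambda-friendly overload.
final class WatermarkStrategies<T> {
    private TimestampAssigner<T> timestampAssigner;

    WatermarkStrategies<T> withTimestampAssigner(
            SerializableTimestampAssigner<T> timestampAssigner) {
        if (timestampAssigner == null) {
            throw new NullPointerException("timestampAssigner");
        }
        this.timestampAssigner = timestampAssigner;
        return this;
    }

    TimestampAssigner<T> getTimestampAssigner() {
        return timestampAssigner;
    }
}

final class SerializableAssignerDemo {
    public static void main(String[] args) {
        // The lambda's target type is SerializableTimestampAssigner<Long>.
        WatermarkStrategies<Long> strategies =
                new WatermarkStrategies<Long>().withTimestampAssigner((event, ts) -> event);

        TimestampAssigner<Long> assigner = strategies.getTimestampAssigner();
        System.out.println(assigner instanceof Serializable);    // true
        System.out.println(assigner.extractTimestamp(42L, -1L)); // 42
    }
}
```

In this sketch, top-level classes are still accepted as long as they implement SerializableTimestampAssigner rather than the two interfaces separately.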

Ad 2/3:

I am aware that the watermark assigner/timestamp extractor can be applied
further down the graph. Originally I also wanted to suggest
sourceTimestamp and SourceTimestampAssigner, but then I realized that, as
you correctly pointed out, it can also be used after the sources. Even if
the TimestampAssigner is used after the source, there might be a
native/record timestamp in the StreamRecord that was extracted by a
previous assigner.
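A sketch of how the renamed pieces could fit together, assuming a hypothetical TimestampAssigner whose second parameter is called recordTimestamp and a hypothetical RecordTimestampAssigner (one of the names suggested in this thread). It illustrates that the second argument is simply whatever timestamp the StreamRecord already carries, whether the source set it or a previous assigner extracted it; Event is a made-up event type.

```java
import java.io.Serializable;

// Hypothetical stand-in; recordTimestamp is the parameter name proposed in this
// thread, not necessarily the final Flink API.
@FunctionalInterface
interface TimestampAssigner<T> {
    // recordTimestamp: the timestamp already attached to the record, set by the
    // source or by an earlier assigner further up the graph.
    long extractTimestamp(T element, long recordTimestamp);
}

// Hypothetical replacement for IdentityTimestampAssigner: it is not an identity
// on the element; it forwards the record's existing timestamp unchanged.
final class RecordTimestampAssigner<T> implements TimestampAssigner<T>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public long extractTimestamp(T element, long recordTimestamp) {
        return recordTimestamp;
    }
}

// Hypothetical event type that carries its own event time.
final class Event {
    final long eventTime;

    Event(long eventTime) {
        this.eventTime = eventTime;
    }
}

final class RecordTimestampDemo {
    public static void main(String[] args) {
        Event event = new Event(1_000L);
        long timestampFromPreviousAssigner = 5_000L;

        TimestampAssigner<Event> passThrough = new RecordTimestampAssigner<>();
        TimestampAssigner<Event> fromField = (e, recordTimestamp) -> e.eventTime;

        System.out.println(passThrough.extractTimestamp(event, timestampFromPreviousAssigner)); // 5000
        System.out.println(fromField.extractTimestamp(event, timestampFromPreviousAssigner));   // 1000
    }
}
```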

Best,

Dawid

On 12/05/2020 16:47, Stephan Ewen wrote:
> @Aljoscha
>
> About (1): could we have an interface SerializableTimestampAssigner that
> simply mixes in the java.io.Serializable interface? Or would this be too
> clumsy?
>
> About (3): RecordTimeStamp seems to fit both cases (in-source-record
> timestamp, in-stream-record timestamp).
>
> On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Definitely +1 to point 2) raised by Dawid. I'm not sure about points 1)
>> and 3).
>>
>> 1) I can see the benefit of that, but in reality most timestamp assigners
>> will probably need to be Serializable. If you look at my (updated) POC
>> branch [1], you can see how a TimestampAssigner would be specified on the
>> WatermarkStrategies helper class [2]. The signature of this method would
>> have to be changed to something like:
>>
>> public <TA extends TimestampAssigner<T> & Serializable>
>> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
>>
>> Then, however, it would not be possible for users to specify a lambda or
>> an anonymous inner class for the TimestampAssigner like this:
>>
>> WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
>>                 .forGenerator(new PeriodicTestWatermarkGenerator())
>>                 .withTimestampAssigner((event, timestamp) -> event)
>>                 .build();
>>
>> 3) This makes sense if we only allow WatermarkStrategies on sources,
>> where the previous timestamp really is the "native" timestamp.
>> Currently, we also allow setting watermark strategies at arbitrary
>> points in the graph. I think we should probably only allow that in
>> sources, but that's not the current reality. I'm not against renaming
>> it, just voicing those thoughts.
>>
>> Best,
>> Aljoscha
>>
>>
>> [1] https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
>> [2] https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
>>
>> On 12.05.20 15:48, Stephan Ewen wrote:
>>> +1 to all of Dawid's suggestions; they make a lot of sense to me.
>>>
>>> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>
>>>> Hi Aljoscha,
>>>>
>>>> Sorry for adding comments during the vote, but I have some really minor
>>>> suggestions that should not influence the voting thread imo.
>>>>
>>>> 1) Does it make sense to have the TimestampAssigner extend Flink's
>>>> Function? This implies that it has to be serializable, which is not
>>>> strictly necessary with the factory pattern, right? BTW, I really like
>>>> that you suggested the FunctionalInterface annotation there.
>>>>
>>>> 2) Could we rename the IdentityTimestampAssigner to e.g.
>>>> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...?
>>>> Personally, I found the name IdentityTimestampAssigner a bit misleading,
>>>> as "identity" usually means a no-op. The name did not click for me, as I
>>>> assumed it somehow returned the incoming record itself.
>>>>
>>>> 3) Could we rename the second parameter of TimestampAssigner#extract to
>>>> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
>>>> above. This parameter was also a bit confusing for me, as at times I
>>>> thought it was somehow related to
>>>> TimerService#currentProcessingTime()/currentWatermark(), i.e. the
>>>> system-wide current timestamp.
>>>>
>>>> Other than these three points, I like the proposal, and I would have
>>>> voted +1 if it were not for them.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 11/05/2020 16:57, Jark Wu wrote:
>>>>> Thanks for the explanation. I like the factory pattern for making the
>>>>> member variables immutable and final.
>>>>>
>>>>> So +1 to the proposal.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> I am fine with that.
>>>>>>
>>>>>> Most of the principles seem to be agreed upon. I understand the need
>>>>>> to support code-generated extractors, and we should already support
>>>>>> most of it (via the factories, as Aljoscha mentioned) and can extend
>>>>>> this if needed.
>>>>>>
>>>>>> I think that the factory approach supports code-generated extractors
>>>>>> in an even cleaner way than an extractor with an open/init method.
>>>>>>
>>>>>>
>>>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> We're slightly running out of time. I would propose that we vote on
>>>>>>> the basic principle and remain open to later additions. This feature
>>>>>>> is quite important for making the new Kafka Source, which is being
>>>>>>> developed as part of FLIP-27, useful. Otherwise we would have to use
>>>>>>> the legacy interfaces in the newly added connector.
>>>>>>>
>>>>>>> I know that's a bit unorthodox, but would everyone be OK with what's
>>>>>>> currently there, so that we can then iterate?
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
>>>>>>>> Ah, I meant to write this in my previous email, sorry about that.
>>>>>>>>
>>>>>>>> The WatermarkStrategy, which is basically a factory for a
>>>>>>>> WatermarkGenerator, is the replacement for the open() method. This
>>>>>>>> is the same approach that was followed for StreamOperatorFactory,
>>>>>>>> which was introduced to allow code generation in the Table API [1].
>>>>>>>> If we need metrics or other things, we would add them as parameters
>>>>>>>> to the factory method. What do you think?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
>>>>>>>>
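A rough sketch of the factory idea, under stated assumptions: WatermarkStrategy, WatermarkGenerator, WatermarkOutput and GeneratorContext are simplified hypothetical interfaces, and passing a context with a metrics callback to the factory method is only one possible shape for "adding metrics as a parameter". Only the strategy has to be serializable; the generator it creates receives its dependencies in the constructor and keeps them in final fields, so it needs neither open() nor lazy initialization.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical, simplified interfaces for illustration only.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Assumed context handed to the factory method, e.g. to expose a metric.
interface GeneratorContext {
    LongConsumer currentWatermarkGauge();
}

// The strategy is the serializable factory; the generator itself need not be.
@FunctionalInterface
interface WatermarkStrategy<T> extends Serializable {
    WatermarkGenerator<T> createWatermarkGenerator(GeneratorContext context);
}

// A generator whose dependencies are final fields set at construction time.
final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrderness;
    private final LongConsumer watermarkGauge;
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(long maxOutOfOrderness, LongConsumer watermarkGauge) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.watermarkGauge = watermarkGauge;
        // Start low enough that the first computed watermark does not overflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Highest timestamp seen, minus the allowed out-of-orderness, minus 1.
        long watermark = maxTimestamp - maxOutOfOrderness - 1;
        watermarkGauge.accept(watermark);
        output.emitWatermark(watermark);
    }
}

final class FactoryDemo {
    public static void main(String[] args) {
        WatermarkStrategy<String> strategy =
                context -> new BoundedOutOfOrdernessGenerator<>(100L, context.currentWatermarkGauge());

        List<Long> reported = new ArrayList<>();
        GeneratorContext context = () -> reported::add;
        WatermarkGenerator<String> generator = strategy.createWatermarkGenerator(context);

        List<Long> emitted = new ArrayList<>();
        generator.onEvent("a", 1_000L, emitted::add);
        generator.onPeriodicEmit(emitted::add);
        System.out.println(emitted + " " + reported); // [899] [899]
    }
}
```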
>>>>>>>> On 10.05.20 05:07, Jark Wu wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Regarding the `open()/close()`, I think they are necessary for
>>>>>>>>> Table&SQL to compile the generated code.
>>>>>>>>> In Table&SQL, the watermark strategy and the event timestamp are
>>>>>>>>> defined using SQL expressions, and we translate the expressions and
>>>>>>>>> generate Java code for them. If we have `open()/close()`, we don't
>>>>>>>>> need lazy initialization.
>>>>>>>>> Besides that, I can see a need to report some metrics, e.g. the
>>>>>>>>> current watermark, the dirty timestamps (null values), etc.
>>>>>>>>> So I think a simple `open()/close()` with a context from which we
>>>>>>>>> can get a MetricGroup is nice and not complex for the first version.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>>> Thanks, Aljoscha, for picking this up.
>>>>>>>>>>
>>>>>>>>>> I agree with the approach of doing the set of changes proposed here
>>>>>>>>>> for now. It already makes things simpler and adds idleness support
>>>>>>>>>> everywhere.
>>>>>>>>>>
>>>>>>>>>> Rich functions and state always add complexity; let's do this in a
>>>>>>>>>> next step, if we have a really compelling case.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal
>>>>>>>>>>> is basically to turn emitting into a "flatMap". We give the
>>>>>>>>>>> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG
>>>>>>>>>>> can decide whether to output a watermark or not and can also mark
>>>>>>>>>>> the output as idle. Changing the interface to return a Watermark
>>>>>>>>>>> (as the previous watermark assigner interface did) would not allow
>>>>>>>>>>> that flexibility.
>>>>>>>>>>>
>>>>>>>>>>> Regarding checkpointing the watermark and keeping track of the
>>>>>>>>>>> minimum watermark, this would be the responsibility of the
>>>>>>>>>>> framework (or the KafkaConsumer in the current implementation).
>>>>>>>>>>> The user-supplied WG does not need to make sure that the watermark
>>>>>>>>>>> doesn't regress.
>>>>>>>>>>>
>>>>>>>>>>> Regarding making the WG a "rich function", I can see the potential
>>>>>>>>>>> benefit, but I also see a lot of pitfalls. For example, how should
>>>>>>>>>>> the watermark state be handled in the case of scale-in? It could
>>>>>>>>>>> be made to work in the Kafka case by attaching the state to the
>>>>>>>>>>> partition state that we keep, but then we also have potential
>>>>>>>>>>> backwards-compatibility problems for the WM state. Does the WG
>>>>>>>>>>> usually need to keep the state, or might it be enough if the state
>>>>>>>>>>> is transient? I.e., if you have a restart, the WG would lose its
>>>>>>>>>>> histogram, but it would rebuild it quickly and you would get back
>>>>>>>>>>> to the same steady state as before.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Aljoscha
>>>>>>>>>>>
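To make the "flatMap" shape concrete, a minimal sketch assuming simplified hypothetical interfaces rather than the FLIP's exact signatures. AdaptiveDelayGenerator decides on each call whether to emit anything and derives a variable delay from the largest lateness it has seen, kept in plain fields (transient state that a restart would simply rebuild). The non-regression check lives in a hypothetical framework-side MonotonicOutput wrapper, which also remembers the current watermark that a framework could checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified "collector" handed to the generator.
interface WatermarkOutput {
    void emitWatermark(long watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Variable delay from transient statistics: the largest lateness observed so far.
final class AdaptiveDelayGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;
    private long maxObservedLateness = 0L;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE && eventTimestamp < maxTimestamp) {
            maxObservedLateness = Math.max(maxObservedLateness, maxTimestamp - eventTimestamp);
        }
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // nothing seen yet: emit nothing
        }
        output.emitWatermark(maxTimestamp - maxObservedLateness);
    }
}

// Hypothetical framework-side wrapper: forwards only advancing watermarks and
// remembers the current one, e.g. so the framework could checkpoint it.
final class MonotonicOutput implements WatermarkOutput {
    private final WatermarkOutput downstream;
    private long currentWatermark = Long.MIN_VALUE;

    MonotonicOutput(WatermarkOutput downstream) {
        this.downstream = downstream;
    }

    @Override
    public void emitWatermark(long watermark) {
        if (watermark > currentWatermark) {
            currentWatermark = watermark;
            downstream.emitWatermark(watermark);
        }
    }

    @Override
    public void markIdle() {
        downstream.markIdle();
    }

    long getCurrentWatermark() {
        return currentWatermark;
    }
}

// Test double that records what reaches downstream.
final class RecordingOutput implements WatermarkOutput {
    final List<String> log = new ArrayList<>();

    @Override
    public void emitWatermark(long watermark) {
        log.add("wm:" + watermark);
    }

    @Override
    public void markIdle() {
        log.add("idle");
    }
}

final class FlatMapStyleDemo {
    public static void main(String[] args) {
        RecordingOutput downstream = new RecordingOutput();
        MonotonicOutput output = new MonotonicOutput(downstream);
        WatermarkGenerator<String> generator = new AdaptiveDelayGenerator<>();

        generator.onPeriodicEmit(output);      // nothing seen yet: no watermark
        generator.onEvent("a", 1_000L, output);
        generator.onPeriodicEmit(output);      // 1000
        generator.onEvent("b", 700L, output);  // 300 late: delay grows to 300
        generator.onPeriodicEmit(output);      // 700 would regress: dropped by wrapper
        generator.onEvent("c", 1_500L, output);
        generator.onPeriodicEmit(output);      // 1500 - 300 = 1200
        System.out.println(downstream.log);    // [wm:1000, wm:1200]
    }
}
```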
>>>>>>>>>>> On 27.04.20 12:12, David Anderson wrote:
>>>>>>>>>>>> Overall I like this proposal; thanks for bringing it forward,
>>>>>>>>>>>> Aljoscha.
>>>>>>>>>>>>
>>>>>>>>>>>> I also like the idea of making the watermark generator a rich
>>>>>>>>>>>> function -- this should make it more straightforward to implement
>>>>>>>>>>>> smarter watermark generators, e.g. one that uses state to keep
>>>>>>>>>>>> statistics about the actual out-of-orderness and uses those
>>>>>>>>>>>> statistics to implement a variable delay.
>>>>>>>>>>>>
>>>>>>>>>>>> David
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for opening the discussion!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have two comments on the FLIP:
>>>>>>>>>>>>> 1) We could add lifecycle methods to the Generator, i.e.
>>>>>>>>>>>>> open()/close(), probably with a Context as argument. I have not
>>>>>>>>>>>>> fully thought this through, but I think that this is more aligned
>>>>>>>>>>>>> with the rest of our rich functions. In addition, it would allow
>>>>>>>>>>>>> us, for example, to initialize the watermark value if we decide to
>>>>>>>>>>>>> checkpoint the watermark (see [1]). (I also do not know whether
>>>>>>>>>>>>> Table/SQL needs to do anything in open().)
>>>>>>>>>>>>> 2) Aligned with the above, and with the case where we want to
>>>>>>>>>>>>> checkpoint the watermark in mind, I am wondering how we could
>>>>>>>>>>>>> implement this in the future. In the FLIP, it is proposed to
>>>>>>>>>>>>> expose the WatermarkOutput in the methods of the
>>>>>>>>>>>>> WatermarkGenerator. Given the implicit contract that watermarks
>>>>>>>>>>>>> are non-decreasing, WatermarkOutput#emitWatermark() will (I
>>>>>>>>>>>>> assume) have a check that compares the last emitted WM against
>>>>>>>>>>>>> the provided one and emits it only if it is >=. Otherwise, we
>>>>>>>>>>>>> risk users shooting themselves in the foot if they accidentally
>>>>>>>>>>>>> forget the check. Given that the WatermarkGenerator and its
>>>>>>>>>>>>> caller do not know whether the watermark was actually emitted or
>>>>>>>>>>>>> not (WatermarkOutput#emitWatermark returns void), who will be
>>>>>>>>>>>>> responsible for checkpointing the WM?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Given this, why not define the methods as:
>>>>>>>>>>>>>
>>>>>>>>>>>>> public interface WatermarkGenerator<T> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>     Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
>>>>>>>>>>>>>
>>>>>>>>>>>>>     Watermark onPeriodicEmit(WatermarkOutput output);
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> and let the caller be the one enforcing any invariants, such as
>>>>>>>>>>>>> non-decreasing watermarks? In this way, the caller can checkpoint
>>>>>>>>>>>>> anything that is needed, as it will have complete knowledge of
>>>>>>>>>>>>> whether the WM was emitted or not.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
>>>>>>>>>>>>>
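For comparison, a minimal sketch of the return-value variant proposed above. All names are hypothetical, returning null to mean "no watermark this time" is an assumed convention, and the WatermarkOutput parameter is omitted for brevity. The caller enforces the non-decreasing invariant, so it knows exactly which watermark was emitted last and can checkpoint (and later restore) that value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical return-value variant: null means "no watermark this time".
interface ReturningWatermarkGenerator<T> {
    Long onEvent(T event, long eventTimestamp);

    Long onPeriodicEmit();
}

// Hypothetical caller that owns the non-decreasing invariant. Because it sees
// every returned value, it knows the last emitted watermark for checkpointing.
final class WatermarkEmitter<T> {
    private final ReturningWatermarkGenerator<T> generator;
    private final List<Long> emitted = new ArrayList<>();
    private long lastEmitted;

    WatermarkEmitter(ReturningWatermarkGenerator<T> generator, long restoredWatermark) {
        this.generator = generator;
        this.lastEmitted = restoredWatermark;
    }

    void processEvent(T event, long timestamp) {
        maybeEmit(generator.onEvent(event, timestamp));
    }

    void onTimer() {
        maybeEmit(generator.onPeriodicEmit());
    }

    // Forwards only strictly advancing watermarks, to avoid duplicates.
    private void maybeEmit(Long candidate) {
        if (candidate != null && candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }

    long snapshotWatermark() {
        return lastEmitted;
    }

    List<Long> emitted() {
        return emitted;
    }
}

// Deliberately naive generator: returns each event's timestamp, so it can regress.
final class NaiveGenerator implements ReturningWatermarkGenerator<String> {
    @Override
    public Long onEvent(String event, long eventTimestamp) {
        return eventTimestamp;
    }

    @Override
    public Long onPeriodicEmit() {
        return null;
    }
}

final class ReturnStyleDemo {
    public static void main(String[] args) {
        // Assume watermark 500 was restored from a checkpoint.
        WatermarkEmitter<String> emitter = new WatermarkEmitter<>(new NaiveGenerator(), 500L);
        emitter.processEvent("a", 1_000L);
        emitter.processEvent("b", 800L);   // would regress: not emitted
        emitter.onTimer();                 // generator returns null: nothing to emit
        emitter.processEvent("c", 1_200L);
        System.out.println(emitter.emitted() + " " + emitter.snapshotWatermark()); // [1000, 1200] 1200
    }
}
```

The trade-off discussed in the thread shows up directly: this shape gives the caller full knowledge of what was emitted, while the collector shape lets a generator emit nothing, several things, or an idleness signal from a single call.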
>>>>>>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>>>> Thanks for the proposal, Aljoscha. This is a very useful
>>>>>>>>>>>>>> unification. We have already considered this FLIP in the
>>>>>>>>>>>>>> interfaces for FLIP-95 [1] and look forward to updating to the
>>>>>>>>>>>>>> new unified watermark generators once FLIP-126 has been accepted.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>> Hi Everyone!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
>>>>>>>>>>>>>>> separate) Watermark Assigners" [1]. This work was started by
>>>>>>>>>>>>>>> Stephan in an experimental branch. I expanded on that work to
>>>>>>>>>>>>>>> provide a PoC for the changes proposed in this FLIP: [2].
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Currently, we have two different flavours of Watermark
>>>>>>>>>>>>>>> Assigners: AssignerWithPunctuatedWatermarks and
>>>>>>>>>>>>>>> AssignerWithPeriodicWatermarks. Both of them extend from
>>>>>>>>>>>>>>> TimestampAssigner. This means that sources that want to support
>>>>>>>>>>>>>>> watermark assignment/extraction in the source need to support
>>>>>>>>>>>>>>> two separate interfaces, and we have two operator
>>>>>>>>>>>>>>> implementations for the different flavours. Also, this makes
>>>>>>>>>>>>>>> features such as generic support for idleness detection more
>>>>>>>>>>>>>>> complicated to implement, because we again have to support two
>>>>>>>>>>>>>>> types of watermark assigners.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In this FLIP we propose two things:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Unify the Watermark Assigners into one interface,
>>>>>>>>>>>>>>>   WatermarkGenerator.
>>>>>>>>>>>>>>> - Separate this new interface from the TimestampAssigner.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The motivation for the first point is to simplify future
>>>>>>>>>>>>>>> implementations and to reduce code duplication. The motivation
>>>>>>>>>>>>>>> for the second point is again code deduplication: most assigners
>>>>>>>>>>>>>>> currently have to extend from some base timestamp extractor or
>>>>>>>>>>>>>>> duplicate the extraction logic, or users have to override an
>>>>>>>>>>>>>>> abstract method of the watermark assigner to provide the
>>>>>>>>>>>>>>> timestamp extraction logic.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Additionally, we propose to add a generic wrapping
>>>>>>>>>>>>>>> WatermarkGenerator that provides idleness detection, i.e. it can
>>>>>>>>>>>>>>> mark a stream/partition as idle if no data arrives within a
>>>>>>>>>>>>>>> configured timeout.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The "unify and separate" part refers to the fact that we want to
>>>>>>>>>>>>>>> unify punctuated and periodic assigners but at the same time
>>>>>>>>>>>>>>> split the timestamp assigner from the watermark generator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to
>>>>>>>>>>>>>>> your feedback.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
>>>>>>>>>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
>>>>
>>
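A minimal sketch of the generic idleness-detecting wrapper described in the proposal, assuming simplified hypothetical WatermarkGenerator/WatermarkOutput interfaces and an injected LongSupplier clock (an assumption made so that the timeout logic is testable). It delegates to the wrapped generator and, on periodic emission, marks the output idle once no event has arrived for longer than the configured timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical, simplified interfaces for illustration only.
interface WatermarkOutput {
    void emitWatermark(long watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical wrapper that adds idleness detection to any generator.
final class IdlenessDetectingGenerator<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // processing-time clock, injected for testability
    private long lastEventTime;
    private boolean idle;

    IdlenessDetectingGenerator(WatermarkGenerator<T> delegate, long idleTimeoutMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastEventTime = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastEventTime = clock.getAsLong();
        idle = false;
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastEventTime > idleTimeoutMillis) {
            if (!idle) {
                idle = true;
                output.markIdle(); // signal once per idle period
            }
            return;
        }
        delegate.onPeriodicEmit(output);
    }
}

// Simple delegate: periodically emits the highest timestamp seen so far.
final class MaxTimestampGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(maxTimestamp);
        }
    }
}

final class IdlenessDemo {
    public static void main(String[] args) {
        long[] now = {0L};
        List<String> log = new ArrayList<>();
        WatermarkOutput output = new WatermarkOutput() {
            @Override public void emitWatermark(long watermark) { log.add("wm:" + watermark); }
            @Override public void markIdle() { log.add("idle"); }
        };
        WatermarkGenerator<String> generator =
                new IdlenessDetectingGenerator<>(new MaxTimestampGenerator<String>(), 1_000L, () -> now[0]);

        generator.onEvent("a", 42L, output);
        now[0] = 500L;
        generator.onPeriodicEmit(output); // within timeout: delegate emits wm:42
        now[0] = 2_000L;
        generator.onPeriodicEmit(output); // no events for 2000 ms: idle
        generator.onPeriodicEmit(output); // still idle: not signalled again
        System.out.println(log);          // [wm:42, idle]
    }
}
```

Skipping the delegate while idle is a design choice of this sketch; a real implementation might instead keep forwarding periodic emissions and let the framework decide.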
