There is already an implicit assumption that the TypeSerializer for keys is
stable/deterministic: RocksDB compares keys using their serialized byte
strings. I think this is a non-issue (or at least it's not changing the
status quo).
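To make the point concrete, here is a minimal sketch (plain Java, not Flink or RocksDB code; the String key type and `serializeKey` are made up for illustration) of grouping records purely by the serialized byte form of their key, the way RocksDB orders them:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch only: grouping records by the serialized byte form of their key.
// Only byte-wise equality/order is needed, not a semantically meaningful
// order, so any serializer works as long as equal keys always serialize
// to the same bytes.
public class BinaryKeyGrouping {

    // Hypothetical key serializer: here simply the UTF-8 bytes of a String.
    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Unsigned lexicographic comparison of byte arrays, the same way
    // RocksDB's default comparator orders serialized keys.
    static int compareBytes(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Sort records by serialized key, then emit runs of equal keys as groups.
    static List<List<String>> groupByKey(List<String> records) {
        records.sort((x, y) -> compareBytes(serializeKey(x), serializeKey(y)));
        List<List<String>> groups = new ArrayList<>();
        List<String> current = null;
        byte[] prev = null;
        for (String r : records) {
            byte[] k = serializeKey(r);
            if (prev == null || compareBytes(prev, k) != 0) {
                current = new ArrayList<>();
                groups.add(current);
                prev = k;
            }
            current.add(r);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>(List.of("b", "a", "b", "a", "c"));
        System.out.println(groupByKey(records)); // [[a, a], [b, b], [c]]
    }
}
```

The grouping is correct exactly when the serializer is deterministic, which is why the determinism question below matters.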

On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org> wrote:

> +1 for getting rid of the TypeComparator interface and relying on the
> serialized representation for grouping.
>
> Adding a new type to the DataStream API is quite difficult at the moment
> due to the many components that are required: TypeInformation (tries to
> deal with logical fields for TypeComparators), TypeSerializer (incl. its
> snapshot interfaces), and TypeComparator (with many methods and
> internals such as normalized keys etc.).
>
> If necessary, we can add more simple comparison-related methods to the
> TypeSerializer interface itself in the future (like
> TypeSerializer.isDeterministic).
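For illustration, such a method could look roughly like the following sketch (no such method exists on Flink's TypeSerializer today, and the interface here is a simplified stand-in, not the real one):

```java
import java.nio.charset.StandardCharsets;

// Sketch of the suggested addition, not actual Flink code: a simplified
// serializer interface with an isDeterministic() flag that binary
// sorting/grouping could check before trusting the byte representation.
public class DeterministicFlagSketch {

    interface SimpleSerializer<T> {
        byte[] serialize(T value);

        // Conservative default: a serializer must opt in explicitly before
        // its byte output is used for grouping.
        default boolean isDeterministic() {
            return false;
        }
    }

    // UTF-8 encoding of a String is stable: equal strings always produce
    // equal bytes, so this serializer can safely opt in.
    static class Utf8StringSerializer implements SimpleSerializer<String> {
        @Override
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public boolean isDeterministic() {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Utf8StringSerializer().isDeterministic()); // true
    }
}
```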
>
> Regards,
> Timo
>
>
> On 04.09.20 11:48, Aljoscha Krettek wrote:
> > Thanks for publishing the FLIP!
> >
> > On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >>   1. How to sort/group keys? What representation of the key should we
> >>      use? Should we sort on the binary form or should we depend on
> >>      Comparators being available.
> >
> > Initially, I suggested to Dawid (in private) to do the sorting/grouping
> > by using the binary representation. Then my opinion switched and I
> > thought we should use TypeComparator/Comparator because that's what the
> > DataSet API uses. After talking to Stephan, I'm again leaning towards
> > the binary representation, because it means we can eventually get rid
> > of the TypeComparator interface, which is a bit complicated, and
> > because we don't need a meaningful order in our sort; we only need the
> > grouping.
> >
> > This comes with some problems, though: we need to ensure that the
> > TypeSerializer of the type we're sorting is stable/deterministic. Beam
> > has infrastructure for this in the form of Coder.verifyDeterministic()
> > [1], which we don't have right now and should add if we go down this
> > path.
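Beam's Coder.verifyDeterministic() throws a NonDeterministicException when a coder cannot guarantee a stable byte form. A minimal sketch of that pattern (plain Java mimicking the idea, not Beam's actual classes) could look like:

```java
public class VerifyDeterministicSketch {

    // Mirrors the idea of Beam's NonDeterministicException: a descriptive,
    // fail-fast error raised before any data is grouped on bytes.
    static class NonDeterministicException extends Exception {
        NonDeterministicException(String reason) {
            super(reason);
        }
    }

    interface Coder {
        // Throws if equal values may encode to different bytes.
        void verifyDeterministic() throws NonDeterministicException;
    }

    // A coder over a HashMap cannot promise a stable byte form: HashMap
    // iteration order is unspecified, so equal maps may encode differently.
    static class HashMapCoder implements Coder {
        @Override
        public void verifyDeterministic() throws NonDeterministicException {
            throw new NonDeterministicException(
                    "HashMap iteration order is not deterministic");
        }
    }

    // UTF-8 bytes of a String are stable, so this coder passes the check.
    static class Utf8StringCoder implements Coder {
        @Override
        public void verifyDeterministic() {
            // deterministic: nothing to report
        }
    }

    public static void main(String[] args) throws Exception {
        new Utf8StringCoder().verifyDeterministic(); // passes silently
        try {
            new HashMapCoder().verifyDeterministic();
        } catch (NonDeterministicException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The value of the exception-based variant over a boolean flag is that the coder can report *why* it is non-deterministic, which helps users fix their key types.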
> >
> >   2. Where in the stack should we apply the sorting (this is rather a
> >      discussion about internals)
> >
> > Here, I'm gravitating towards the third option: implementing it in the
> > StreamTask layer, which probably means implementing a custom
> > InputProcessor. I think it's best to do it in this layer because we
> > would not mix the concerns of different layers, as we would if we
> > implemented this as a custom StreamOperator. I think this solution is
> > also best when it comes to multi-input operators.
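As a rough illustration of that layering (the names and interfaces here are invented for the sketch and are not Flink's actual StreamTask/InputProcessor APIs), the input-processing layer buffers the bounded input, sorts it by serialized key, and only then feeds the operator:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Conceptual sketch only: a sorting "input processor" that sits between the
// task's input and the operator. The operator itself stays unchanged; it
// simply receives its bounded input grouped by key.
public class SortingInputSketch<T> {

    private final Function<T, byte[]> keySerializer;
    private final List<Map.Entry<byte[], T>> buffer = new ArrayList<>();

    public SortingInputSketch(Function<T, byte[]> keySerializer) {
        this.keySerializer = keySerializer;
    }

    // Called per record while the input is running: only buffer,
    // run no user code yet.
    public void processElement(T record) {
        buffer.add(new AbstractMap.SimpleEntry<>(
                keySerializer.apply(record), record));
    }

    // Called once the bounded input ends: sort by the binary key (only the
    // grouping matters, not the particular order) and push everything to
    // the operator.
    public void endInput(Consumer<T> operator) {
        buffer.sort((a, b) -> Arrays.compare(a.getKey(), b.getKey()));
        for (Map.Entry<byte[], T> e : buffer) {
            operator.accept(e.getValue());
        }
    }
}
```

A real implementation would spill to disk instead of buffering on the heap, but the layering is the point: the sorting lives in the task's input processing, outside any operator.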
> >
> >>   3. How should we deal with custom implementations of StreamOperators
> >
> > I think the cleanest solution would be to go through the complete
> > operator lifecycle for every key, because then the watermark would not
> > oscillate between -Inf and +Inf and we would not break the semantic
> > guarantees that we have given to operators so far, namely that the
> > watermark is strictly monotonically increasing. However, I don't think
> > this solution is feasible because it would come with too much overhead.
> > We should solve this problem via documentation and educate people not
> > to query the current watermark, and not to rely on the watermark being
> > monotonically increasing, in operator implementations, so that the
> > framework has more freedom in how user programs are executed.
> >
> > Aljoscha
> >
> > [1]
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
> >
>
>
