That's certainly true, and I am not arguing against it. My point is that
we don't necessarily need a true "sort" in this particular use case.
We only need to cluster records with the same key together; the keys do
not need to be in any logical order. For clustering the keys, a binary
order is enough. I agree this would not work if we were to implement an
operation such as DataStream#sort.
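
To make the point concrete, here is a minimal sketch. It is not Flink
code: the class and method names are made up for illustration, and the
4-byte big-endian int encoding only stands in for whatever a
deterministic key serializer would produce. It sorts keys purely by
their serialized bytes. The resulting order is not the logical one
(negative ints end up after positive ones), but equal keys are always
adjacent, which is all that grouping needs.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BinaryGroupingSketch {

    // Hypothetical stand-in for a deterministic key serializer:
    // 4 bytes, big-endian (ByteBuffer's default byte order).
    static byte[] serialize(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Lexicographic comparison of unsigned bytes. Logically arbitrary,
    // but a total order in which identical byte strings compare as equal.
    static int compareBinary(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Sorts the keys by their binary form only and returns them deserialized.
    static List<Integer> cluster(List<Integer> keys) {
        List<byte[]> serialized = new ArrayList<>();
        for (int key : keys) {
            serialized.add(serialize(key));
        }
        serialized.sort(BinaryGroupingSketch::compareBinary);

        List<Integer> result = new ArrayList<>();
        for (byte[] bytes : serialized) {
            result.add(ByteBuffer.wrap(bytes).getInt());
        }
        return result;
    }

    public static void main(String[] args) {
        // -1 serializes to FF FF FF FF, so it sorts after 2 and 3.
        System.out.println(cluster(Arrays.asList(3, -1, 3, 2, -1)));
        // prints [2, 3, 3, -1, -1]
    }
}
```

The output is not sorted numerically, yet each key forms one contiguous
run, so an operator could process one key at a time.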

Best,

Dawid

On 09/09/2020 08:22, Kurt Young wrote:
> I doubt that any sorting algorithm would work knowing only that the
> keys are different, without information about which is greater.
>  
> Best,
> Kurt
>
>
> On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz
> <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>
>     Ad. 1
>
>     Yes, you are right in principle.
>
>     Let me clarify my proposal a bit, though. The proposed sort-style
>     execution aims at a generic KeyedProcessFunction where all the
>     "aggregations" are actually performed in the user code. It tries to
>     improve the performance by removing the need to use RocksDB, e.g.:
>
>         private static final class Summer<K>
>                 extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>
>             ....
>
>             @Override
>             public void processElement(
>                     Tuple2<K, Integer> value,
>                     Context ctx,
>                     Collector<Tuple2<K, Integer>> out) throws Exception {
>                 // Register one event-time timer per key, at Long.MAX_VALUE.
>                 if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>                     ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>                     timerRegistered.update(true);
>                 }
>                 // Add the incoming value to the per-key sum kept in state.
>                 Integer v = counter.value();
>                 Integer incomingValue = value.f1;
>                 if (v != null) {
>                     v += incomingValue;
>                 } else {
>                     v = incomingValue;
>                 }
>                 counter.update(v);
>             }
>
>             ....
>
>         }
>
>     Therefore I don't think the first part of your reply, about separating
>     the write and read workloads, applies here. We do not aim to create an
>     API that competes with the Table API. We think operations such as joins
>     or analytical aggregations should be performed in the Table API.
>
>     As for the second part I agree it would be nice to fall back to the
>     sorting approach only if a certain threshold of memory in a State
>     Backend is used. This has some problems though. We would need a way to
>     estimate the size of the occupied memory to tell when the threshold is
>     reached. That is not easily doable by default e.g. in a
>     MemoryStateBackend, as we do not serialize the values in the state
>     backend by default. We would have to add that, but this would add the
>     overhead of the serialization.
>
>     This proposal aims at the cases where we have a large state that will
>     not fit into memory, and where without the change users are forced to
>     use RocksDB. If the state fits in memory, I agree it will be better to
>     do hash-based aggregations, e.g. using the MemoryStateBackend. Therefore
>     I think it is important to give users the choice to use one or the other
>     approach. We might discuss which approach should be the default for
>     RuntimeMode.BATCH proposed in FLIP-134: should it be hash-based with a
>     user-configured state backend, or sorting-based with a
>     single-key-at-a-time backend? Moreover, we could consider letting users
>     choose the sort vs. hash "state backend" per operator. Would that suffice?
>
>     Ad. 2
>
>     I still think we can just use the first X bytes of the serialized form
>     as the normalized key and fall back to comparing full keys on clashes.
>     This is because we are not actually interested in a logical order; we
>     care only about the "grouping" aspect of the sorting. Therefore I think
>     it's enough to compare only part of the full key as the normalized key.
>
>     Thanks again for the really nice and thorough feedback!
>
>     Best,
>
>     Dawid
>
>     On 08/09/2020 14:47, Kurt Young wrote:
>     > Regarding #1, yes, the state backend is definitely hash-based execution.
>     > However, there are some differences between it and batch hash-based
>     > execution. The key difference is the *random access & read/write mixed
>     > workload*. For example, when using a state backend in streaming
>     > execution, one has to mix the read and write operations, and all of
>     > them are actually random access. But in batch hash execution, we could
>     > divide the phases into write and read. For example, we can build the
>     > hash table first, with only write operations. Once the build is done,
>     > we can start to read and trigger the user code.
>     > Take the hash aggregation that the blink planner implements as an
>     > example: during the build phase, as long as the hash map fits into
>     > memory, we update the accumulators directly in the hash map. Once we
>     > run out of memory, we fall back to sort-based execution. This improves
>     > performance a lot if the incoming data can be processed in memory.
>     >
>     > Regarding #2, IIUC you are actually describing a binary format of the
>     > key, not the normalized key that is used in DataSet. Take String as an
>     > example. Suppose we have lots of keys whose lengths are all greater
>     > than, let's say, 20. In your proposal, you will encode the whole string
>     > in the prefix of your composed data ( <key> + <timestamp> + <record> ),
>     > and when you compare records, you will actually compare the *whole* key
>     > of the record. A normalized key is fixed-length in this case; IIRC it
>     > takes 8 bytes to represent the string. The sorter stores the normalized
>     > key and offset in a dedicated array, and when sorting, it only sorts
>     > this *small* array. If the normalized keys are different, you can
>     > immediately tell which is greater from the normalized keys. You only
>     > have to compare the full keys if the normalized keys are equal and you
>     > know that, in this case, the normalized key cannot represent the full
>     > key. The reason DataSet does this is that sorting the *small* array is
>     > super cache efficient. The idea is borrowed from this paper [1]. Let me
>     > know if I missed or misunderstood anything.
>     >
>     > [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
>     > cache-sensitive parallel external sort)
>     >
>     > Best,
>     > Kurt
>     >
>     >
>     > On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz
>     > <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>     >
>     >> Hey Kurt,
>     >>
>     >> Thank you for the comments!
>     >>
>     >> Ad. 1 I might have missed something here, but as far as I can see,
>     >> using the current execution stack with regular state backends (RocksDB
>     >> in particular, if we want to have spilling capabilities) is equivalent
>     >> to hash-based execution. I can see a different spilling state backend
>     >> implementation in the future, but I think it is not batch specific. Or
>     >> am I missing something?
>     >>
>     >> Ad. 2 Totally agree that normalized keys are important to the
>     >> performance. I think, though, that TypeComparators are not a necessity
>     >> for that. Actually, this proposal is heading towards only ever
>     >> performing "normalized keys" comparison. I have not included in the
>     >> proposal the binary format which we will use for sorting (partially
>     >> because I forgot, and partially because I thought it was too much of an
>     >> implementation detail). Let me include it here though, as it might
>     >> clarify the situation a bit.
>     >>
>     >> In DataSet, at times we have KeySelectors which extract keys based on
>     >> field indices or names. This allows, in certain situations, extracting
>     >> the key from serialized records. In contrast, in DataStream the key is
>     >> always described with a black-box KeySelector, or, put differently,
>     >> with a function which extracts a key from a deserialized record. In
>     >> turn, there is no way to create a comparator that could compare records
>     >> by extracting the key from a serialized record (neither with nor
>     >> without key normalization). We suggest that the input for the sorter
>     >> will be
>     >>
>     >> <key> + <timestamp> + <record>
>     >>
>     >> Without having the key prepended, we would have to deserialize the
>     >> record for every key comparison.
>     >>
>     >> Therefore, if we agree that we perform binary comparison for keys
>     >> (which are always prepended), it is actually equivalent to a DataSet
>     >> with TypeComparators that support key normalization.
>     >>
>     >> Let me know if that is clear, or I have missed something here.
>     >>
>     >> Best,
>     >>
>     >> Dawid
>     >>
>     >> On 08/09/2020 03:39, Kurt Young wrote:
>     >>> Hi Dawid, thanks for bringing this up, it's really exciting to see
>     >>> that batch execution is introduced in DataStream. From the FLIP, it
>     >>> seems we are sticking with sort-based execution mode (at least for
>     >>> now), which will sort the whole input data before any *keyed*
>     >>> operation is executed. I have two comments here:
>     >>>
>     >>> 1. Do we want to introduce hash-based execution in the future? Sort is
>     >>> a safe choice but not the best in lots of cases. IIUC we only need
>     >>> to make sure that before the framework finishes dealing with one key,
>     >>> the operator doesn't see any data belonging to other keys, thus
>     >>> hash-based execution would also do the trick. One tricky thing the
>     >>> framework might need to deal with is memory constraints and spilling
>     >>> in the hash map, but Flink also has some good knowledge about these
>     >>> things.
>     >>> 2. Going back to sort-based execution and how to sort keys. From my
>     >>> experience, the performance of sorting would be one of the most
>     >>> important things if we want to achieve good performance of batch
>     >>> execution. And normalized keys are actually the key to the performance
>     >>> of sorting. If we want to get rid of TypeComparator, I think we still
>     >>> need to find a way to introduce this back.
>     >>>
>     >>> Best,
>     >>> Kurt
>     >>>
>     >>>
>     >>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek
>     >>> <aljos...@apache.org <mailto:aljos...@apache.org>> wrote:
>     >>>> Yes, I think we can address the problem of indeterminacy in a
>     >>>> separate FLIP because we're already in it.
>     >>>>
>     >>>> Aljoscha
>     >>>>
>     >>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>     >>>>> @Seth That's a very good point. I agree that RocksDB has the same
>     >>>>> problem. I think we can use the same approach for the sorted shuffles
>     >>>>> then. @Aljoscha I agree we should think about making it more
>     >>>>> resilient, as I guess users might have problems already if they use
>     >>>>> keys with non-deterministic binary representation. How do you feel
>     >>>>> about addressing that separately, purely to limit the scope of this
>     >>>>> FLIP?
>     >>>>>
>     >>>>> @Aljoscha I tend to agree with you that the best place to actually
>     >>>>> place the sorting would be in the InputProcessor(s). If there are no
>     >>>>> more suggestions with respect to that issue, I'll put this proposal up
>     >>>>> for voting.
>     >>>>>
>     >>>>> @all Thank you for the feedback so far. I'd like to start a voting
>     >>>>> thread on the proposal tomorrow. Therefore I'd appreciate it if you
>     >>>>> comment before that, if you still have some outstanding ideas.
>     >>>>>
>     >>>>> Best,
>     >>>>>
>     >>>>> Dawid
>     >>>>>
>     >>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>     >>>>>> Seth is right, I was just about to write that as well. There is a
>     >>>>>> problem, though, because some of our TypeSerializers are not
>     >>>>>> deterministic even though we use them as if they were. Beam excludes
>     >>>>>> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
>     >>>>>> pretty sure there is also weirdness going on in our KryoSerializer.
>     >>>>>>
>     >>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>     >>>>>>> There is already an implicit assumption that the TypeSerializer for
>     >>>>>>> keys is stable/deterministic: RocksDB compares keys using their
>     >>>>>>> serialized byte strings. I think this is a non-issue (or at least
>     >>>>>>> it's not changing the status quo).
>     >>>>>>>
>     >>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther
>     >>>>>>> <twal...@apache.org <mailto:twal...@apache.org>> wrote:
>     >>>>>>>> +1 for getting rid of the TypeComparator interface and relying on
>     >>>>>>>> the serialized representation for grouping.
>     >>>>>>>>
>     >>>>>>>> Adding a new type to the DataStream API is quite difficult at the
>     >>>>>>>> moment due to too many components that are required:
>     >>>>>>>> TypeInformation (tries to deal with logical fields for
>     >>>>>>>> TypeComparators), TypeSerializer (incl. its snapshot interfaces),
>     >>>>>>>> and TypeComparator (with many methods and internals such as
>     >>>>>>>> normalized keys etc.).
>     >>>>>>>>
>     >>>>>>>> If necessary, we can add more simple comparison-related methods to
>     >>>>>>>> the TypeSerializer interface itself in the future (like
>     >>>>>>>> TypeSerializer.isDeterministic).
>     >>>>>>>>
>     >>>>>>>> Regards,
>     >>>>>>>> Timo
>     >>>>>>>>
>     >>>>>>>>
>     >>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>     >>>>>>>>> Thanks for publishing the FLIP!
>     >>>>>>>>>
>     >>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz
>     >>>>>>>>> <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>     >>>>>>>>>>     1. How to sort/group keys? What representation of the key
>     >>>>>>>>>>        should we use? Should we sort on the binary form or should
>     >>>>>>>>>>        we depend on Comparators being available?
>     >>>>>>>>> Initially, I suggested to Dawid (in private) to do the
>     >>>>>>>>> sorting/grouping by using the binary representation. Then my
>     >>>>>>>>> opinion switched and I thought we should use
>     >>>>>>>>> TypeComparator/Comparator because that's what the DataSet API
>     >>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my opinion
>     >>>>>>>>> to use the binary representation because it means we can
>     >>>>>>>>> eventually get rid of the TypeComparator interface, which is a bit
>     >>>>>>>>> complicated, and because we don't need any good order in our sort,
>     >>>>>>>>> we only need the grouping.
>     >>>>>>>>>
>     >>>>>>>>> This comes with some problems, though: we need to ensure that the
>     >>>>>>>>> TypeSerializer of the type we're sorting is stable/deterministic.
>     >>>>>>>>> Beam has infrastructure for this in the form of
>     >>>>>>>>> Coder.verifyDeterministic() [1] which we don't have right now and
>     >>>>>>>>> should add if we go down this path.
>     >>>>>>>>>>     2. Where in the stack should we apply the sorting (this is
>     >>>>>>>>>>        rather a discussion about internals)
>     >>>>>>>>> Here, I'm gravitating towards the third option of implementing it
>     >>>>>>>>> in the layer of the StreamTask, which probably means implementing
>     >>>>>>>>> a custom InputProcessor. I think it's best to do it in this layer
>     >>>>>>>>> because we would not mix concerns of different layers as we would
>     >>>>>>>>> if we implemented this as a custom StreamOperator. I think this
>     >>>>>>>>> solution is also best when it comes to multi-input operators.
>     >>>>>>>>>>     3. How should we deal with custom implementations of
>     >>>>>>>>>>        StreamOperators
>     >>>>>>>>> I think the cleanest solution would be to go through the complete
>     >>>>>>>>> operator lifecycle for every key, because then the watermark would
>     >>>>>>>>> not oscillate between -Inf and +Inf and we would not break the
>     >>>>>>>>> semantic guarantees that we gave to operators so far, in that the
>     >>>>>>>>> watermark is strictly monotonically increasing. However, I don't
>     >>>>>>>>> think this solution is feasible because it would come with too
>     >>>>>>>>> much overhead. We should solve this problem via documentation and
>     >>>>>>>>> maybe educate people to not query the current watermark or not
>     >>>>>>>>> rely on the watermark being monotonically increasing in operator
>     >>>>>>>>> implementations, to allow the framework more freedom in how user
>     >>>>>>>>> programs are executed.
>     >>>>>>>>> Aljoscha
>     >>>>>>>>>
>     >>>>>>>>> [1]
>     >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>
