That's for sure. I am not arguing against that. What I am saying is that we don't necessarily need a true "sorting" in this particular use case. We only need to cluster records with the same keys together; the keys do not need to be logically sorted. My point is that for clustering the keys a binary order is enough. I agree this would not work if we were to implement an operation such as DataStream#sort.
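To illustrate what I mean (a rough sketch of mine, not code from the FLIP, and all names are made up): once the records are clustered by any total order on the serialized key bytes, a single pass is enough to detect the key-group boundaries, and no type-aware comparison is needed anywhere:

import java.util.Arrays;
import java.util.List;

// Sketch only: walks records that were clustered by an arbitrary
// total order on the serialized key bytes and signals each
// key-group boundary.
final class KeyGroupPass {
    static void forEachKeyGroup(List<byte[]> clusteredKeys, Runnable onGroupStart) {
        byte[] currentKey = null;
        for (byte[] key : clusteredKeys) {
            if (currentKey == null || !Arrays.equals(key, currentKey)) {
                // A new key group starts here: e.g. dispose the state
                // and timers of the previous key before switching.
                currentKey = key;
                onGroupStart.run();
            }
            // ... hand the record for currentKey to the user function ...
        }
    }
}

Any order that is total over the byte sequences works here, which is why a plain lexicographic comparison of the serialized form suffices.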
Best,

Dawid

On 09/09/2020 08:22, Kurt Young wrote:
> I doubt that any sorting algorithm would work with only knowing that the keys are different but without information about which is greater.
>
> Best,
> Kurt
>
> On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>
> Ad. 1
>
> Yes, you are right in principle.
>
> Let me though clarify my proposal a bit. The proposed sort-style execution aims at a generic KeyedProcessFunction where all the "aggregations" are actually performed in the user code. It tries to improve the performance by removing the need to use RocksDB, e.g.:
>
> private static final class Summer<K>
>         extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>
>     ....
>
>     @Override
>     public void processElement(
>             Tuple2<K, Integer> value,
>             Context ctx,
>             Collector<Tuple2<K, Integer>> out) throws Exception {
>         if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>             ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>             timerRegistered.update(true);
>         }
>         Integer v = counter.value();
>         Integer incomingValue = value.f1;
>         if (v != null) {
>             v += incomingValue;
>         } else {
>             v = incomingValue;
>         }
>         counter.update(v);
>     }
>
>     ....
> }
>
> Therefore I don't think the first part of your reply, about separating the write and read workloads, applies here. We do not aim to create a competing API with the Table API. We think operations such as joins or analytical aggregations should be performed in the Table API.
>
> As for the second part, I agree it would be nice to fall back to the sorting approach only if a certain threshold of memory in a State Backend is used. This has some problems though. We would need a way to estimate the size of the occupied memory to tell when the threshold is reached. That is not easily doable by default, e.g. in a MemoryStateBackend, as we do not serialize the values in the state backend by default. We would have to add that, but this would add the overhead of the serialization.
>
> This proposal aims at the cases where we do have a large state that will not fit into memory and where, without the change, users are forced to use RocksDB. If the state fits in memory, I agree it will be better to do hash-based aggregations, e.g. using the MemoryStateBackend. Therefore I think it is important to give users the choice to use one or the other approach. We might discuss which approach should be the default for RuntimeMode.BATCH proposed in FLIP-134: should it be hash-based with a user-configured state backend, or sorting-based with a single-key-at-a-time backend? Moreover, we could think about whether we should let users choose the sort vs hash "state backend" per operator. Would that suffice?
>
> Ad. 2
>
> I still think we can just use the first X bytes of the serialized form as the normalized key and fall back to comparing full keys on clashes. It is because we are actually not interested in a logical order; we care only about the "grouping" aspect of the sorting. Therefore I think it's enough to compare only parts of the full key as the normalized key.
>
> Thanks again for the really nice and thorough feedback!
>
> Best,
>
> Dawid
>
> On 08/09/2020 14:47, Kurt Young wrote:
> > Regarding #1, yes the state backend is definitely hash-based execution. However, there are some differences from batch hash-based execution. The key difference is *random access & read/write mixed workload*. For example, by using a state backend in streaming execution, one has to mix the read and write operations, and all of them are actually random access. But in a batch hash execution, we could divide the phases into write and read. For example, we can build the hash table first, with only write operations. And once the build is done, we can start to read and trigger the user codes. Take the hash aggregation which the blink planner implemented as an example: during the building phase, as long as the hash map could fit into memory, we will update the accumulators directly in the hash map. And once we are running out of memory, we then fall back to sort-based execution. It improves the performance a lot if the incoming data can be processed in memory.
> >
> > Regarding #2, IIUC you are actually describing a binary format of the key, not the normalized key which is used in DataSet. I will take String as an example. Suppose we have lots of keys with lengths all greater than, let's say, 20. In your proposal, you will encode the whole string in the prefix of your composed data (<key> + <timestamp> + <record>). And when you compare records, you will actually compare the *whole* key of the record. The normalized key, in contrast, is fixed-length; in this case IIRC it will take 8 bytes to represent the string. And the sorter will store the normalized key and offset in a dedicated array. When doing the sorting, it only sorts this *small* array. If the normalized keys are different, you can immediately tell which is greater from the normalized keys. You only have to compare the full keys if the normalized keys are equal and you know that in this case the normalized key couldn't fully represent the key. The reason why DataSet does this is that sorting the *small* array is super cache-efficient. The idea is borrowed from this paper [1]. Let me know if I missed or misunderstood anything.
> >
> > [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a cache-sensitive parallel external sort)
> >
> > Best,
> > Kurt
> >
> > On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
> >
> >> Hey Kurt,
> >>
> >> Thank you for the comments!
> >>
> >> Ad. 1 I might have missed something here, but as far as I can see, using the current execution stack with regular state backends (RocksDB in particular, if we want to have spilling capabilities) is equivalent to hash-based execution. I can see a different spilling state backend implementation in the future, but I think it is not batch specific. Or am I missing something?
> >>
> >> Ad. 2 Totally agree that normalized keys are important for the performance. I think, though, that TypeComparators are not a necessity for that. Actually, this proposal is heading towards only ever performing "normalized keys" comparison. I have not included in the proposal the binary format which we will use for sorting (partially because I forgot, and partially because I thought it was too much of an implementation detail). Let me include it here though, as it might clear up the situation a bit.
> >>
> >> In DataSet, at times we have KeySelectors which extract keys based on field indices or names. This allows, in certain situations, extracting the key from serialized records. Compared to DataSet, in DataStream the key is always described with a black-box KeySelector, or, put differently, with a function which extracts a key from a deserialized record. In turn, there is no way to create a comparator that could compare records by extracting the key from a serialized record (neither with, nor without key normalization). We suggest that the input for the sorter will be
> >>
> >> <key> + <timestamp> + <record>
> >>
> >> Without having the key prepended, we would have to deserialize the record for every key comparison.
> >>
> >> Therefore, if we agree that we perform binary comparison for keys (which are always prepended), it is actually equivalent to a DataSet with TypeComparators that support key normalization.
> >>
> >> Let me know if that is clear, or if I have missed something here.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 08/09/2020 03:39, Kurt Young wrote:
> >>> Hi Dawid, thanks for bringing this up, it's really exciting to see that batch execution is introduced in DataStream. From the FLIP, it seems we are sticking with the sort-based execution mode (at least for now), which will sort the whole input data before any *keyed* operation is executed. I have two comments here:
> >>>
> >>> 1. Do we want to introduce hash-based execution in the future? Sort is a safe choice but not the best in lots of cases. IIUC we only need to make sure that before the framework finishes dealing with one key, the operator doesn't see any data belonging to other keys; thus hash-based execution would also do the trick. One tricky thing the framework might need to deal with is the memory constraint and spilling in the hash map, but Flink also has some good knowledge about this stuff.
> >>>
> >>> 2. Going back to sort-based execution and how to sort keys. From my experience, the performance of sorting would be one of the most important things if we want to achieve good performance of batch execution. And normalized keys are actually the key to the performance of sorting. If we want to get rid of TypeComparator, I think we still need to find a way to introduce this back.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljos...@apache.org <mailto:aljos...@apache.org>> wrote:
> >>>> Yes, I think we can address the problem of indeterminacy in a separate FLIP because we're already in it.
> >>>>
> >>>> Aljoscha
> >>>>
> >>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> >>>>> @Seth That's a very good point. I agree that RocksDB has the same problem. I think we can use the same approach for the sorted shuffles then. @Aljoscha I agree we should think about making it more resilient, as I guess users might have problems already if they use keys with a non-deterministic binary representation. How do you feel about addressing that separately, purely to limit the scope of this FLIP?
> >>>>>
> >>>>> @Aljoscha I tend to agree with you that the best place to actually place the sorting would be in the InputProcessor(s). If there are no more suggestions with respect to that issue, I'll put this proposal up for voting.
> >>>>>
> >>>>> @all Thank you for the feedback so far. I'd like to start a voting thread on the proposal tomorrow. Therefore I'd appreciate it if you comment before that, if you still have some outstanding ideas.
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Dawid
> >>>>>
> >>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
> >>>>>> Seth is right, I was just about to write that as well. There is a problem, though, because some of our TypeSerializers are not deterministic even though we use them as if they were. Beam excludes the FloatCoder, for example, and the AvroCoder in certain cases. I'm pretty sure there is also weirdness going on in our KryoSerializer.
> >>>>>>
> >>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
> >>>>>>> There is already an implicit assumption that the TypeSerializer for keys is stable/deterministic: RocksDB compares keys using their serialized byte strings. I think this is a non-issue (or at least it's not changing the status quo).
> >>>>>>>
> >>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org <mailto:twal...@apache.org>> wrote:
> >>>>>>>> +1 for getting rid of the TypeComparator interface and relying on the serialized representation for grouping.
> >>>>>>>>
> >>>>>>>> Adding a new type to the DataStream API is quite difficult at the moment due to too many components that are required: TypeInformation (tries to deal with logical fields for TypeComparators), TypeSerializer (incl. its snapshot interfaces), and TypeComparator (with many methods and internals such as normalized keys etc.).
> >>>>>>>>
> >>>>>>>> If necessary, we can add more simple comparison-related methods to the TypeSerializer interface itself in the future (like TypeSerializer.isDeterministic).
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> >>>>>>>>> Thanks for publishing the FLIP!
> >>>>>>>>>
> >>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
> >>>>>>>>>> 1. How to sort/group keys? What representation of the key should we use? Should we sort on the binary form or should we depend on Comparators being available.
> >>>>>>>>> Initially, I suggested to Dawid (in private) to do the sorting/grouping by using the binary representation. Then my opinion switched and I thought we should use TypeComparator/Comparator because that's what the DataSet API uses. After talking to Stephan, I'm again encouraged in my opinion to use the binary representation because it means we can eventually get rid of the TypeComparator interface, which is a bit complicated, and because we don't need any good order in our sort, we only need the grouping.
> >>>>>>>>>
> >>>>>>>>> This comes with some problems, though: we need to ensure that the TypeSerializer of the type we're sorting is stable/deterministic. Beam has infrastructure for this in the form of Coder.verifyDeterministic() [1] which we don't have right now and should add if we go down this path.
> >>>>>>>>>
> >>>>>>>>>> 2. Where in the stack should we apply the sorting (this is rather a discussion about internals)
> >>>>>>>>> Here, I'm gravitating towards the third option of implementing it in the layer of the StreamTask, which probably means implementing a custom InputProcessor. I think it's best to do it in this layer because we would not mix concerns of different layers as we would if we implemented this as a custom StreamOperator. I think this solution is also best when it comes to multi-input operators.
> >>>>>>>>>
> >>>>>>>>>> 3. How should we deal with custom implementations of StreamOperators
> >>>>>>>>> I think the cleanest solution would be to go through the complete operator lifecycle for every key, because then the watermark would not oscillate between -Inf and +Inf and we would not break the semantic guarantees that we gave to operators so far, in that the watermark is strictly monotonically increasing. However, I don't think this solution is feasible because it would come with too much overhead. We should solve this problem via documentation and maybe educate people not to query the current watermark, or not to rely on the watermark being monotonically increasing in operator implementations, to allow the framework more freedom in how user programs are executed.
> >>>>>>>>>
> >>>>>>>>> Aljoscha
> >>>>>>>>>
> >>>>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
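PS: purely to make the normalized-key scheme from the discussion above concrete, here is a rough sketch of mine (made-up names, not code from the FLIP or from DataSet): the sorter keeps a fixed-length prefix of the serialized key next to a record pointer, sorts only that small array, and touches the full keys only on prefix ties.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch: sort serialized records of the layout
// <key> + <timestamp> + <record> by key bytes, using a fixed-length
// normalized-key prefix for cache-friendly comparisons and falling
// back to the full key only when the prefixes are equal.
public final class NormalizedKeySortSketch {

    // Fixed number of prefix bytes kept per entry (hypothetical choice).
    private static final int NORMALIZED_KEY_LEN = 8;

    // One sort entry: the small, fixed-size part that is actually sorted.
    private static final class Entry {
        final byte[] normalizedKey; // first key bytes, zero-padded
        final int recordIndex;      // "pointer" to the full serialized record

        Entry(byte[] serializedKey, int recordIndex) {
            this.normalizedKey = Arrays.copyOf(serializedKey, NORMALIZED_KEY_LEN);
            this.recordIndex = recordIndex;
        }
    }

    // Returns record indices ordered so that equal keys become adjacent.
    // The order itself is an arbitrary binary order; only grouping matters.
    static int[] sortByKey(List<byte[]> serializedKeys) {
        Entry[] entries = new Entry[serializedKeys.size()];
        for (int i = 0; i < entries.length; i++) {
            entries[i] = new Entry(serializedKeys.get(i), i);
        }
        Arrays.sort(entries, (a, b) -> {
            // Cheap, cache-friendly comparison on the fixed-size prefix.
            int cmp = compareUnsigned(a.normalizedKey, b.normalizedKey);
            if (cmp != 0) {
                return cmp;
            }
            // Prefix tie: the normalized key may not capture the full key,
            // so fall back to comparing the complete serialized keys.
            return compareUnsigned(
                    serializedKeys.get(a.recordIndex),
                    serializedKeys.get(b.recordIndex));
        });
        int[] order = new int[entries.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = entries[i].recordIndex;
        }
        return order;
    }

    // Lexicographic comparison of bytes interpreted as unsigned values.
    private static int compareUnsigned(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    }
}

Since equal serialized keys always compare equal here, the grouping is preserved regardless of how the prefix truncation orders different keys, which is exactly the property the single-key-at-a-time execution needs.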