> Ok, got your point now. I agree that it makes more sense to make StateBackend return a contract instead of a particular implementation. How about we name the new interface `CheckpointableKeyedStateBackend`? We could make `BoundedStreamStateBackend` implement `CheckpointableKeyedStateBackend` but without checkpoint-related operations yet, while reserving the possibility that the bounded stream also supports checkpointing in the future. What do you think?

Sounds good to me. Will update the FLIP with the new name.
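For reference, a minimal sketch of what the new contract could look like (indicative only; the exact signatures are to be settled in the FLIP/PR):

===============================================
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.Snapshotable;

/** Sketch: the pure interface that StateBackend would return. */
public interface CheckpointableKeyedStateBackend<K>
        extends KeyedStateBackend<K>,
                Snapshotable<SnapshotResult<KeyedStateHandle>> {
    // Checkpoint-related methods live in the Snapshotable contract; a
    // BoundedStreamStateBackend could initially leave them unsupported.
}
===============================================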
On 18/09/2020 15:31, Yu Li wrote:
> bq. The problem is that I could not use this "state backend" in a StreamOperator.
> Ok, got your point now. I agree that it makes more sense to make StateBackend return a contract instead of a particular implementation. How about we name the new interface `CheckpointableKeyedStateBackend`? We could make `BoundedStreamStateBackend` implement `CheckpointableKeyedStateBackend` but without checkpoint-related operations yet, while reserving the possibility that the bounded stream also supports checkpointing in the future. What do you think?
>
> bq. Correct, the goal though is not to outperform the HeapStateBackend. The single key state backend requires sorted inputs which come with a price. The main goal is to outperform RocksDBStateBackend, which is necessary for large states.
> Personally I think the main benefit of introducing a bounded-stream-specific state backend is that we could remove the data after processing a key, thus reducing the cost of state storage a lot, rather than the routine performance of state processing (smile).
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Fri, 18 Sep 2020 at 20:48, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>> ===============================================
>> class BoundedStreamInternalStateBackend<K> implements
>>         KeyedStateBackend<K>,
>>         SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>>         Closeable,
>>         CheckpointListener {
>> ===============================================
> The problem is that I could not use this "state backend" in a StreamOperator. The goal of this effort is that it is mostly transparent to all the implementations of StreamOperator(s). Right now a StreamOperator retrieves the AbstractKeyedStateBackend through the StreamOperatorContext, which instantiates it in StreamTaskInitializer etc. The problem is that a big chunk of the current code base uses the AbstractKeyedStateBackend, whereas it really just needs an interface, not that particular implementation. The change is really only about separating the contract (InternalKeyedStateBackend) from the implementation (AbstractKeyedStateBackend). My thinking is that it is only an approach to fix a mistake of the past: that StateBackend returns a particular implementation rather than a contract.
>
> I do agree I don't need the `SnapshotStrategy` and `CheckpointListener` interfaces. The thing though is that the runtime expects those contracts from an AbstractKeyedStateBackend.
>
> BTW, if you'd like to see what this change really looks like, you can check the PR I already opened for it: https://github.com/apache/flink/pull/13405/files
>
>> Checking the FLIP more closely I found below description: "With a high number of keys it (HeapStateBackend) suffers a significant penalty and becomes even less performant for that particular case than the sorting approach", does it mean "HeapStateBackend" outperformed "SingleKeyStateBackend" when the number of keys is relatively small?
> Correct, the goal though is not to outperform the HeapStateBackend. The single key state backend requires sorted inputs, which come with a price. The main goal is to outperform RocksDBStateBackend, which is necessary for large states.
>
>> Thanks for the summary. I think it's more specific and could help readers to better understand why we cannot use HeapKeyedStateBackend directly, than the single-line description "when the StateBackend observes a new incoming key it will reset all acquired state objects so far". What do you think?
> Sure, I can add it to the document.
>
> Best,
>
> Dawid
>
> On 18/09/2020 14:29, Yu Li wrote:
>> Thanks for the clarification Dawid. Some of my thoughts:
>>
>> bq. The results are times for end-to-end execution of a job. Therefore the sorting part is included. The actual target of the replacement is RocksDB, which does the serialization and key bytes comparison as well.
>> I see. Checking the FLIP more closely I found below description: "With a high number of keys it (HeapStateBackend) suffers a significant penalty and becomes even less performant for that particular case than the sorting approach", does it mean "HeapStateBackend" outperformed "SingleKeyStateBackend" when the number of keys is relatively small? The micro-benchmark of ValueState removes the key shuffling phase, so its result is self-explanatory.
>>
>> About `InternalKeyedStateBackend`, let me rephrase my question: why don't we add the new state backend like below instead of adding a new interface (and IMHO there's no need to implement the `SnapshotStrategy` and `CheckpointListener` interfaces since it doesn't support checkpointing)? Reserved for adding more internal state backends in future?
>> ===============================================
>> class BoundedStreamInternalStateBackend<K> implements
>>         KeyedStateBackend<K>,
>>         SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>>         Closeable,
>>         CheckpointListener {
>> ===============================================
>>
>> bq. Let me though quickly summarize and if you find it useful I can add it to the FLIP itself.
>> Thanks for the summary. I think it's more specific and could help readers to better understand why we cannot use HeapKeyedStateBackend directly, than the single-line description "when the StateBackend observes a new incoming key it will reset all acquired state objects so far". What do you think?
>>
>> Thanks.
>>
>> Best Regards,
>> Yu
>>
>>
>> On Thu, 17 Sep 2020 at 23:38, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>
>> Thanks for the comments Yu.
>>
>> > First of all, for the performance testing result, I'm wondering whether the sorting cost is counted in the result for both DataSet and refined DataStream implementations. I could think of the saving of hash computation and final iteration to emit the word-count result (processing a key at a time could save such iteration), but not sure whether these cost savings are at the same grade as comparing the key bytes.
>> The results are times for end-to-end execution of a job. Therefore the sorting part is included. The actual target of the replacement is RocksDB, which does the serialization and key bytes comparison as well. On top of that it adds all the RocksDB bookkeeping.
>>
>> > However, I'm not fully convinced to introduce a new `InternalKeyedStateBackend` interface. I agree that we don't need to take the overhead of `AbstractKeyedStateBackend` since we don't plan to support checkpointing for now, but why don't we directly write a state backend implementation for bounded streams? Or are we planning to introduce more internal state backends in future? What's more, the current design of `InternalKeyedStateBackend` in the FLIP document seems to be extending as many interfaces as `AbstractKeyedStateBackend` implements, which I guess is a typo.
>> Maybe I was not clear enough about the change. This change does not "strip" the AbstractKeyedStateBackend of any functionalities. My intent is not to remove any methods of the AbstractKeyedStateBackend. The problem here is that the AbstractKeyedStateBackend is an abstract class (duh ;)), which does have some predefined implementation. Moreover it requires objects such as InternalKeyContext, CloseableRegistry etc. to be constructed, which we don't need/want e.g. in the single key state backend. My intention here is to make the StateBackend return only pure interfaces. (AbstractKeyedStateBackend is the only non-interface that StateBackend returns.) In other words I just want to make AbstractKeyedStateBackend a proper interface. It is not a typo that InternalKeyedStateBackend extends the same interfaces as AbstractKeyedStateBackend does.
>>
>> > Thirdly, I suggest we name the special state backend `BoundedStreamInternalStateBackend`. And from our existing javadoc of `StateBackend` it actually cannot be called a complete state backend...: "A State Backend defines how the state of a streaming application is stored and checkpointed".
>> Thanks for the suggestion. Sure, I can use that name. Yes, I do agree it is not a full-fledged StateBackend. I do want it to be an internal class that is never used explicitly by users.
>>
>> > Lastly, I didn't find a detailed design of the "SingleKeyStateBackend" in the FLIP,
>> I did not put it into the design, because 1) I found it internal, as it does not touch any public-facing interfaces, and 2) it is rather straightforward. Let me though quickly summarize, and if you find it useful I can add it to the FLIP itself.
>>
>> > as how to detect the key switching
>> That is rather straightforward. The state backend works only with the assumption that the keys are sorted/grouped together. We keep the current key, and in setCurrentKey we check if the new key is different from the current one. Side note: yes, custom user operators which call setCurrentKey explicitly might not work in this setup.
>>
>> > remove the data (especially in the non-windowing case), etc.
>> We only ever keep a single value for a state object. Therefore ValueState is a very thin wrapper for a value, MapState for a HashMap, ListState for a List etc. When the key changes we simply set the wrapped value/map/state to null.
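To make the quoted summary concrete, here is a rough sketch of the key-switch handling in the single key state backend (hypothetical names, illustration only):

===============================================
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Sketch: a backend that only ever holds state for the current key. */
final class SingleKeyStateBackendSketch<K> {

    /** Contract for the thin per-key state wrappers below. */
    interface SingleKeyState {
        void clearForNewKey();
    }

    private final List<SingleKeyState> registeredStates = new ArrayList<>();
    private K currentKey;

    /**
     * Inputs are sorted/grouped by key, so a key change means the
     * previous key is finished and its state can simply be dropped.
     */
    void setCurrentKey(K newKey) {
        if (!Objects.equals(currentKey, newKey)) {
            for (SingleKeyState state : registeredStates) {
                state.clearForNewKey();
            }
            currentKey = newKey;
        }
    }

    /** ValueState as a thin wrapper around a single value. */
    static final class SingleKeyValueState<T> implements SingleKeyState {
        private T value;

        T value() { return value; }

        void update(T newValue) { value = newValue; }

        @Override
        public void clearForNewKey() { value = null; }
    }
}
===============================================

MapState and ListState would be analogous thin wrappers around a single HashMap and List, respectively.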
I could think of the saving of >> hash computation >> > and final iteration to emit the word-count result >> (processing a key at a >> > time could save such iteration), but not sure whether these >> cost savings >> > are at the same grade of comparing the key bytes. >> > >> > Regardless of the performance result, I agree that the >> capability of >> > removing the data after processing a key could prominently >> reduce the space >> > required by state, so introducing a new state backend for >> bounded stream >> > makes sense. >> > >> > However, I'm not fully convinced to introduce a new >> > `InternalKeyedStateBackend` interface. I agree that we >> don't need to take >> > the overhead of `AbstractKeyedStateBackend` since we don't >> plan to support >> > checkpoint for now, but why don't we directly write a state >> backend >> > implementation for bounded stream? Or are we planning to >> introduce more >> > internal state backends in future? What's more, the current >> design of >> > `InternalKeyedStateBackend` in the FLIP document seems to >> be extending as >> > many interfaces as `AbstractedKeyedStateBackend` >> implements, which I guess >> > is a typo. >> > >> > Thirdly, I suggest we name the special state backend as >> > `BoundedStreamInternalStateBackend`. And from our existing >> javadoc of >> > `StateBackend` it actually cannot be called a complete >> state backend...: "A >> > State Backend defines how the state of a streaming >> application is stored >> > and checkpointed". >> > >> > Lastly, I didn't find a detailed design of the >> "SingleKeyStateBackend" in >> > the FLIP, and suggest we write the key design down, such as >> how to detect >> > the key switching and remove the data (especially in the >> non-windowing >> > case), etc. >> > >> > Thanks. >> > >> > Best Regards, >> > Yu >> > >> > >> > On Wed, 9 Sep 2020 at 17:18, Kurt Young <ykt...@gmail.com >> <mailto:ykt...@gmail.com>> wrote: >> > >> >> Yes, I didn't intend to block this FLIP, and some of the >> comments are >> >> actually implementation details. >> >> And all of them are handled internally, not visible to >> users, thus we can >> >> also change or improve them >> >> in the future. >> >> >> >> Best, >> >> Kurt >> >> >> >> >> >> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek >> <aljos...@apache.org <mailto:aljos...@apache.org>> >> >> wrote: >> >> >> >>> I think Kurts concerns/comments are very valid and we >> need to implement >> >>> such things in the future. However, I also think that we >> need to get >> >>> started somewhere and I think what's proposed in this >> FLIP is a good >> >>> starting point that we can build on. So we should not get >> paralyzed by >> >>> thinking too far ahead into the future. Does that make sense? >> >>> >> >>> Best, >> >>> Aljoscha >> >>> >> >>> On 08.09.20 16:59, Dawid Wysakowicz wrote: >> >>>> Ad. 1 >> >>>> >> >>>> Yes, you are right in principle. >> >>>> >> >>>> Let me though clarify my proposal a bit. The proposed >> sort-style >> >>>> execution aims at a generic KeyedProcessFunction were >> all the >> >>>> "aggregations" are actually performed in the user code. >> It tries to >> >>>> improve the performance by actually removing the need to >> use RocksDB >> >>> e.g.: >> >>>> private static final class Summer<K> >> >>>> extends KeyedProcessFunction<K, Tuple2<K, >> Integer>, >> >>>> Tuple2<K, Integer>> { >> >>>> >> >>>> .... 
>> >>>>
>> >>>>     private static final class Summer<K>
>> >>>>             extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>> >>>>
>> >>>>         ....
>> >>>>
>> >>>>         @Override
>> >>>>         public void processElement(
>> >>>>                 Tuple2<K, Integer> value,
>> >>>>                 Context ctx,
>> >>>>                 Collector<Tuple2<K, Integer>> out) throws Exception {
>> >>>>             if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>> >>>>                 ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>> >>>>                 timerRegistered.update(true);
>> >>>>             }
>> >>>>             Integer v = counter.value();
>> >>>>             Integer incomingValue = value.f1;
>> >>>>             if (v != null) {
>> >>>>                 v += incomingValue;
>> >>>>             } else {
>> >>>>                 v = incomingValue;
>> >>>>             }
>> >>>>             counter.update(v);
>> >>>>         }
>> >>>>
>> >>>>         ....
>> >>>>
>> >>>>     }
>> >>>>
>> >>>> Therefore I don't think the first part of your reply, about separating the write and read workloads, applies here. We do not aim to create a competing API with the Table API. We think operations such as joins or analytical aggregations should be performed in the Table API.
>> >>>>
>> >>>> As for the second part, I agree it would be nice to fall back to the sorting approach only if a certain threshold of memory in a State Backend is used. This has some problems though. We would need a way to estimate the size of the occupied memory to tell when the threshold is reached. That is not easily doable by default, e.g. in a MemoryStateBackend, as we do not serialize the values in the state backend by default. We would have to add that, but this would add the overhead of the serialization.
>> >>>>
>> >>>> This proposal aims at the cases where we do have a large state that will not fit into memory, and without the change users are forced to use RocksDB. If the state fits in memory, I agree it will be better to do hash-based aggregations, e.g. using the MemoryStateBackend. Therefore I think it is important to give users the choice to use one or the other approach. We might discuss which approach should be the default for RuntimeMode.BATCH proposed in FLIP-134: should it be hash-based with a user-configured state backend, or sorting-based with a single-key-at-a-time backend? Moreover we could think about letting users choose the sort vs hash "state backend" per operator. Would that suffice?
>> >>>>
>> >>>> Ad. 2
>> >>>>
>> >>>> I still think we can just use the first X bytes of the serialized form as the normalized key and fall back to comparing full keys on clashes. That is because we are actually not interested in a logical order, but care only about the "grouping" aspect of the sorting. Therefore I think it's enough to compare only parts of the full key as the normalized key.
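A sketch of the grouping-only comparison described in Ad. 2 above (illustrative; the 8-byte prefix length and the Java 9+ Arrays.compare helpers are assumptions):

===============================================
import java.util.Arrays;

final class PrefixKeyComparator {

    /** First X bytes of the serialized key used as the "normalized key". */
    static final int PREFIX_LENGTH = 8;

    /** Compares serialized keys; equality is all we need for grouping. */
    static int compareSerializedKeys(byte[] a, byte[] b) {
        int prefixResult = Arrays.compare(
                a, 0, Math.min(PREFIX_LENGTH, a.length),
                b, 0, Math.min(PREFIX_LENGTH, b.length));
        if (prefixResult != 0) {
            return prefixResult; // prefixes differ, so the keys differ
        }
        // Prefix clash: fall back to comparing the full serialized keys.
        return Arrays.compare(a, b);
    }
}
===============================================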
For example, we can build the >> >>>>> hash table first, with only write operations. And once >> the build is >> >>> done, >> >>>>> we can start to read and trigger the user codes. >> >>>>> Take hash aggregation which blink planner implemented >> as an example, >> >>> during >> >>>>> building phase, as long as the hash map >> >>>>> could fit into memory, we will update the accumulators >> directly in the >> >>> hash >> >>>>> map. And once we are running out of memory, >> >>>>> we then fall back to sort based execution. It improves the >> >> performance a >> >>>>> lot if the incoming data can be processed in >> >>>>> memory. >> >>>>> >> >>>>> Regarding #2, IIUC you are actually describing a binary >> format of key, >> >>> not >> >>>>> normalized key which is used in DataSet. I will >> >>>>> take String for example. If we have lots of keys with >> length all >> >> greater >> >>>>> than, let's say 20. In your proposal, you will encode >> >>>>> the whole string in the prefix of your composed data ( >> <key> + >> >>> <timestamp> >> >>>>> + <record> ). And when you compare >> >>>>> records, you will actually compare the *whole* key of >> the record. For >> >>>>> normalized key, it's fixed-length in this case, IIRC it >> will >> >>>>> take 8 bytes to represent the string. And the sorter >> will store the >> >>>>> normalized key and offset in a dedicated array. When doing >> >>>>> the sorting, it only sorts this *small* array. If the >> normalized keys >> >>> are >> >>>>> different, you could immediately tell which is greater from >> >>>>> normalized keys. You only have to compare the full keys >> if the >> >>> normalized >> >>>>> keys are equal and you know in this case the normalized >> >>>>> key couldn't represent the full key. The reason why >> Dataset is doing >> >>> this >> >>>>> is it's super cache efficient by sorting the *small* array. >> >>>>> The idea is borrowed from this paper [1]. Let me know >> if I missed or >> >>>>> misunderstood anything. >> >>>>> >> >>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237 >> (AlphaSort: a >> >>>>> cache-sensitive parallel external sort) >> >>>>> >> >>>>> Best, >> >>>>> Kurt >> >>>>> >> >>>>> >> >>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz < >> >> dwysakow...@apache.org <mailto:dwysakow...@apache.org> >> >>>>> wrote: >> >>>>> >> >>>>>> Hey Kurt, >> >>>>>> >> >>>>>> Thank you for comments! >> >>>>>> >> >>>>>> Ad. 1 I might have missed something here, but as far >> as I see it is >> >>> that >> >>>>>> using the current execution stack with regular state >> backends >> >> (RocksDB >> >>>>>> in particular if we want to have spilling >> capabilities) is equivalent >> >>> to >> >>>>>> hash-based execution. I can see a different spilling >> state backend >> >>>>>> implementation in the future, but I think it is not >> batch specifc. Or >> >>> am >> >>>>>> I missing something? >> >>>>>> >> >>>>>> Ad. 2 Totally agree that normalized keys are important >> to the >> >>>>>> performance. I think though TypeComparators are not a >> necessity to >> >> have >> >>>>>> that. Actually this proposal is heading towards only >> ever performing >> >>>>>> "normalized keys" comparison. I have not included in >> the proposal the >> >>>>>> binary format which we will use for sorting (partially >> because I >> >>> forgot, >> >>>>>> and partially because I thought it was too much of an >> implementation >> >>>>>> detail). Let me include it here though, as it might >> clear the >> >> situation >> >>>>>> a bit here. 
>> >>>>>
>> >>>>> Regarding #2, IIUC you are actually describing a binary format of the key, not the normalized key which is used in DataSet. I will take String as an example. Suppose we have lots of keys with length all greater than, let's say, 20. In your proposal, you will encode the whole string in the prefix of your composed data (<key> + <timestamp> + <record>). And when you compare records, you will actually compare the *whole* key of the record. A normalized key is fixed-length; in this case IIRC it will take 8 bytes to represent the string. And the sorter will store the normalized key and offset in a dedicated array. When doing the sorting, it only sorts this *small* array. If the normalized keys are different, you can immediately tell which is greater from the normalized keys. You only have to compare the full keys if the normalized keys are equal and you know that in this case the normalized key couldn't represent the full key. The reason why DataSet is doing this is that it's super cache-efficient to sort the *small* array. The idea is borrowed from this paper [1]. Let me know if I missed or misunderstood anything.
>> >>>>>
>> >>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a cache-sensitive parallel external sort)
>> >>>>>
>> >>>>> Best,
>> >>>>> Kurt
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> >>>>>
>> >>>>>> Hey Kurt,
>> >>>>>>
>> >>>>>> Thank you for the comments!
>> >>>>>>
>> >>>>>> Ad. 1 I might have missed something here, but as far as I see it, using the current execution stack with regular state backends (RocksDB in particular, if we want to have spilling capabilities) is equivalent to hash-based execution. I can see a different spilling state backend implementation in the future, but I think it is not batch-specific. Or am I missing something?
>> >>>>>>
>> >>>>>> Ad. 2 Totally agree that normalized keys are important to the performance. I think though that TypeComparators are not a necessity to have that. Actually this proposal is heading towards only ever performing "normalized keys" comparison. I have not included in the proposal the binary format which we will use for sorting (partially because I forgot, and partially because I thought it was too much of an implementation detail). Let me include it here though, as it might clear the situation up a bit.
>> >>>>>>
>> >>>>>> In DataSet, at times we have KeySelectors which extract keys based on field indices or names. This allows in certain situations to extract the key from serialized records. Compared to DataSet, in DataStream the key is always described with a black-box KeySelector, or put differently, with a function which extracts a key from a deserialized record. In turn there is no way to create a comparator that could compare records by extracting the key from a serialized record (neither with, nor without key normalization). We suggest that the input for the sorter will be
>> >>>>>>
>> >>>>>>     <key> + <timestamp> + <record>
>> >>>>>>
>> >>>>>> Without having the key prepended, we would have to deserialize the record for every key comparison.
>> >>>>>>
>> >>>>>> Therefore, if we agree that we perform binary comparison for keys (which are always prepended), it is actually equivalent to a DataSet with TypeComparators that support key normalization.
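A sketch of writing the proposed sorter input layout (illustrative; DataOutputSerializer is Flink's resizable binary output view, the wrapper class name is made up):

===============================================
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

/** Sketch: serializes entries as <key> + <timestamp> + <record>. */
final class SorterInputWriter<K, T> {

    private final TypeSerializer<K> keySerializer;
    private final TypeSerializer<T> recordSerializer;

    SorterInputWriter(TypeSerializer<K> keySerializer, TypeSerializer<T> recordSerializer) {
        this.keySerializer = keySerializer;
        this.recordSerializer = recordSerializer;
    }

    byte[] write(K key, long timestamp, T record) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(64);
        // Key first: keys can then be compared in their binary form
        // without deserializing the record.
        keySerializer.serialize(key, out);
        out.writeLong(timestamp);
        recordSerializer.serialize(record, out);
        return out.getCopyOfBuffer();
    }
}
===============================================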
>> >>>>>>
>> >>>>>> Let me know if that is clear, or whether I have missed something here.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>>
>> >>>>>> Dawid
>> >>>>>>
>> >>>>>> On 08/09/2020 03:39, Kurt Young wrote:
>> >>>>>>> Hi Dawid, thanks for bringing this up, it's really exciting to see that batch execution is introduced in DataStream. From the FLIP, it seems we are sticking with the sort-based execution mode (at least for now), which will sort the whole input data before any *keyed* operation is executed. I have two comments here:
>> >>>>>>>
>> >>>>>>> 1. Do we want to introduce hash-based execution in the future? Sort is a safe choice but not the best in lots of cases. IIUC we only need to make sure that before the framework finishes dealing with one key, the operator doesn't see any data belonging to other keys, thus hash-based execution would also do the trick. One tricky thing the framework might need to deal with is the memory constraint and spilling in the hash map, but Flink also has some good knowledge about this stuff.
>> >>>>>>>
>> >>>>>>> 2. Going back to sort-based execution and how to sort keys. From my experience, the performance of sorting would be one of the most important things if we want to achieve good performance of batch execution. And normalized keys are actually the key to the performance of sorting. If we want to get rid of TypeComparator, I think we still need to find a way to introduce this back.
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Kurt
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>>>> Yes, I think we can address the problem of indeterminacy in a separate FLIP because we're already in it.
>> >>>>>>>>
>> >>>>>>>> Aljoscha
>> >>>>>>>>
>> >>>>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>> >>>>>>>>> @Seth That's a very good point. I agree that RocksDB has the same problem. I think we can use the same approach for the sorted shuffles then.
>> >>>>>>>>>
>> >>>>>>>>> @Aljoscha I agree we should think about making it more resilient, as I guess users might have problems already if they use keys with a non-deterministic binary representation. How do you feel about addressing that separately, purely to limit the scope of this FLIP?
>> >>>>>>>>>
>> >>>>>>>>> @Aljoscha I tend to agree with you that the best place to actually place the sorting would be in the InputProcessor(s). If there are no more suggestions in respect to that issue, I'll put this proposal up for voting.
>> >>>>>>>>>
>> >>>>>>>>> @all Thank you for the feedback so far. I'd like to start a voting thread on the proposal tomorrow. Therefore I'd appreciate it if you comment before that, if you still have some outstanding ideas.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>>
>> >>>>>>>>> Dawid
>> >>>>>>>>>
>> >>>>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>> >>>>>>>>>> Seth is right, I was just about to write that as well. There is a problem, though, because some of our TypeSerializers are not deterministic even though we use them as if they were. Beam excludes the FloatCoder, for example, and the AvroCoder in certain cases. I'm pretty sure there is also weirdness going on in our KryoSerializer.
>> >>>>>>>>>>
>> >>>>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>> >>>>>>>>>>> There is already an implicit assumption that the TypeSerializer for keys is stable/deterministic; RocksDB compares keys using their serialized byte strings. I think this is a non-issue (or at least it's not changing the status quo).
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org> wrote:
>> >>>>>>>>>>>> +1 for getting rid of the TypeComparator interface and relying on the serialized representation for grouping.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Adding a new type to the DataStream API is quite difficult at the moment due to too many components that are required: TypeInformation (tries to deal with logical fields for TypeComparators), TypeSerializer (incl. its snapshot interfaces), and TypeComparator (with many methods and internals such as normalized keys etc.).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> If necessary, we can add more simple comparison-related methods to the TypeSerializer interface itself in the future (like TypeSerializer.isDeterministic).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Regards,
>> >>>>>>>>>>>> Timo
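For illustration, Timo's suggestion could look roughly like this (a hypothetical addition mirroring Beam's Coder.verifyDeterministic(); no such method exists today):

===============================================
/** Sketch of a method that could be added to
 * org.apache.flink.api.common.typeutils.TypeSerializer<T>. */
public abstract class TypeSerializerDeterminismSketch<T> {

    /**
     * Whether serializing two equal values always yields identical bytes.
     * Sorting/grouping on the binary representation is only correct for
     * serializers that return true here.
     */
    public boolean isDeterministic() {
        return false; // conservative default; stable serializers override
    }
}
===============================================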
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>> >>>>>>>>>>>>> Thanks for publishing the FLIP!
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> >>>>>>>>>>>>>> 1. How to sort/group keys? What representation of the key should we use? Should we sort on the binary form or should we depend on Comparators being available?
>> >>>>>>>>>>>>> Initially, I suggested to Dawid (in private) to do the sorting/grouping by using the binary representation. Then my opinion switched and I thought we should use TypeComparator/Comparator because that's what the DataSet API uses. After talking to Stephan, I'm again encouraged in my opinion to use the binary representation, because it means we can eventually get rid of the TypeComparator interface, which is a bit complicated, and because we don't need any good order in our sort, we only need the grouping.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> This comes with some problems, though: we need to ensure that the TypeSerializer of the type we're sorting is stable/deterministic. Beam has infrastructure for this in the form of Coder.verifyDeterministic() [1], which we don't have right now and should add if we go down this path.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 2. Where in the stack should we apply the sorting (this is rather a discussion about internals)?
>> >>>>>>>>>>>>> Here, I'm gravitating towards the third option of implementing it in the layer of the StreamTask, which probably means implementing a custom InputProcessor. I think it's best to do it in this layer because we would not mix concerns of different layers, as we would if we implemented this as a custom StreamOperator. I think this solution is also best when it comes to multi-input operators.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 3. How should we deal with custom implementations of StreamOperators?
>> >>>>>>>>>>>>> I think the cleanest solution would be to go through the complete operator lifecycle for every key, because then the watermark would not oscillate between -Inf and +Inf and we would not break the semantic guarantees that we gave to operators so far, namely that the watermark is strictly monotonically increasing. However, I don't think this solution is feasible because it would come with too much overhead. We should solve this problem via documentation and maybe educate people not to query the current watermark, or not to rely on the watermark being monotonically increasing in operator implementations, to allow the framework more freedom in how user programs are executed.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Aljoscha
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184