*bq. The problem is that I could not use this "state backend" in a StreamOperator.* Ok, got your point now. I agree that it makes more sense for StateBackend to return a contract instead of a particular implementation. How about naming the new interface `CheckpointableKeyedStateBackend`? We could make `BoundedStreamStateBackend` implement `CheckpointableKeyedStateBackend` without any checkpoint-related operations yet, while reserving the possibility that the bounded-stream backend also supports checkpointing in the future. What do you think?
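The contract/implementation split discussed above could look roughly like the following minimal sketch. This is an editor's illustration, not code from the thread: all signatures are hypothetical and far smaller than Flink's real interfaces; only the names `CheckpointableKeyedStateBackend` and `BoundedStreamStateBackend` come from the discussion.

```java
// Hypothetical sketch of separating the contract from the implementation.
// Signatures are illustrative, not Flink's actual API.

/** Pure contract that StreamOperators would program against. */
interface KeyedBackendContract<K> {
    void setCurrentKey(K key);
    K getCurrentKey();
}

/** Contract plus checkpoint hooks; what a StateBackend factory could return. */
interface CheckpointableKeyedStateBackend<K> extends KeyedBackendContract<K> {
    // A default no-op leaves room for backends that do not checkpoint yet.
    default void notifyCheckpointComplete(long checkpointId) {}
}

/** Bounded-stream backend: fulfills the contract, ignores checkpointing for now. */
class BoundedStreamStateBackend<K> implements CheckpointableKeyedStateBackend<K> {
    private K currentKey;

    @Override
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    @Override
    public K getCurrentKey() {
        return currentKey;
    }
}
```

The point of the sketch is that the runtime only ever sees the interfaces, so a bounded-stream implementation can ignore checkpointing today while the contract keeps the door open for it later.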
*bq. Correct, the goal though is not to outperform the HeapStateBackend. The single key state backend requires sorted inputs which come with a price. The main goal is to outperform RocksDBStateBackend, which is necessary for large states.* Personally I think the main benefit of introducing a bounded-stream-specific state backend is not the raw performance of state processing, but that we can remove the data after processing a key, which greatly reduces the cost of state storage (smile).

Thanks. Best Regards, Yu

On Fri, 18 Sep 2020 at 20:48, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> ===============================================
>
> *class BoundedStreamInternalStateBackend<K> implements KeyedStateBackend<K>, SnapshotStrategy<SnapshotResult<KeyedStateHandle>>, Closeable, CheckpointListener {*
>
> ===============================================
>
> The problem is that I could not use this "state backend" in a StreamOperator. The goal of this effort is that it is mostly transparent to all the implementations of StreamOperator(s). Right now a StreamOperator retrieves the AbstractKeyedStateBackend through the StreamOperatorContext, which instantiates it in the StreamTaskInitializer etc. The problem is that a big chunk of the current code base uses the AbstractKeyedStateBackend, whereas it really just needs an interface, not that particular implementation. The change is really only about separating the contract (InternalKeyedStateBackend) from the implementation (AbstractKeyedStateBackend). My thinking is that it is simply an approach to fix a past mistake: StateBackend returns a particular implementation rather than a contract.
>
> I do agree I don't need the `SnapshotStrategy` and `CheckpointListener` interfaces. The thing though is that the runtime expects those contracts from an AbstractKeyedStateBackend.
>
> BTW, if you'd like to see what this change really looks like, you can check the PR I already opened for it: https://github.com/apache/flink/pull/13405/files
>
> Checking the FLIP more closely I found the below description: "With a high number of keys it (HeapStateBackend) suffers a significant penalty and becomes even less performant for that particular case than the sorting approach", does it mean "HeapStateBackend" outperformed "SingleKeyStateBackend" when the number of keys is relatively small?
>
> Correct, the goal though is not to outperform the HeapStateBackend. The single key state backend requires sorted inputs, which come with a price. The main goal is to outperform the RocksDBStateBackend, which is necessary for large states.
>
> Thanks for the summary. I think it's more specific and could help readers better understand why we cannot use the HeapKeyedStateBackend directly, than the single-line description "when the StateBackend observes a new incoming key it will reset all acquired state objects so far". What do you think?
>
> Sure, I can add it to the document.
>
> Best,
>
> Dawid
>
> On 18/09/2020 14:29, Yu Li wrote:
> > Thanks for the clarification Dawid. Some of my thoughts:
> >
> > *bq. The results are times for end-to-end execution of a job. Therefore the sorting part is included. The actual target of the replacement is RocksDB, which does the serialization and key bytes comparison as well.* I see. Checking the FLIP more closely I found the below description: "With a high number of keys it (HeapStateBackend) suffers a significant penalty and becomes even less performant for that particular case than the sorting approach", does it mean "HeapStateBackend" outperformed "SingleKeyStateBackend" when the number of keys is relatively small? The micro-benchmark of ValueState removes the key shuffling phase, so its result is self-explanatory.
> > About `InternalKeyedStateBackend`, let me rephrase my question: why don't we add the new state backend like below instead of adding a new interface (and IMHO there's no need to implement the `SnapshotStrategy` and `CheckpointListener` interfaces since it doesn't support checkpointing)? Reserved for adding more internal state backends in the future?
> > ===============================================
> >
> > *class BoundedStreamInternalStateBackend<K> implements KeyedStateBackend<K>, SnapshotStrategy<SnapshotResult<KeyedStateHandle>>, Closeable, CheckpointListener {*
> >
> > ===============================================
> >
> > *bq. Let me though quickly summarize and if you find it useful I can add it to the FLIP itself.* Thanks for the summary. I think it's more specific and could help readers better understand why we cannot use the HeapKeyedStateBackend directly, than the single-line description "when the StateBackend observes a new incoming key it will reset all acquired state objects so far". What do you think?
> >
> > Thanks.
> >
> > Best Regards,
> > Yu
> >
> > On Thu, 17 Sep 2020 at 23:38, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> Thanks for the comments Yu.
>>
>> > First of all, for the performance testing result, I'm wondering whether the sorting cost is counted in the result for both the DataSet and refined DataStream implementations. I could think of the saving of hash computation and the final iteration to emit the word-count result (processing a key at a time could save such iteration), but I am not sure whether these cost savings are at the same grade as comparing the key bytes.
>> The results are times for end-to-end execution of a job. Therefore the sorting part is included. The actual target of the replacement is RocksDB, which does the serialization and key bytes comparison as well. On top of that it adds all the RocksDB bookkeeping.
>>
>> > However, I'm not fully convinced to introduce a new `InternalKeyedStateBackend` interface. I agree that we don't need to take the overhead of `AbstractKeyedStateBackend` since we don't plan to support checkpointing for now, but why don't we directly write a state backend implementation for bounded streams? Or are we planning to introduce more internal state backends in the future? What's more, the current design of `InternalKeyedStateBackend` in the FLIP document seems to be extending as many interfaces as `AbstractKeyedStateBackend` implements, which I guess is a typo.
>> Maybe I was not clear enough about the change. This change does not "strip" the AbstractKeyedStateBackend of any functionality. My intent is not to remove any methods of the AbstractKeyedStateBackend. The problem here is that the AbstractKeyedStateBackend is an abstract class (duh ;)), which does have some predefined implementation. Moreover it requires objects such as InternalKeyContext, CloseableRegistry etc. to be constructed, which we don't need/want e.g. in the single key state backend. My intention here is to make the StateBackend return only pure interfaces. (AbstractKeyedStateBackend is the only non-interface that StateBackend returns.) In other words I just want to make AbstractKeyedStateBackend a proper interface. It is not a typo that InternalKeyedStateBackend extends the same interfaces as AbstractKeyedStateBackend does.
>>
>> > Thirdly, I suggest we name the special state backend `BoundedStreamInternalStateBackend`. And going by our existing javadoc of `StateBackend` it actually cannot be called a complete state backend...: "A State Backend defines how the state of a streaming application is stored and checkpointed".
>> Thanks for the suggestion. Sure, I can use that name. Yes, I do agree it is not a full-fledged StateBackend.
>> I do want it to be an internal class that is never used directly by users.
>>
>> > Lastly, I didn't find a detailed design of the "SingleKeyStateBackend" in the FLIP,
>> I did not put it into the design, because 1) I found it internal. It does not touch any public-facing interfaces. 2) It is rather straightforward. Let me though quickly summarize, and if you find it useful I can add it to the FLIP itself.
>>
>> > as how to detect the key switching
>> That is rather straightforward. The state backend works only under the assumption that the keys are sorted/grouped together. We keep the current key, and in setCurrentKey we check if the new key is different from the current one. Side note: yes, custom user operators which call setCurrentKey explicitly might not work in this setup.
>>
>> > remove the data (especially in the non-windowing case), etc.
>> We only ever keep a single value for a state object. Therefore ValueState is a very thin wrapper for a value, MapState for a HashMap, ListState for a List etc. When the key changes we simply set the wrapped value/map/list to null.
>>
>> I hope this clarifies a few things. Let me know if you have any questions.
>>
>> Best,
>>
>> Dawid
>>
>> On 17/09/2020 15:28, Yu Li wrote:
>> > Hi all,
>> >
>> > Sorry for being late to the discussion, but I just noticed there are some state backend related changes proposed in this FLIP, so I would like to share my two cents.
>> >
>> > First of all, for the performance testing result, I'm wondering whether the sorting cost is counted in the result for both the DataSet and refined DataStream implementations. I could think of the saving of hash computation and the final iteration to emit the word-count result (processing a key at a time could save such iteration), but I am not sure whether these cost savings are at the same grade as comparing the key bytes.
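The reset-on-key-switch behaviour Dawid describes above can be sketched in a few lines. This is an editor's illustration, not code from the thread: the class and method names are made up, and the sketch assumes the input is already grouped by key, as the FLIP requires.

```java
import java.util.Objects;

// Minimal sketch of the single-key idea: each state object wraps exactly one
// value, and switching to a new key clears it. All names are hypothetical.
class SingleKeyValueState<V> {
    private V value;              // the one wrapped value for the current key

    V value() { return value; }
    void update(V v) { value = v; }
    void clear() { value = null; }
}

class SingleKeyStateBackendSketch<K, V> {
    private K currentKey;
    final SingleKeyValueState<V> state = new SingleKeyValueState<>();

    /** Detects key switching: a different incoming key resets all state. */
    void setCurrentKey(K newKey) {
        if (!Objects.equals(newKey, currentKey)) {
            state.clear();        // drop the data for the finished key
            currentKey = newKey;
        }
    }
}
```

This is why the sketch only needs O(1) state per operator: once a key's records have all been seen, its state is discarded rather than accumulated.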
>> >
>> > Regardless of the performance result, I agree that the capability of removing the data after processing a key could significantly reduce the space required by state, so introducing a new state backend for bounded streams makes sense.
>> >
>> > However, I'm not fully convinced to introduce a new `InternalKeyedStateBackend` interface. I agree that we don't need to take the overhead of `AbstractKeyedStateBackend` since we don't plan to support checkpointing for now, but why don't we directly write a state backend implementation for bounded streams? Or are we planning to introduce more internal state backends in the future? What's more, the current design of `InternalKeyedStateBackend` in the FLIP document seems to be extending as many interfaces as `AbstractKeyedStateBackend` implements, which I guess is a typo.
>> >
>> > Thirdly, I suggest we name the special state backend `BoundedStreamInternalStateBackend`. And going by our existing javadoc of `StateBackend` it actually cannot be called a complete state backend...: "A State Backend defines how the state of a streaming application is stored and checkpointed".
>> >
>> > Lastly, I didn't find a detailed design of the "SingleKeyStateBackend" in the FLIP, and suggest we write the key design down, such as how to detect the key switching and remove the data (especially in the non-windowing case), etc.
>> >
>> > Thanks.
>> >
>> > Best Regards,
>> > Yu
>> >
>> > On Wed, 9 Sep 2020 at 17:18, Kurt Young <ykt...@gmail.com> wrote:
>> >> Yes, I didn't intend to block this FLIP, and some of the comments are actually implementation details. And all of them are handled internally, not visible to users, thus we can also change or improve them in the future.
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>> I think Kurt's concerns/comments are very valid and we need to implement such things in the future. However, I also think that we need to get started somewhere and I think what's proposed in this FLIP is a good starting point that we can build on. So we should not get paralyzed by thinking too far ahead into the future. Does that make sense?
>> >>>
>> >>> Best,
>> >>> Aljoscha
>> >>>
>> >>> On 08.09.20 16:59, Dawid Wysakowicz wrote:
>> >>>> Ad. 1
>> >>>>
>> >>>> Yes, you are right in principle.
>> >>>>
>> >>>> Let me though clarify my proposal a bit. The proposed sort-style execution aims at a generic KeyedProcessFunction where all the "aggregations" are actually performed in the user code. It tries to improve the performance by actually removing the need to use RocksDB, e.g.:
>> >>>>
>> >>>> private static final class Summer<K>
>> >>>>         extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>> >>>>
>> >>>>     ....
>> >>>>
>> >>>>     @Override
>> >>>>     public void processElement(
>> >>>>             Tuple2<K, Integer> value,
>> >>>>             Context ctx,
>> >>>>             Collector<Tuple2<K, Integer>> out) throws Exception {
>> >>>>         if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>> >>>>             ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>> >>>>             timerRegistered.update(true);
>> >>>>         }
>> >>>>         Integer v = counter.value();
>> >>>>         Integer incomingValue = value.f1;
>> >>>>         if (v != null) {
>> >>>>             v += incomingValue;
>> >>>>         } else {
>> >>>>             v = incomingValue;
>> >>>>         }
>> >>>>         counter.update(v);
>> >>>>     }
>> >>>>
>> >>>>     ....
>> >>>>
>> >>>> }
>> >>>>
>> >>>> Therefore I don't think the first part of your reply, about separating the write and read workloads, applies here. We do not aim to create a competing API with the Table API.
>> >>>> We think operations such as joins or analytical aggregations should be performed in the Table API.
>> >>>>
>> >>>> As for the second part, I agree it would be nice to fall back to the sorting approach only if a certain threshold of memory in a State Backend is used. This has some problems though. We would need a way to estimate the size of the occupied memory to tell when the threshold is reached. That is not easily doable by default e.g. in a MemoryStateBackend, as we do not serialize the values in the state backend by default. We would have to add that, but this would add the overhead of the serialization.
>> >>>>
>> >>>> This proposal aims at the cases where we do have a large state that will not fit into memory and where, without the change, users are forced to use RocksDB. If the state fits in memory I agree it will be better to do hash-based aggregations, e.g. using the MemoryStateBackend. Therefore I think it is important to give users the choice to use one or the other approach. We might discuss which approach should be the default for RuntimeMode.BATCH proposed in FLIP-134: should it be hash-based with a user-configured state backend, or sorting-based with a single-key-at-a-time backend? Moreover we could think about whether we should let users choose the sort vs hash "state backend" per operator. Would that suffice?
>> >>>>
>> >>>> Ad. 2
>> >>>>
>> >>>> I still think we can just use the first X bytes of the serialized form as the normalized key and fall back to comparing full keys on clashes. It is because we are actually not interested in a logical order, but care only about the "grouping" aspect of the sorting. Therefore I think it's enough to compare only parts of the full key as the normalized key.
>> >>>>
>> >>>> Thanks again for the really nice and thorough feedback!
>> >>>>
>> >>>> Best,
>> >>>>
>> >>>> Dawid
>> >>>>
>> >>>> On 08/09/2020 14:47, Kurt Young wrote:
>> >>>>> Regarding #1, yes, using the state backend is definitely hash-based execution. However there are some differences from batch hash-based execution. The key difference is *random access & read/write mixed workload*. For example, by using a state backend in streaming execution, one has to mix the read and write operations, and all of them are actually random access. But in a batch hash execution, we can divide the phases into write and read. For example, we can build the hash table first, with only write operations. And once the build is done, we can start to read and trigger the user codes. Take the hash aggregation which the blink planner implemented as an example: during the building phase, as long as the hash map can fit into memory, we will update the accumulators directly in the hash map. And once we are running out of memory, we then fall back to sort-based execution. It improves the performance a lot if the incoming data can be processed in memory.
>> >>>>>
>> >>>>> Regarding #2, IIUC you are actually describing a binary format of the key, not the normalized key which is used in DataSet. I will take String as an example. Suppose we have lots of keys with length all greater than, let's say, 20. In your proposal, you will encode the whole string in the prefix of your composed data (<key> + <timestamp> + <record>). And when you compare records, you will actually compare the *whole* key of the record. The normalized key, in contrast, is fixed-length in this case; IIRC it will take 8 bytes to represent the string.
>> >>>>> And the sorter will store the normalized key and offset in a dedicated array. When doing the sorting, it only sorts this *small* array. If the normalized keys are different, you can immediately tell which is greater from the normalized keys. You only have to compare the full keys if the normalized keys are equal and you know that in this case the normalized key couldn't fully represent the key. The reason why DataSet is doing this is that sorting the *small* array is super cache-efficient. The idea is borrowed from this paper [1]. Let me know if I missed or misunderstood anything.
>> >>>>>
>> >>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a cache-sensitive parallel external sort)
>> >>>>>
>> >>>>> Best,
>> >>>>> Kurt
>> >>>>>
>> >>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> >>>>>> Hey Kurt,
>> >>>>>>
>> >>>>>> Thank you for the comments!
>> >>>>>>
>> >>>>>> Ad. 1 I might have missed something here, but as far as I see it, using the current execution stack with regular state backends (RocksDB in particular if we want to have spilling capabilities) is equivalent to hash-based execution. I can see a different spilling state backend implementation in the future, but I think it is not batch-specific. Or am I missing something?
>> >>>>>>
>> >>>>>> Ad. 2 Totally agree that normalized keys are important to the performance. I think though TypeComparators are not a necessity to have that. Actually this proposal is heading towards only ever performing "normalized keys" comparison.
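The "first X bytes as the normalized key" idea from the preceding paragraphs can be sketched as a comparator over serialized keys. This is an editor's illustration with made-up names and an arbitrary prefix length; the key point it demonstrates is that grouping only needs *a* consistent order, so a fixed-length byte prefix can serve as the normalized key, with a full binary comparison only on prefix ties.

```java
import java.util.Arrays;
import java.util.Comparator;

class BinaryKeyGrouping {
    // Assumed fixed-length normalized-key size; any constant works for grouping.
    static final int PREFIX_LEN = 8;

    /** Orders serialized keys by a byte prefix, breaking ties on the full bytes. */
    static final Comparator<byte[]> GROUPING_ORDER = (a, b) -> {
        int byPrefix = Arrays.compare(
                a, 0, Math.min(a.length, PREFIX_LEN),
                b, 0, Math.min(b.length, PREFIX_LEN));
        // Full (still purely binary) comparison only on a prefix clash.
        return byPrefix != 0 ? byPrefix : Arrays.compare(a, b);
    };
}
```

Note the order this produces is not the type's logical order (e.g. signed byte comparison of floats), which is exactly why it is only suitable for grouping, not for ordered output.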
>> >>>>>> I have not included in the proposal the binary format which we will use for sorting (partially because I forgot, and partially because I thought it was too much of an implementation detail). Let me include it here though, as it might clear up the situation a bit.
>> >>>>>>
>> >>>>>> In DataSet, at times we have KeySelectors which extract keys based on field indices or names. This allows, in certain situations, extracting the key from serialized records. Compared to DataSet, in DataStream the key is always described with a black-box KeySelector, or put differently, with a function which extracts a key from a deserialized record. In turn there is no way to create a comparator that could compare records by extracting the key from a serialized record (neither with, nor without key normalization). We suggest that the input for the sorter will be
>> >>>>>>
>> >>>>>> <key> + <timestamp> + <record>
>> >>>>>>
>> >>>>>> Without having the key prepended we would have to deserialize the record for every key comparison.
>> >>>>>>
>> >>>>>> Therefore if we agree that we perform binary comparison for keys (which are always prepended), it is actually equivalent to a DataSet with TypeComparators that support key normalization.
>> >>>>>>
>> >>>>>> Let me know if that is clear, or if I have missed something here.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>>
>> >>>>>> Dawid
>> >>>>>>
>> >>>>>> On 08/09/2020 03:39, Kurt Young wrote:
>> >>>>>>> Hi Dawid, thanks for bringing this up, it's really exciting to see that batch execution is introduced in DataStream. From the FLIP, it seems we are sticking with the sort-based execution mode (at least for now), which will sort the whole input data before any *keyed* operation is executed.
>> >>>>>>> I have two comments here:
>> >>>>>>>
>> >>>>>>> 1. Do we want to introduce hash-based execution in the future? Sort is a safe choice but not the best in lots of cases. IIUC we only need to make sure that before the framework finishes dealing with one key, the operator doesn't see any data belonging to other keys, thus hash-based execution would also do the trick. One tricky thing the framework might need to deal with is memory constraints and spilling in the hash map, but Flink also has some good knowledge about this stuff.
>> >>>>>>>
>> >>>>>>> 2. Going back to sort-based execution and how to sort keys. From my experience, the performance of sorting would be one of the most important things if we want to achieve good performance of batch execution. And normalized keys are actually the key to the performance of sorting. If we want to get rid of TypeComparator, I think we still need to find a way to introduce this back.
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Kurt
>> >>>>>>>
>> >>>>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>>>> Yes, I think we can address the problem of indeterminacy in a separate FLIP because we're already in it.
>> >>>>>>>>
>> >>>>>>>> Aljoscha
>> >>>>>>>>
>> >>>>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>> >>>>>>>>> @Seth That's a very good point. I agree that RocksDB has the same problem. I think we can use the same approach for the sorted shuffles then. @Aljoscha I agree we should think about making it more resilient, as I guess users might have problems already if they use keys with a non-deterministic binary representation.
>> >>>>>>>>> How do you feel about addressing that separately, purely to limit the scope of this FLIP?
>> >>>>>>>>>
>> >>>>>>>>> @Aljoscha I tend to agree with you that the best place to actually place the sorting would be in the InputProcessor(s). If there are no more suggestions with respect to that issue, I'll put this proposal up for voting.
>> >>>>>>>>>
>> >>>>>>>>> @all Thank you for the feedback so far. I'd like to start a voting thread on the proposal tomorrow. Therefore I'd appreciate it if you comment before that, if you still have some outstanding ideas.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>>
>> >>>>>>>>> Dawid
>> >>>>>>>>>
>> >>>>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>> >>>>>>>>>> Seth is right, I was just about to write that as well. There is a problem, though, because some of our TypeSerializers are not deterministic even though we use them as if they were. Beam excludes the FloatCoder, for example, and the AvroCoder in certain cases. I'm pretty sure there is also weirdness going on in our KryoSerializer.
>> >>>>>>>>>>
>> >>>>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>> >>>>>>>>>>> There is already an implicit assumption that the TypeSerializer for keys is stable/deterministic: RocksDB compares keys using their serialized byte strings. I think this is a non-issue (or at least it's not changing the status quo).
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org> wrote:
>> >>>>>>>>>>>> +1 for getting rid of the TypeComparator interface and relying on the serialized representation for grouping.
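As a tiny standalone illustration of why serializer determinism matters for byte-wise grouping (an editor's example, unrelated to Flink's or Beam's actual serializers): `0.0` and `-0.0` compare equal as doubles, yet serialize to different IEEE-754 bytes, so a purely binary sort would split them into two groups.

```java
import java.nio.ByteBuffer;

class DeterminismDemo {
    /** Serialize a double to its 8-byte big-endian IEEE-754 representation. */
    static byte[] serialize(double d) {
        return ByteBuffer.allocate(8).putDouble(d).array();
    }
}
```

This is the kind of mismatch a check like Beam's `Coder.verifyDeterministic()` is meant to surface before the sort runs.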
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Adding a new type to the DataStream API is quite difficult at the moment due to too many components that are required: TypeInformation (tries to deal with logical fields for TypeComparators), TypeSerializer (incl. its snapshot interfaces), and TypeComparator (with many methods and internals such as normalized keys etc.).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> If necessary, we can add more simple comparison-related methods to the TypeSerializer interface itself in the future (like TypeSerializer.isDeterministic).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Regards,
>> >>>>>>>>>>>> Timo
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>> >>>>>>>>>>>>> Thanks for publishing the FLIP!
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> >>>>>>>>>>>>>> 1. How to sort/group keys? What representation of the key should we use? Should we sort on the binary form or should we depend on Comparators being available?
>> >>>>>>>>>>>>> Initially, I suggested to Dawid (in private) to do the sorting/grouping by using the binary representation. Then my opinion switched and I thought we should use TypeComparator/Comparator because that's what the DataSet API uses.
>> >>>>>>>>>>>>> After talking to Stephan, I'm again leaning towards the binary representation, because it means we can eventually get rid of the TypeComparator interface, which is a bit complicated, and because we don't need any good order in our sort; we only need the grouping.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> This comes with some problems, though: we need to ensure that the TypeSerializer of the type we're sorting is stable/deterministic. Beam has infrastructure for this in the form of Coder.verifyDeterministic() [1] which we don't have right now and should add if we go down this path.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 2. Where in the stack should we apply the sorting (this is rather a discussion about internals)?
>> >>>>>>>>>>>>> Here, I'm gravitating towards the third option of implementing it in the layer of the StreamTask, which probably means implementing a custom InputProcessor. I think it's best to do it in this layer because we would not mix concerns of different layers as we would if we implemented this as a custom StreamOperator. I think this solution is also best when it comes to multi-input operators.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 3.
>> >>>>>>>>>>>>>> How should we deal with custom implementations of StreamOperators?
>> >>>>>>>>>>>>> I think the cleanest solution would be to go through the complete operator lifecycle for every key, because then the watermark would not oscillate between -Inf and +Inf and we would not break the semantic guarantees that we gave to operators so far, namely that the watermark is strictly monotonically increasing. However, I don't think this solution is feasible because it would come with too much overhead. We should solve this problem via documentation, and maybe educate people not to query the current watermark or rely on the watermark being monotonically increasing in operator implementations, to allow the framework more freedom in how user programs are executed.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Aljoscha
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184