*bq. The problem is that I could not use this "state backend" in a
StreamOperator.*
OK, got your point now. I agree that it makes more sense to
make StateBackend return a contract instead of a particular implementation.
How about we name the new interface `CheckpointableKeyedStateBackend`?
We could make `BoundedStreamStateBackend` implement
`CheckpointableKeyedStateBackend` without the checkpoint-related operations
for now, while reserving the possibility that bounded streams also
support checkpointing in the future. What do you think?
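
To make the naming proposal concrete, here is a rough sketch of the shape I
have in mind. All type and method names below are simplified stand-ins made
up for illustration; they are not the actual Flink interfaces or signatures.

```java
// Simplified stand-in for the keyed-state contract (hypothetical, not Flink's API).
interface KeyedBackendSketch<K> {
    void setCurrentKey(K key);

    K getCurrentKey();
}

// Simplified stand-in for the checkpoint-related part of the contract (hypothetical).
interface SnapshotableSketch {
    byte[] snapshot(long checkpointId);
}

// The contract the runtime would depend on instead of an abstract class.
interface CheckpointableKeyedBackendSketch<K>
        extends KeyedBackendSketch<K>, SnapshotableSketch {}

// A bounded-stream backend that satisfies the contract but rejects
// checkpoint operations for now.
final class BoundedBackendSketch<K> implements CheckpointableKeyedBackendSketch<K> {
    private K currentKey;

    @Override
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    @Override
    public K getCurrentKey() {
        return currentKey;
    }

    @Override
    public byte[] snapshot(long checkpointId) {
        // Checkpointing could be added later without changing the contract.
        throw new UnsupportedOperationException(
                "Checkpointing is not supported for bounded streams yet");
    }
}
```

With this shape, callers only ever see the interface, so the bounded
backend does not need to construct anything the abstract class requires.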

*bq. Correct, the goal though is not to outperform the HeapStateBackend.
The single key state backend requires sorted inputs which come with a
price. The main goal is to outperform RocksDBStateBackend, which is
necessary for large states.*
Personally, I think the main benefit of introducing a bounded-stream-specific
state backend is that we can remove the data after processing a
key, thus greatly reducing the cost of state storage, rather than improving
the routine performance of state processing (smile).
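
As a minimal sketch of that point, assuming the keys arrive sorted/grouped
as described in the quoted summary below: the backend only holds the state
of the current key and drops it when the key switches. The class and method
names are hypothetical and only meant for illustration.

```java
import java.util.Objects;

// Hypothetical single-key value state: keeps the value of the current key only.
final class SingleKeyValueStateSketch<K, V> {
    private K currentKey;
    private V value;

    // Called for every record; resets the state when a new key is observed.
    void setCurrentKey(K newKey) {
        if (!Objects.equals(currentKey, newKey)) {
            value = null; // the previous key is finished, drop its data
            currentKey = newKey;
        }
    }

    V value() {
        return value;
    }

    void update(V newValue) {
        value = newValue;
    }
}
```

The storage cost is then bounded by the state of one key, regardless of how
many keys the input contains.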

Thanks.

Best Regards,
Yu


On Fri, 18 Sep 2020 at 20:48, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> ===============================================
> class BoundedStreamInternalStateBackend<K> implements
>         KeyedStateBackend<K>,
>         SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>         Closeable,
>         CheckpointListener {
> ===============================================
>
> The problem is that I could not use this "state backend" in a
> StreamOperator. The goal of this effort is that it is mostly transparent to
> all the implementations of StreamOperator(s). Right now StreamOperator
> retrieves AbstractKeyedStateBackend through StreamOperatorContext which
> instantiates it in StreamTaskInitializer etc. The problem is that a big
> chunk of the current code base uses the AbstractKeyedStateBackend, whereas
> it really just needs an interface not that particular implementation. The
> change is really only about separating the contract
> (InternalKeyedStateBackend) from the implementation
> (AbstractKeyedStateBackend). My thinking is that it is only an approach to
> fix a mistake of the past that StateBackend returns a particular
> implementation rather than a contract.
>
> I do agree I don't need the `SnapshotStrategy` and `CheckpointListener`
> interfaces. The thing though is that the runtime expects those contracts
> from an AbstractKeyedStateBackend.
>
> BTW, if you'd like to see what this change really looks like, you can
> check the PR I already opened for it:
> https://github.com/apache/flink/pull/13405/files
>
> Checking the FLIP more closely I found below description: "With a high
> number of keys it (HeapStateBackend) suffers a significant penalty and
> becomes even less performant for that particular case than the sorting
> approach", does it mean "HeapStateBackend" outperformed
> "SingleKeyStateBackend" when the number of keys is relatively small?
>
> Correct, the goal though is not to outperform the HeapStateBackend. The
> single key state backend requires sorted inputs which come with a price.
> The main goal is to outperform RocksDBStateBackend, which is necessary for
> large states.
>
> Thanks for the summary. I think it's more specific and could help readers
> to better understand why we cannot use HeapKeyedStateBackend directly, than
> the single line description "when the StateBackend observes a new incoming
> key it will reset all acquired state objects so far". What do you think?
>
> Sure, I can add it to the document.
>
> Best,
>
> Dawid
> On 18/09/2020 14:29, Yu Li wrote:
>
> Thanks for the clarification Dawid. Some of my thoughts:
>
> *bq. The results are times for end-to-end execution of a job. Therefore
> the sorting part is included. The actual target of the replacement is
> RocksDB, which does the serialization and key bytes comparison as well.*
> I see. Checking the FLIP more closely I found below description: "With a
> high number of keys it (HeapStateBackend) suffers a significant penalty and
> becomes even less performant for that particular case than the sorting
> approach", does it mean "HeapStateBackend" outperformed
> "SingleKeyStateBackend" when the number of keys is relatively small? The
> micro-benchmark of ValueState removes the key shuffling phase, so its
> result could be self-explained.
>
> About `InternalKeyedStateBackend`, let me rephrase my question: why don't
> we add the new state backend like below instead of adding a new interface
> (and IMHO there's no need to implement the `SnapshotStrategy` and
> `CheckpointListener` interfaces since it doesn't support checkpoint)?
> Reserved for adding more internal state backends in future?
> ===============================================
> class BoundedStreamInternalStateBackend<K> implements
>         KeyedStateBackend<K>,
>         SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>         Closeable,
>         CheckpointListener {
> ===============================================
>
> *bq. Let me though quickly summarize and if you find it useful I can add
> it to the FLIP itself.*
> Thanks for the summary. I think it's more specific and could help readers
> to better understand why we cannot use HeapKeyedStateBackend directly, than
> the single line description "when the StateBackend observes a new incoming
> key it will reset all acquired state objects so far". What do you think?
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Thu, 17 Sep 2020 at 23:38, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Thanks for the comments Yu.
>>
>> > First of all, for the performance testing result, I'm wondering whether
>> the
>> > sorting cost is counted in the result for both DataSet and refined
>> > DataStream implementations. I could think of the saving of hash
>> computation
>> > and final iteration to emit the word-count result (processing a key at a
>> > time could save such iteration), but not sure whether these cost savings
>> > are at the same grade of comparing the key bytes.
>> The results are times for end-to-end execution of a job. Therefore the
>> sorting part is included. The actual target of the replacement is
>> RocksDB, which does the serialization and key bytes comparison as well.
>> On top of that it adds all the RocksDB bookkeeping.
>>
>> > However, I'm not fully convinced to introduce a new
>> > `InternalKeyedStateBackend` interface. I agree that we don't need to
>> take
>> > the overhead of `AbstractKeyedStateBackend` since we don't plan to
>> support
>> > checkpoint for now, but why don't we directly write a state backend
>> > implementation for bounded stream? Or are we planning to introduce more
>> > internal state backends in future? What's more, the current design of
>> > `InternalKeyedStateBackend` in the FLIP document seems to be extending
>> as
>> > many interfaces as `AbstractKeyedStateBackend` implements, which I
>> guess
>> > is a typo.
>> Maybe I was not clear enough about the change. This change does not
>> "strip" the AbstractKeyedStateBackend of any functionalities. My intent
>> is not to remove any methods of the AbstractKeyedStateBackend. The
>> problem here is that the AbstractKeyedStateBackend is an abstract class
>> (duh ;)), which does have some predefined implementation. Moreover it
>> requires objects such as InternalKeyContext, CloseableRegistry etc. to be
>> constructed, which we don't need/want e.g. in the single key state
>> backend. My intention here is to make the StateBackend return only pure
>> interfaces. (AbstractKeyedStateBackend is the only non-interface that
>> StateBackend returns). In other words I just want to make
>> AbstractKeyedStateBackend a proper interface. It is not a typo that
>> InternalKeyedStateBackend extends the same interfaces as
>> AbstractKeyedStateBackend does.
>>
>> > Thirdly, I suggest we name the special state backend as
>> > `BoundedStreamInternalStateBackend`. And from our existing javadoc of
>> > `StateBackend` it actually cannot be called a complete state
>> backend...: "A
>> > State Backend defines how the state of a streaming application is stored
>> > and checkpointed".
>> Thanks for the suggestion. Sure I can use that name. Yes I do agree it
>> is not a full fledged StateBackend. I do want it to be an internal
>> class, that is never used explicitly by users.
>>
>> > Lastly, I didn't find a detailed design of the "SingleKeyStateBackend"
>> in
>> > the FLIP,
>> I did not put it into the design, because 1) I found it internal. It
>> does not touch any public facing interfaces. 2) It is rather
>> straightforward. Let me though quickly summarize and if you find it
>> useful I can add it to the FLIP itself.
>>
>> > as how to detect the key switching
>> That is rather straightforward. The state backend works only with the
>> assumption that the keys are sorted/grouped together. We keep the
>> current key and in setCurrentKey we check if the new key is
>> different from the current one. Side note: yes, custom user operators
>> which call setCurrentKey explicitly might not work in this setup.
>>
>> > remove the data (especially in the non-windowing
>> > case), etc.
>> We only ever keep a single value for a state object. Therefore
>> ValueState is a very thin wrapper for a value, MapState for a HashMap,
>> ListState for a List etc. When the key changes we simply set the wrapped
>> value/map/list to null.
>>
>> I hope this clarifies a few things. Let me know if you have any questions.
>>
>> Best,
>>
>> Dawid
>>
>> On 17/09/2020 15:28, Yu Li wrote:
>> > Hi all,
>> >
>> > Sorry for being late to the discussion, but I just noticed there are
>> some
>> > state backend related changes proposed in this FLIP, so would like to
>> share
>> > my two cents.
>> >
>> > First of all, for the performance testing result, I'm wondering whether
>> the
>> > sorting cost is counted in the result for both DataSet and refined
>> > DataStream implementations. I could think of the saving of hash
>> computation
>> > and final iteration to emit the word-count result (processing a key at a
>> > time could save such iteration), but not sure whether these cost savings
>> > are at the same grade of comparing the key bytes.
>> >
>> > Regardless of the performance result, I agree that the capability of
>> > removing the data after processing a key could prominently reduce the
>> space
>> > required by state, so introducing a new state backend for bounded stream
>> > makes sense.
>> >
>> > However, I'm not fully convinced to introduce a new
>> > `InternalKeyedStateBackend` interface. I agree that we don't need to
>> take
>> > the overhead of `AbstractKeyedStateBackend` since we don't plan to
>> support
>> > checkpoint for now, but why don't we directly write a state backend
>> > implementation for bounded stream? Or are we planning to introduce more
>> > internal state backends in future? What's more, the current design of
>> > `InternalKeyedStateBackend` in the FLIP document seems to be extending
>> as
>> > many interfaces as `AbstractKeyedStateBackend` implements, which I
>> guess
>> > is a typo.
>> >
>> > Thirdly, I suggest we name the special state backend as
>> > `BoundedStreamInternalStateBackend`. And from our existing javadoc of
>> > `StateBackend` it actually cannot be called a complete state
>> backend...: "A
>> > State Backend defines how the state of a streaming application is stored
>> > and checkpointed".
>> >
>> > Lastly, I didn't find a detailed design of the "SingleKeyStateBackend"
>> in
>> > the FLIP, and suggest we write the key design down, such as how to
>> detect
>> > the key switching and remove the data (especially in the non-windowing
>> > case), etc.
>> >
>> > Thanks.
>> >
>> > Best Regards,
>> > Yu
>> >
>> >
>> > On Wed, 9 Sep 2020 at 17:18, Kurt Young <ykt...@gmail.com> wrote:
>> >
>> >> Yes, I didn't intend to block this FLIP, and some of the comments are
>> >> actually implementation details.
>> >> And all of them are handled internally, not visible to users, thus we
>> can
>> >> also change or improve them
>> >> in the future.
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >>
>> >> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <aljos...@apache.org>
>> >> wrote:
>> >>
>> >>> I think Kurt's concerns/comments are very valid and we need to
>> implement
>> >>> such things in the future. However, I also think that we need to get
>> >>> started somewhere and I think what's proposed in this FLIP is a good
>> >>> starting point that we can build on. So we should not get paralyzed by
>> >>> thinking too far ahead into the future. Does that make sense?
>> >>>
>> >>> Best,
>> >>> Aljoscha
>> >>>
>> >>> On 08.09.20 16:59, Dawid Wysakowicz wrote:
>> >>>> Ad. 1
>> >>>>
>> >>>> Yes, you are right in principle.
>> >>>>
>> >>>> Let me though clarify my proposal a bit. The proposed sort-style
>> >>>> execution aims at a generic KeyedProcessFunction where all the
>> >>>> "aggregations" are actually performed in the user code. It tries to
>> >>>> improve the performance by actually removing the need to use RocksDB
>> >>> e.g.:
>> >>>>      private static final class Summer<K>
>> >>>>              extends KeyedProcessFunction<K, Tuple2<K, Integer>,
>> >>>> Tuple2<K, Integer>> {
>> >>>>
>> >>>>          ....
>> >>>>
>> >>>>          @Override
>> >>>>          public void processElement(
>> >>>>                  Tuple2<K, Integer> value,
>> >>>>                  Context ctx,
>> >>>>                  Collector<Tuple2<K, Integer>> out) throws Exception {
>> >>>>              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>> >>>>                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>> >>>>                  timerRegistered.update(true);
>> >>>>              }
>> >>>>              Integer v = counter.value();
>> >>>>              Integer incomingValue = value.f1;
>> >>>>              if (v != null) {
>> >>>>                  v += incomingValue;
>> >>>>              } else {
>> >>>>                  v = incomingValue;
>> >>>>              }
>> >>>>              counter.update(v);
>> >>>>          }
>> >>>>
>> >>>>          ....
>> >>>>
>> >>>>     }
>> >>>>
>> >>>> Therefore I don't think the first part of your reply with separating
>> >> the
>> >>>> write and read workload applies here. We do not aim to create a
>> >>>> competing API with the Table API. We think operations such as joins
>> or
>> >>>> analytical aggregations should be performed in Table API.
>> >>>>
>> >>>> As for the second part I agree it would be nice to fall back to the
>> >>>> sorting approach only if a certain threshold of memory in a State
>> >>>> Backend is used. This has some problems though. We would need a way
>> to
>> >>>> estimate the size of the occupied memory to tell when the threshold
>> is
>> >>>> reached. That is not easily doable by default e.g. in a
>> >>>> MemoryStateBackend, as we do not serialize the values in the state
>> >>>> backend by default. We would have to add that, but this would add the
>> >>>> overhead of the serialization.
>> >>>>
>> >>>> This proposal aims at the cases where we do have a large state that
>> >> will
>> >>>> not fit into the memory and without the change users are forced to
>> use
>> >>>> RocksDB. If the state fits in memory I agree it will be better to do
>> >>>> hash-based aggregations e.g. using the MemoryStateBackend. Therefore
>> I
>> >>>> think it is important to give users the choice to use one or the
>> other
>> >>>> approach. We might discuss which approach should be the default for
>> >>>> RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
>> >>>> user configured state backend or sorting-based with a single key at a
>> >>>> time backend? Moreover, we could consider whether to let users choose
>> the
>> >>>> sort vs hash "state backend" per operator. Would that suffice?
>> >>>>
>> >>>> Ad. 2
>> >>>>
>> >>>> I still think we can just use the first X bytes of the serialized
>> form
>> >>>> as the normalized key and fallback to comparing full keys on clashes.
>> >> It
>> >>>> is because we are actually not interested in a logical order, but we
>> >>>> care only about the "grouping" aspect of the sorting. Therefore I
>> think
>> >>>> it's enough to compare only parts of the full key as the normalized
>> key.
>> >>>>
>> >>>> Thanks again for the really nice and thorough feedback!
>> >>>>
>> >>>> Best,
>> >>>>
>> >>>> Dawid
>> >>>>
>> >>>> On 08/09/2020 14:47, Kurt Young wrote:
>> >>>>> Regarding #1, yes the state backend is definitely hash-based
>> >> execution.
>> >>>>> However, there are some differences between it and
>> >>>>> batch hash-based execution. The key difference is *random access &
>> >>>>> read/write mixed workload*. For example, by using a
>> >>>>> state backend in streaming execution, one has to mix the read and
>> >> write
>> >>>>> operations and all of them are actually random
>> >>>>> access. But in a batch hash execution, we could divide the phases
>> into
>> >>>>> write and read. For example, we can build the
>> >>>>> hash table first, with only write operations. And once the build is
>> >>> done,
>> >>>>> we can start to read and trigger the user codes.
>> >>>>> Take hash aggregation which blink planner implemented as an example,
>> >>> during
>> >>>>> building phase, as long as the hash map
>> >>>>> could fit into memory, we will update the accumulators directly in
>> the
>> >>> hash
>> >>>>> map. And once we are running out of memory,
>> >>>>> we then fall back to sort based execution. It improves the
>> >> performance a
>> >>>>> lot if the incoming data can be processed in
>> >>>>> memory.
>> >>>>>
>> >>>>> Regarding #2, IIUC you are actually describing a binary format of
>> key,
>> >>> not
>> >>>>> normalized key which is used in DataSet. I will
>> >>>>> take String for example. If we have lots of keys with length all
>> >> greater
>> >>>>> than, let's say 20. In your proposal, you will encode
>> >>>>> the whole string in the prefix of your composed data ( <key> +
>> >>> <timestamp>
>> >>>>> + <record> ). And when you compare
>> >>>>> records, you will actually compare the *whole* key of the record.
>> For
>> >>>>> normalized key, it's fixed-length in this case, IIRC it will
>> >>>>> take 8 bytes to represent the string. And the sorter will store the
>> >>>>> normalized key and offset in a dedicated array. When doing
>> >>>>> the sorting, it only sorts this *small* array. If the normalized
>> keys
>> >>> are
>> >>>>> different, you could immediately tell which is greater from
>> >>>>> normalized keys. You only have to compare the full keys if the
>> >>> normalized
>> >>>>> keys are equal and you know in this case the normalized
>> >>>>> key couldn't represent the full key. The reason why DataSet is doing
>> >>> this
>> >>>>> is it's super cache efficient by sorting the *small* array.
>> >>>>> The idea is borrowed from this paper [1]. Let me know if I missed or
>> >>>>> misunderstood anything.
>> >>>>>
>> >>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
>> >>>>> cache-sensitive parallel external sort)
>> >>>>>
>> >>>>> Best,
>> >>>>> Kurt
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <
>> >> dwysakow...@apache.org
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hey Kurt,
>> >>>>>>
>> >>>>>> Thank you for comments!
>> >>>>>>
>> >>>>>> Ad. 1 I might have missed something here, but as far as I see it is
>> >>> that
>> >>>>>> using the current execution stack with regular state backends
>> >> (RocksDB
>> >>>>>> in particular if we want to have spilling capabilities) is
>> equivalent
>> >>> to
>> >>>>>> hash-based execution. I can see a different spilling state backend
>> >>>>>> implementation in the future, but I think it is not batch specific.
>> Or
>> >>> am
>> >>>>>> I missing something?
>> >>>>>>
>> >>>>>> Ad. 2 Totally agree that normalized keys are important to the
>> >>>>>> performance. I think though TypeComparators are not a necessity to
>> >> have
>> >>>>>> that. Actually  this proposal is heading towards only ever
>> performing
>> >>>>>> "normalized keys" comparison. I have not included in the proposal
>> the
>> >>>>>> binary format which we will use for sorting (partially because I
>> >>> forgot,
>> >>>>>> and partially because I thought it was too much of an
>> implementation
>> >>>>>> detail). Let me include it here though, as it might clear the
>> >> situation
>> >>>>>> a bit here.
>> >>>>>>
>> >>>>>> In DataSet, at times we have KeySelectors which extract keys based
>> on
>> >>>>>> field indices or names. This allows in certain situations to extract
>> >> the
>> >>>>>> key from serialized records. Compared to DataSet, in DataStream,
>> the
>> >>> key
>> >>>>>> is always described with a black-box KeySelector, or differently
>> >> with a
>> >>>>>> function which extracts a key from a deserialized record.  In turn
>> >>> there
>> >>>>>> is no way to create a comparator that could compare records by
>> >>>>>> extracting the key from a serialized record (neither with, nor
>> >> without
>> >>>>>> key normalization). We suggest that the input for the sorter will
>> be
>> >>>>>>
>> >>>>>> <key> + <timestamp> + <record>
>> >>>>>>
>> >>>>>> Without having the key prepended we would have to deserialize the
>> >>> record
>> >>>>>> for every key comparison.
>> >>>>>>
>> >>>>>> Therefore if we agree that we perform binary comparison for keys
>> >> (which
>> >>>>>> are always prepended), it is actually equivalent to a DataSet with
>> >>>>>> TypeComparators that support key normalization.
>> >>>>>>
>> >>>>>> Let me know if that is clear, or I have missed something here.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>>
>> >>>>>> Dawid
>> >>>>>>
>> >>>>>> On 08/09/2020 03:39, Kurt Young wrote:
>> >>>>>>> Hi Dawid, thanks for bringing this up, it's really exciting to see
>> >>> that
>> >>>>>>> batch execution is introduced in DataStream. From the flip, it
>> seems
>> >>>>>>> we are sticking with sort based execution mode (at least for now),
>> >>> which
>> >>>>>>> will sort the whole input data before any *keyed* operation is
>> >>>>>>> executed. I have two comments here:
>> >>>>>>>
>> >>>>>>> 1. Do we want to introduce hash-based execution in the future?
>> Sort
>> >>> is a
>> >>>>>>> safe choice but not the best in lots of cases. IIUC we only need
>> >>>>>>> to make sure that before the framework finishes dealing with one
>> >> key,
>> >>> the
>> >>>>>>> operator doesn't see any data belonging to other keys, thus
>> >>>>>>> hash-based execution would also do the trick. One tricky thing the
>> >>>>>>> framework might need to deal with is memory constraint and
>> spilling
>> >>>>>>> in the hash map, but Flink also has some good knowledge about
>> these
>> >>>>>> stuff.
>> >>>>>>> 2. Going back to sort-based execution and how to sort keys. From
>> my
>> >>>>>>> experience, the performance of sorting would be one of the most
>> >> important
>> >>>>>>> things if we want to achieve good performance of batch execution.
>> >> And
>> >>>>>>> normalized keys are actually the key of the performance of
>> sorting.
>> >>>>>>> If we want to get rid of TypeComparator, I think we still need to
>> >>> find a
>> >>>>>>> way to introduce this back.
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Kurt
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <
>> >> aljos...@apache.org>
>> >>>>>> wrote:
>> >>>>>>>> Yes, I think we can address the problem of indeterminacy in a
>> >>> separate
>> >>>>>>>> FLIP because we're already in it.
>> >>>>>>>>
>> >>>>>>>> Aljoscha
>> >>>>>>>>
>> >>>>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>> >>>>>>>>> @Seth That's a very good point. I agree that RocksDB has the
>> same
>> >>>>>>>>> problem. I think we can use the same approach for the sorted
>> >>> shuffles
>> >>>>>>>>> then. @Aljoscha I agree we should think about making it more
>> >>> resilient,
>> >>>>>>>>> as I guess users might have problems already if they use keys
>> with
>> >>>>>>>>> non-deterministic binary representation. How do you feel about
>> >>>>>>>>> addressing that separately purely to limit the scope of this
>> FLIP?
>> >>>>>>>>>
>> >>>>>>>>> @Aljoscha I tend to agree with you that the best place to
>> actually
>> >>>>>> place
>> >>>>>>>>> the sorting would be in the InputProcessor(s). If there are no
>> >> more
>> >>>>>>>>> suggestions with respect to that issue, I'll put this proposal for
>> >>>>>> voting.
>> >>>>>>>>> @all Thank you for the feedback so far. I'd like to start a
>> voting
>> >>>>>>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
>> >>>>>> comment
>> >>>>>>>>> before that, if you still have some outstanding ideas.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>>
>> >>>>>>>>> Dawid
>> >>>>>>>>>
>> >>>>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>> >>>>>>>>>> Seth is right, I was just about to write that as well. There
>> is a
>> >>>>>>>>>> problem, though, because some of our TypeSerializers are not
>> >>>>>>>>>> deterministic even though we use them as if they were. Beam
>> >>> excludes
>> >>>>>>>>>> the FloatCoder, for example, and the AvroCoder in certain
>> cases.
>> >>> I'm
>> >>>>>>>>>> pretty sure there is also weirdness going on in our
>> >> KryoSerializer.
>> >>>>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>> >>>>>>>>>>> There is already an implicit assumption the TypeSerializer for
>> >>> keys
>> >>>>>> is
>> >>>>>>>>>>> stable/deterministic, RocksDB compares keys using their
>> >> serialized
>> >>>>>> byte
>> >>>>>>>>>>> strings. I think this is a non-issue (or at least it's not
>> >>> changing
>> >>>>>> the
>> >>>>>>>>>>> status quo).
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <
>> twal...@apache.org
>> >>>>>>>> wrote:
>> >>>>>>>>>>>> +1 for getting rid of the TypeComparator interface and rely
>> on
>> >>> the
>> >>>>>>>>>>>> serialized representation for grouping.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Adding a new type to DataStream API is quite difficult at the
>> >>> moment
>> >>>>>>>>>>>> due
>> >>>>>>>>>>>> to too many components that are required: TypeInformation
>> >> (tries
>> >>> to
>> >>>>>>>>>>>> deal
>> >>>>>>>>>>>> with logical fields for TypeComparators), TypeSerializer
>> (incl.
>> >>> its
>> >>>>>>>>>>>> snapshot interfaces), and TypeComparator (with many methods
>> and
>> >>>>>>>>>>>> internals such as normalized keys etc.).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> If necessary, we can add more simple comparison-related
>> methods
>> >>> to
>> >>>>>> the
>> >>>>>>>>>>>> TypeSerializer interface itself in the future (like
>> >>>>>>>>>>>> TypeSerializer.isDeterministic).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Regards,
>> >>>>>>>>>>>> Timo
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>> >>>>>>>>>>>>> Thanks for publishing the FLIP!
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <
>> >>> dwysakow...@apache.org>
>> >>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>      1. How to sort/group keys? What representation of the
>> >> key
>> >>>>>>>>>>>>>> should we
>> >>>>>>>>>>>>>>         use? Should we sort on the binary form or should we
>> >>> depend
>> >>>>>> on
>> >>>>>>>>>>>>>>         Comparators being available.
>> >>>>>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
>> >>>>>>>>>>>>> sorting/grouping
>> >>>>>>>>>>>> by using the binary representation. Then my opinion switched
>> >> and
>> >>> I
>> >>>>>>>>>>>> thought
>> >>>>>>>>>>>> we should use TypeComparator/Comparator because that's what
>> the
>> >>>>>>>>>>>> DataSet API
>> >>>>>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my
>> >>> opinion
>> >>>>>>>>>>>> to use
>> >>>>>>>>>>>> the binary representation because it means we can eventually
>> >> get
>> >>> rid
>> >>>>>>>>>>>> of the
>> >>>>>>>>>>>> TypeComparator interface, which is a bit complicated, and
>> >>> because we
>> >>>>>>>>>>>> don't
>> >>>>>>>>>>>> need any good order in our sort, we only need the grouping.
>> >>>>>>>>>>>>> This comes with some problems, though: we need to ensure
>> that
>> >>> the
>> >>>>>>>>>>>> TypeSerializer of the type we're sorting is
>> >> stable/deterministic.
>> >>>>>>>>>>>> Beam has
>> >>>>>>>>>>>> infrastructure for this in the form of
>> >>> Coder.verifyDeterministic()
>> >>>>>> [1]
>> >>>>>>>>>>>> which we don't have right now and should add if we go down
>> this
>> >>>>>> path.
>> >>>>>>>>>>>>>>      2. Where in the stack should we apply the sorting
>> (this
>> >>>>>> is rather a
>> >>>>>>>>>>>>>>         discussion about internals)
>> >>>>>>>>>>>>> Here, I'm gravitating towards the third option of
>> implementing
>> >>> it
>> >>>>>>>>>>>>> in the
>> >>>>>>>>>>>> layer of the StreamTask, which probably means implementing a
>> >>> custom
>> >>>>>>>>>>>> InputProcessor. I think it's best to do it in this layer
>> >> because
>> >>> we
>> >>>>>>>>>>>> would
>> >>>>>>>>>>>> not mix concerns of different layers as we would if we
>> >>> implemented
>> >>>>>>>>>>>> this as
>> >>>>>>>>>>>> a custom StreamOperator. I think this solution is also best
>> >> when
>> >>> it
>> >>>>>>>>>>>> comes
>> >>>>>>>>>>>> to multi-input operators.
>> >>>>>>>>>>>>>>      3. How should we deal with custom implementations of
>> >>>>>>>>>>>>>> StreamOperators
>> >>>>>>>>>>>>> I think the cleanest solution would be to go through the
>> >>> complete
>> >>>>>>>>>>>> operator lifecycle for every key, because then the watermark
>> >>> would
>> >>>>>> not
>> >>>>>>>>>>>> oscillate between -Inf and +Inf and we would not break the
>> >>>>>> semantical
>> >>>>>>>>>>>> guarantees that we gave to operators so far, in that the
>> >>> watermark
>> >>>>>> is
>> >>>>>>>>>>>> strictly monotonically increasing. However, I don't think
>> this
>> >>>>>>>>>>>> solution is
>> >>>>>>>>>>>> feasible because it would come with too much overhead. We
>> >> should
>> >>>>>>>>>>>> solve this
>> >>>>>>>>>>>> problem via documentation and maybe educate people to not
>> query
>> >>> the
>> >>>>>>>>>>>> current
>> >>>>>>>>>>>> watermark or not rely on the watermark being monotonically
>> >>>>>>>>>>>> increasing in
>> >>>>>>>>>>>> operator implementations to allow the framework more freedoms
>> >> in
>> >>> how
>> >>>>>>>>>>>> user
>> >>>>>>>>>>>> programs are executed.
>> >>>>>>>>>>>>> Aljoscha
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> [1]
>> >>
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>> >>>
>>
>>
