Hi all,

Sorry for being late to the discussion, but I just noticed that this FLIP
proposes some state-backend-related changes, so I would like to share my
two cents.

First of all, regarding the performance test results, I'm wondering whether
the sorting cost is included in the numbers for both the DataSet and the
refined DataStream implementations. I can see the savings from skipping the
hash computation and the final iteration that emits the word-count result
(processing one key at a time avoids that iteration), but I'm not sure
whether these savings are of the same order as the cost of comparing the
key bytes.
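
To make the "comparing the key bytes" part concrete, here is a rough Java
sketch of the kind of comparison I have in mind. It follows the idea
discussed earlier in this thread of comparing a fixed-length prefix of the
serialized key first and falling back to the full key on a tie. The class
name `KeyBytesCompareSketch` and the 8-byte `PREFIX_LEN` are my own
assumptions, not anything taken from the FLIP or from Flink, and the code
needs Java 9+ for `Arrays.compareUnsigned`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; not a Flink class.
final class KeyBytesCompareSketch {

    // Assumed prefix length; the thread only speaks of "the first X bytes".
    static final int PREFIX_LEN = 8;

    // Orders two serialized keys by unsigned byte value. Equal keys compare
    // as 0, which is all that grouping needs.
    static int compare(byte[] a, byte[] b) {
        int prefixA = Math.min(a.length, PREFIX_LEN);
        int prefixB = Math.min(b.length, PREFIX_LEN);
        // Cheap step: look only at the fixed-length prefix.
        int byPrefix = Arrays.compareUnsigned(a, 0, prefixA, b, 0, prefixB);
        if (byPrefix != 0) {
            return byPrefix;
        }
        // Prefixes tie: fall back to comparing the full keys.
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        byte[] k1 = "flink-user-0001".getBytes(StandardCharsets.UTF_8);
        byte[] k2 = "flink-user-0002".getBytes(StandardCharsets.UTF_8);
        // These keys share their first 8 bytes, so the full comparison runs.
        System.out.println(compare(k1, k2) < 0);
    }
}
```

Every comparison during the sort pays at least the prefix step, and keys
with long common prefixes pay the full comparison as well, which is why I'm
asking whether the sort time is part of the reported numbers.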

Regardless of the performance result, I agree that being able to remove the
data once a key has been processed could significantly reduce the space
required by state, so introducing a new state backend for bounded streams
makes sense.

Secondly, I'm not fully convinced that we need to introduce a new
`InternalKeyedStateBackend` interface. I agree that we don't need to take
on the overhead of `AbstractKeyedStateBackend` since we don't plan to
support checkpointing for now, but why don't we directly write a state
backend implementation for bounded streams? Or are we planning to introduce
more internal state backends in the future? What's more, the current design
of `InternalKeyedStateBackend` in the FLIP document seems to extend as many
interfaces as `AbstractKeyedStateBackend` implements, which I guess is a
typo.

Thirdly, I suggest we name the special state backend
`BoundedStreamInternalStateBackend`. Going by our existing javadoc of
`StateBackend`, it actually cannot be called a complete state backend: "A
State Backend defines how the state of a streaming application is stored
and checkpointed".

Lastly, I didn't find a detailed design of the "SingleKeyStateBackend" in
the FLIP. I suggest we write down the key design points, such as how to
detect a key switch and remove the data (especially in the non-windowing
case).
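
As a strawman for that discussion, here is a minimal Java sketch of how a
key switch could be detected and the old data dropped, assuming the input
arrives already sorted/grouped by key. All names (`SingleKeyStateSketch`,
`setCurrentKey`, `finishedKeys`) are hypothetical and only for illustration;
this is not the FLIP's design and not an existing Flink interface.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration only; none of these names exist in Flink or the FLIP.
final class SingleKeyStateSketch<K> {

    private K currentKey;                 // key currently being processed
    private boolean hasKey;               // false until the first key arrives
    private final Map<String, Object> values = new HashMap<>(); // state name -> value
    private final List<K> finishedKeys = new ArrayList<>();     // kept only for the demo

    // Called before each record; assumes records are grouped by key.
    void setCurrentKey(K newKey) {
        if (hasKey && !Objects.equals(currentKey, newKey)) {
            // Key switch: the previous key is complete, so its state is dropped.
            finishedKeys.add(currentKey);
            values.clear();
        }
        currentKey = newKey;
        hasKey = true;
    }

    Object get(String stateName) {
        return values.get(stateName);
    }

    void put(String stateName, Object value) {
        values.put(stateName, value);
    }

    List<K> finishedKeys() {
        return finishedKeys;
    }

    public static void main(String[] args) {
        SingleKeyStateSketch<String> state = new SingleKeyStateSketch<>();
        String[] sortedKeys = {"a", "a", "b"};
        for (String key : sortedKeys) {
            state.setCurrentKey(key);
            Integer count = (Integer) state.get("count");
            state.put("count", count == null ? 1 : count + 1);
        }
        // Shows which keys were finished and the count held for the last key.
        System.out.println(state.finishedKeys() + " " + state.get("count"));
    }
}
```

The open question from my side is what should happen at the key-switch
point in the non-windowing case, for example whether results are emitted or
pending timers for the finished key are fired before the state is cleared.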

Thanks.

Best Regards,
Yu


On Wed, 9 Sep 2020 at 17:18, Kurt Young <ykt...@gmail.com> wrote:

> Yes, I didn't intend to block this FLIP, and some of the comments are
> actually implementation details.
> And all of them are handled internally, not visible to users, thus we can
> also change or improve them
> in the future.
>
> Best,
> Kurt
>
>
> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > I think Kurt's concerns/comments are very valid and we need to implement
> > such things in the future. However, I also think that we need to get
> > started somewhere and I think what's proposed in this FLIP is a good
> > starting point that we can build on. So we should not get paralyzed by
> > thinking too far ahead into the future. Does that make sense?
> >
> > Best,
> > Aljoscha
> >
> > On 08.09.20 16:59, Dawid Wysakowicz wrote:
> > > Ad. 1
> > >
> > > Yes, you are right in principle.
> > >
> > > Let me though clarify my proposal a bit. The proposed sort-style
> > > execution aims at a generic KeyedProcessFunction where all the
> > > "aggregations" are actually performed in the user code. It tries to
> > > improve the performance by actually removing the need to use RocksDB,
> > > e.g.:
> > >
> > >      private static final class Summer<K>
> > >              extends KeyedProcessFunction<K, Tuple2<K, Integer>,
> > >                      Tuple2<K, Integer>> {
> > >
> > >          ....
> > >
> > >          @Override
> > >          public void processElement(
> > >                  Tuple2<K, Integer> value,
> > >                  Context ctx,
> > >                  Collector<Tuple2<K, Integer>> out) throws Exception {
> > >              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
> > >                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
> > >                  timerRegistered.update(true);
> > >              }
> > >              Integer v = counter.value();
> > >              Integer incomingValue = value.f1;
> > >              if (v != null) {
> > >                  v += incomingValue;
> > >              } else {
> > >                  v = incomingValue;
> > >              }
> > >              counter.update(v);
> > >          }
> > >
> > >          ....
> > >
> > >     }
> > >
> > > Therefore I don't think the first part of your reply, about separating
> > > the write and read workloads, applies here. We do not aim to create an
> > > API competing with the Table API. We think operations such as joins or
> > > analytical aggregations should be performed in the Table API.
> > >
> > > As for the second part I agree it would be nice to fall back to the
> > > sorting approach only if a certain threshold of memory in a State
> > > Backend is used. This has some problems though. We would need a way to
> > > estimate the size of the occupied memory to tell when the threshold is
> > > reached. That is not easily doable by default e.g. in a
> > > MemoryStateBackend, as we do not serialize the values in the state
> > > backend by default. We would have to add that, but this would add the
> > > overhead of the serialization.
> > >
> > > This proposal aims at the cases where we do have a large state that
> > > will not fit into memory and, without the change, users are forced to
> > > use RocksDB. If the state fits in memory, I agree it will be better to
> > > do hash-based aggregations, e.g. using the MemoryStateBackend. Therefore
> > > I think it is important to give users the choice to use one or the
> > > other approach. We might discuss which approach should be the default
> > > for RuntimeMode.BATCH proposed in FLIP-134: should it be hash-based with
> > > a user-configured state backend, or sorting-based with a
> > > single-key-at-a-time backend? Moreover, we could consider whether we
> > > should let users choose the sort vs. hash "state backend" per operator.
> > > Would that suffice?
> > >
> > > Ad. 2
> > >
> > > I still think we can just use the first X bytes of the serialized form
> > > as the normalized key and fall back to comparing full keys on clashes.
> > > That is because we are actually not interested in a logical order; we
> > > care only about the "grouping" aspect of the sorting. Therefore I think
> > > it's enough to compare only parts of the full key as the normalized key.
> > >
> > > Thanks again for the really nice and thorough feedback!
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 08/09/2020 14:47, Kurt Young wrote:
> > >> Regarding #1, yes, the state backend is definitely hash-based
> > >> execution. However, there are some differences from batch hash-based
> > >> execution. The key difference is *random access & read/write mixed
> > >> workload*. For example, by using a state backend in streaming
> > >> execution, one has to mix the read and write operations, and all of
> > >> them are actually random access. But in batch hash execution, we
> > >> could divide the phases into write and read. For example, we can
> > >> build the hash table first, with only write operations. And once the
> > >> build is done, we can start to read and trigger the user code. Take
> > >> the hash aggregation implemented by the Blink planner as an example:
> > >> during the building phase, as long as the hash map fits into memory,
> > >> we update the accumulators directly in the hash map. And once we run
> > >> out of memory, we fall back to sort-based execution. It improves the
> > >> performance a lot if the incoming data can be processed in memory.
> > >>
> > >> Regarding #2, IIUC you are actually describing a binary format of the
> > >> key, not the normalized key which is used in DataSet. I will take
> > >> String as an example. Suppose we have lots of keys whose lengths are
> > >> all greater than, let's say, 20. In your proposal, you will encode the
> > >> whole string in the prefix of your composed data ( <key> + <timestamp>
> > >> + <record> ). And when you compare records, you will actually compare
> > >> the *whole* key of the record. A normalized key is fixed-length in
> > >> this case; IIRC it will take 8 bytes to represent the string. And the
> > >> sorter will store the normalized key and offset in a dedicated array.
> > >> When doing the sorting, it only sorts this *small* array. If the
> > >> normalized keys are different, you can immediately tell which is
> > >> greater from the normalized keys. You only have to compare the full
> > >> keys if the normalized keys are equal and you know in this case the
> > >> normalized key couldn't represent the full key. The reason why DataSet
> > >> is doing this is that sorting the *small* array is super cache
> > >> efficient. The idea is borrowed from this paper [1]. Let me know if I
> > >> missed or misunderstood anything.
> > >>
> > >> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
> > >> cache-sensitive parallel external sort)
> > >>
> > >> Best,
> > >> Kurt
> > >>
> > >>
> > >> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakow...@apache.org>
> > >> wrote:
> > >>
> > >>> Hey Kurt,
> > >>>
> > >>> Thank you for comments!
> > >>>
> > >>> Ad. 1 I might have missed something here, but as far as I can see,
> > >>> using the current execution stack with regular state backends
> > >>> (RocksDB in particular if we want to have spilling capabilities) is
> > >>> equivalent to hash-based execution. I can see a different spilling
> > >>> state backend implementation in the future, but I think it is not
> > >>> batch-specific. Or am I missing something?
> > >>>
> > >>> Ad. 2 Totally agree that normalized keys are important to the
> > >>> performance. I think though that TypeComparators are not a necessity
> > >>> to have that. Actually, this proposal is heading towards only ever
> > >>> performing "normalized keys" comparison. I have not included in the
> > >>> proposal the binary format which we will use for sorting (partially
> > >>> because I forgot, and partially because I thought it was too much of
> > >>> an implementation detail). Let me include it here though, as it might
> > >>> clear up the situation a bit.
> > >>>
> > >>> In DataSet, at times we have KeySelectors which extract keys based on
> > >>> field indices or names. This allows, in certain situations, extracting
> > >>> the key from serialized records. In contrast, in DataStream the key is
> > >>> always described with a black-box KeySelector, in other words with a
> > >>> function which extracts a key from a deserialized record. In turn
> > >>> there is no way to create a comparator that could compare records by
> > >>> extracting the key from a serialized record (neither with, nor
> > >>> without key normalization). We suggest that the input for the sorter
> > >>> will be
> > >>>
> > >>> <key> + <timestamp> + <record>
> > >>>
> > >>> Without having the key prepended we would have to deserialize the
> > >>> record for every key comparison.
> > >>>
> > >>> Therefore if we agree that we perform binary comparison for keys
> > >>> (which are always prepended), it is actually equivalent to a DataSet
> > >>> with TypeComparators that support key normalization.
> > >>>
> > >>> Let me know if that is clear, or I have missed something here.
> > >>>
> > >>> Best,
> > >>>
> > >>> Dawid
> > >>>
> > >>> On 08/09/2020 03:39, Kurt Young wrote:
> > >>>> Hi Dawid, thanks for bringing this up, it's really exciting to see
> > >>>> that batch execution is introduced in DataStream. From the FLIP, it
> > >>>> seems we are sticking with sort-based execution mode (at least for
> > >>>> now), which will sort the whole input data before any *keyed*
> > >>>> operation is executed. I have two comments here:
> > >>>>
> > >>>> 1. Do we want to introduce hash-based execution in the future? Sort
> > >>>> is a safe choice but not the best in lots of cases. IIUC we only
> > >>>> need to make sure that before the framework finishes dealing with
> > >>>> one key, the operator doesn't see any data belonging to other keys,
> > >>>> thus hash-based execution would also do the trick. One tricky thing
> > >>>> the framework might need to deal with is memory constraints and
> > >>>> spilling in the hash map, but Flink also has some good knowledge
> > >>>> about this stuff.
> > >>>>
> > >>>> 2. Going back to sort-based execution and how to sort keys. From my
> > >>>> experience, the performance of sorting would be one of the most
> > >>>> important things if we want to achieve good performance of batch
> > >>>> execution. And normalized keys are actually the key to the
> > >>>> performance of sorting. If we want to get rid of TypeComparator, I
> > >>>> think we still need to find a way to introduce this back.
> > >>>>
> > >>>> Best,
> > >>>> Kurt
> > >>>>
> > >>>>
> > >>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljos...@apache.org>
> > >>>> wrote:
> > >>>>> Yes, I think we can address the problem of indeterminacy in a
> > >>>>> separate FLIP because we're already in it.
> > >>>>>
> > >>>>> Aljoscha
> > >>>>>
> > >>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> > >>>>>> @Seth That's a very good point. I agree that RocksDB has the same
> > >>>>>> problem. I think we can use the same approach for the sorted
> > >>>>>> shuffles then. @Aljoscha I agree we should think about making it
> > >>>>>> more resilient, as I guess users might have problems already if
> > >>>>>> they use keys with non-deterministic binary representation. How do
> > >>>>>> you feel about addressing that separately, purely to limit the
> > >>>>>> scope of this FLIP?
> > >>>>>>
> > >>>>>> @Aljoscha I tend to agree with you that the best place to actually
> > >>>>>> place the sorting would be in the InputProcessor(s). If there are
> > >>>>>> no more suggestions with respect to that issue, I'll put this
> > >>>>>> proposal up for voting.
> > >>>>>>
> > >>>>>> @all Thank you for the feedback so far. I'd like to start a voting
> > >>>>>> thread on the proposal tomorrow. Therefore I'd appreciate it if
> > >>>>>> you comment before that, if you still have some outstanding ideas.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>>
> > >>>>>> Dawid
> > >>>>>>
> > >>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
> > >>>>>>> Seth is right, I was just about to write that as well. There is a
> > >>>>>>> problem, though, because some of our TypeSerializers are not
> > >>>>>>> deterministic even though we use them as if they were. Beam
> > >>>>>>> excludes the FloatCoder, for example, and the AvroCoder in
> > >>>>>>> certain cases. I'm pretty sure there is also weirdness going on
> > >>>>>>> in our KryoSerializer.
> > >>>>>>>
> > >>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
> > >>>>>>>> There is already an implicit assumption that the TypeSerializer
> > >>>>>>>> for keys is stable/deterministic: RocksDB compares keys using
> > >>>>>>>> their serialized byte strings. I think this is a non-issue (or
> > >>>>>>>> at least it's not changing the status quo).
> > >>>>>>>>
> > >>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org>
> > >>>>>>>> wrote:
> > >>>>>>>>> +1 for getting rid of the TypeComparator interface and relying
> > >>>>>>>>> on the serialized representation for grouping.
> > >>>>>>>>>
> > >>>>>>>>> Adding a new type to the DataStream API is quite difficult at
> > >>>>>>>>> the moment due to too many components that are required:
> > >>>>>>>>> TypeInformation (tries to deal with logical fields for
> > >>>>>>>>> TypeComparators), TypeSerializer (incl. its snapshot
> > >>>>>>>>> interfaces), and TypeComparator (with many methods and
> > >>>>>>>>> internals such as normalized keys etc.).
> > >>>>>>>>>
> > >>>>>>>>> If necessary, we can add more simple comparison-related methods
> > >>>>>>>>> to the TypeSerializer interface itself in the future (like
> > >>>>>>>>> TypeSerializer.isDeterministic).
> > >>>>>>>>>
> > >>>>>>>>> Regards,
> > >>>>>>>>> Timo
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> > >>>>>>>>>> Thanks for publishing the FLIP!
> > >>>>>>>>>>
> > >>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>>      1. How to sort/group keys? What representation of the key
> > >>>>>>>>>>>         should we use? Should we sort on the binary form or
> > >>>>>>>>>>>         should we depend on Comparators being available.
> > >>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
> > >>>>>>>>>> sorting/grouping by using the binary representation. Then my
> > >>>>>>>>>> opinion switched and I thought we should use
> > >>>>>>>>>> TypeComparator/Comparator because that's what the DataSet API
> > >>>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my
> > >>>>>>>>>> opinion to use the binary representation because it means we
> > >>>>>>>>>> can eventually get rid of the TypeComparator interface, which
> > >>>>>>>>>> is a bit complicated, and because we don't need any good order
> > >>>>>>>>>> in our sort, we only need the grouping.
> > >>>>>>>>>>
> > >>>>>>>>>> This comes with some problems, though: we need to ensure that
> > >>>>>>>>>> the TypeSerializer of the type we're sorting is
> > >>>>>>>>>> stable/deterministic. Beam has infrastructure for this in the
> > >>>>>>>>>> form of Coder.verifyDeterministic() [1] which we don't have
> > >>>>>>>>>> right now and should add if we go down this path.
> > >>>>>>>>>>>      2. Where in the stack should we apply the sorting (this is
> > >>>>>>>>>>>         rather a discussion about internals)
> > >>>>>>>>>> Here, I'm gravitating towards the third option of implementing
> > >>>>>>>>>> it in the layer of the StreamTask, which probably means
> > >>>>>>>>>> implementing a custom InputProcessor. I think it's best to do
> > >>>>>>>>>> it in this layer because we would not mix concerns of
> > >>>>>>>>>> different layers as we would if we implemented this as a
> > >>>>>>>>>> custom StreamOperator. I think this solution is also best when
> > >>>>>>>>>> it comes to multi-input operators.
> > >>>>>>>>>>>      3. How should we deal with custom implementations of
> > >>>>>>>>>>>         StreamOperators
> > >>>>>>>>>> I think the cleanest solution would be to go through the
> > >>>>>>>>>> complete operator lifecycle for every key, because then the
> > >>>>>>>>>> watermark would not oscillate between -Inf and +Inf and we
> > >>>>>>>>>> would not break the semantic guarantees that we gave to
> > >>>>>>>>>> operators so far, in that the watermark is strictly
> > >>>>>>>>>> monotonically increasing. However, I don't think this solution
> > >>>>>>>>>> is feasible because it would come with too much overhead. We
> > >>>>>>>>>> should solve this problem via documentation and maybe educate
> > >>>>>>>>>> people to not query the current watermark or not rely on the
> > >>>>>>>>>> watermark being monotonically increasing in operator
> > >>>>>>>>>> implementations to allow the framework more freedom in how
> > >>>>>>>>>> user programs are executed.
> > >>>>>>>>>> Aljoscha
> > >>>>>>>>>>
> > >>>>>>>>>> [1]
> > >>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
