Hey Kurt,

Thank you for the comments!

Ad. 1 I might have missed something here, but as far as I can see,
using the current execution stack with regular state backends (RocksDB
in particular, if we want spilling capabilities) is equivalent to
hash-based execution. I can see a different spilling state backend
implementation emerging in the future, but I think it would not be
batch specific. Or am I missing something?
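
To illustrate the equivalence with a conceptual sketch (plain Java,
illustrative names only, not actual Flink classes): grouping through a
keyed state backend is structurally the same as hash-based execution,
and if the map below were backed by RocksDB, spilling would come for
free:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Conceptual sketch only: the "hash map" of hash-based execution is,
// in our stack, simply the keyed state backend.
public final class HashLikeExecution {

    public static <T, K> Map<K, Long> countPerKey(
            List<T> input, Function<T, K> keySelector) {
        // stands in for keyed state, e.g. backed by RocksDB
        Map<K, Long> state = new HashMap<>();
        for (T record : input) {
            state.merge(keySelector.apply(record), 1L, Long::sum);
        }
        // results are emitted per key once the input is exhausted
        return state;
    }
}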

Ad. 2 I totally agree that normalized keys are important for
performance. I think, though, that TypeComparators are not a necessity
for that. Actually, this proposal is heading towards only ever
performing "normalized keys" comparisons. I have not included in the
proposal the binary format which we will use for sorting (partially
because I forgot, and partially because I thought it was too much of an
implementation detail). Let me include it here, though, as it might
clarify the situation a bit.

In DataSet we at times have KeySelectors which extract keys based on
field indices or names. This allows, in certain situations, extracting
the key from serialized records. In DataStream, by contrast, the key is
always described with a black-box KeySelector, in other words with a
function which extracts a key from a deserialized record. In turn,
there is no way to create a comparator that could compare records by
extracting the key from a serialized record (neither with nor without
key normalization). We suggest that the input for the sorter be

<key> + <timestamp> + <record>

Without the key prepended, we would have to deserialize the record for
every key comparison.

Therefore, if we agree to perform binary comparison on keys (which are
always prepended), it is actually equivalent to DataSet with
TypeComparators that support key normalization.
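
To make this concrete, here is a minimal sketch of how such entries
could be written and compared (plain Java, purely illustrative names,
not the actual implementation; the key-length prefix is just one way to
delimit the key, and the sketch assumes the key and the record have
already been turned into bytes by their TypeSerializers):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;

public final class BinarySortingInput {

    // Lays out an entry as <key length> + <key bytes> + <timestamp> +
    // <record bytes>, so the sorter never has to deserialize the record.
    public static byte[] writeEntry(
            byte[] keyBytes, long timestamp, byte[] recordBytes)
            throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(keyBytes.length);
        out.write(keyBytes);
        out.writeLong(timestamp);
        out.write(recordBytes);
        return bos.toByteArray();
    }

    // Compares two entries on the prepended key bytes only. Since we
    // only need grouping (equal keys adjacent), unsigned lexicographic
    // order on the serialized key is enough; no "meaningful" order is
    // required.
    public static final Comparator<byte[]> KEY_ORDER = (a, b) -> {
        int lenA = readKeyLength(a);
        int lenB = readKeyLength(b);
        return Arrays.compareUnsigned(a, 4, 4 + lenA, b, 4, 4 + lenB);
    };

    private static int readKeyLength(byte[] entry) {
        return ((entry[0] & 0xFF) << 24) | ((entry[1] & 0xFF) << 16)
                | ((entry[2] & 0xFF) << 8) | (entry[3] & 0xFF);
    }
}

The only requirement this adds is that the key serializer produces a
deterministic binary representation, which is exactly the concern
raised further down the thread.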

Let me know if that is clear, or whether I have missed something here.

Best,

Dawid

On 08/09/2020 03:39, Kurt Young wrote:
> Hi Dawid, thanks for bringing this up, it's really exciting to see that
> batch execution is being introduced in DataStream. From the FLIP, it seems
> we are sticking with a sort-based execution mode (at least for now), which
> will sort the whole input data before any *keyed* operation is
> executed. I have two comments here:
>
> 1. Do we want to introduce hash-based execution in the future? Sort is a
> safe choice but not the best in lots of cases. IIUC we only need
> to make sure that before the framework finishes dealing with one key, the
> operator doesn't see any data belonging to other keys, thus
> hash-based execution would also do the trick. One tricky thing the
> framework might need to deal with is memory constraints and spilling
> in the hash map, but Flink also has some good knowledge about this stuff.
>
> 2. Going back to sort-based execution and how to sort keys. From my
> experience, the performance of sorting would be one of the most important
> things if we want to achieve good performance in batch execution. And
> normalized keys are actually the key to the performance of sorting.
> If we want to get rid of TypeComparator, I think we still need to find a
> way to introduce this back.
>
> Best,
> Kurt
>
>
> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Yes, I think we can address the problem of indeterminacy in a separate
>> FLIP, because we are already in that situation anyway.
>>
>> Aljoscha
>>
>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>>> @Seth That's a very good point. I agree that RocksDB has the same
>>> problem. I think we can use the same approach for the sorted shuffles
>>> then. @Aljoscha I agree we should think about making it more resilient,
>>> as I guess users might have problems already if they use keys with
>>> non-deterministic binary representation. How do you feel about
>>> addressing that separately purely to limit the scope of this FLIP?
>>>
>>> @Aljoscha I tend to agree with you that the best place to actually place
>>> the sorting would be in the InputProcessor(s). If there are no more
>>> suggestions with respect to that issue, I'll put this proposal up for
>>> voting.
>>>
>>> @all Thank you for the feedback so far. I'd like to start a voting
>>> thread on the proposal tomorrow. Therefore I'd appreciate it if you
>>> comment before then, if you still have some outstanding ideas.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>>>> Seth is right, I was just about to write that as well. There is a
>>>> problem, though, because some of our TypeSerializers are not
>>>> deterministic even though we use them as if they were. Beam excludes
>>>> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
>>>> pretty sure there is also weirdness going on in our KryoSerializer.
>>>>
>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>>>>> There is already an implicit assumption that the TypeSerializer for
>>>>> keys is stable/deterministic; RocksDB compares keys using their
>>>>> serialized byte strings. I think this is a non-issue (or at least
>>>>> it's not changing the status quo).
>>>>>
>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org>
>>>>> wrote:
>>>>>> +1 for getting rid of the TypeComparator interface and relying on
>>>>>> the serialized representation for grouping.
>>>>>>
>>>>>> Adding a new type to the DataStream API is quite difficult at the
>>>>>> moment due to too many components that are required: TypeInformation
>>>>>> (tries to deal with logical fields for TypeComparators),
>>>>>> TypeSerializer (incl. its snapshot interfaces), and TypeComparator
>>>>>> (with many methods and internals such as normalized keys etc.).
>>>>>>
>>>>>> If necessary, we can add more simple comparison-related methods to
>>>>>> the TypeSerializer interface itself in the future (like
>>>>>> TypeSerializer.isDeterministic).
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>>>>>> Thanks for publishing the FLIP!
>>>>>>>
>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org>
>>>>>>> wrote:
>>>>>>>>     1. How to sort/group keys? What representation of the key
>>>>>>>>        should we use? Should we sort on the binary form or should
>>>>>>>>        we depend on Comparators being available.
>>>>>>> Initially, I suggested to Dawid (in private) to do the
>>>>>>> sorting/grouping by using the binary representation. Then my opinion
>>>>>>> switched and I thought we should use TypeComparator/Comparator
>>>>>>> because that's what the DataSet API uses. After talking to Stephan,
>>>>>>> I'm again encouraged in my opinion to use the binary representation
>>>>>>> because it means we can eventually get rid of the TypeComparator
>>>>>>> interface, which is a bit complicated, and because we don't need any
>>>>>>> good order in our sort, we only need the grouping.
>>>>>>> This comes with some problems, though: we need to ensure that the
>>>>>>> TypeSerializer of the type we're sorting is stable/deterministic.
>>>>>>> Beam has infrastructure for this in the form of
>>>>>>> Coder.verifyDeterministic() [1], which we don't have right now and
>>>>>>> should add if we go down this path.
>>>>>>>>     2. Where in the stack should we apply the sorting (this is
>>>>>>>>        rather a discussion about internals)
>>>>>>> Here, I'm gravitating towards the third option of implementing it
>>>>>>> in the layer of the StreamTask, which probably means implementing a
>>>>>>> custom InputProcessor. I think it's best to do it in this layer
>>>>>>> because we would not mix concerns of different layers as we would if
>>>>>>> we implemented this as a custom StreamOperator. I think this
>>>>>>> solution is also best when it comes to multi-input operators.
>>>>>>>>     3. How should we deal with custom implementations of
>>>>>>>>        StreamOperators
>>>>>>> I think the cleanest solution would be to go through the complete
>>>>>>> operator lifecycle for every key, because then the watermark would
>>>>>>> not oscillate between -Inf and +Inf and we would not break the
>>>>>>> semantic guarantees that we gave to operators so far, namely that
>>>>>>> the watermark is strictly monotonically increasing. However, I don't
>>>>>>> think this solution is feasible because it would come with too much
>>>>>>> overhead. We should solve this problem via documentation and maybe
>>>>>>> educate people not to query the current watermark, or not to rely on
>>>>>>> the watermark being monotonically increasing in operator
>>>>>>> implementations, to allow the framework more freedom in how user
>>>>>>> programs are executed.
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
