Hi Fabian

Thanks for your update. Your views on upsert streams are highly
enlightening. I now agree with you that we can choose option 2 to solve
the problem: throw away empty deletes when the source generates
retractions, otherwise pass empty deletes down.
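
To make option 2 concrete, here is a rough Python sketch (not Flink code).
The message shape (kind, key, value), the function name and the
`generate_retractions` flag are all made up for illustration. It assumes the
retraction path keeps the latest value per key anyway, which is why dropping
empty deletes there costs nothing extra.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

# Hypothetical message shape: (kind, key, value); kind is "upsert" or "delete".
Message = Tuple[str, str, Optional[int]]


def upsert_source(messages: Iterable[Message],
                  generate_retractions: bool) -> Iterator[Message]:
    """Sketch of option 2 for an upsert source (all names are illustrative)."""
    if not generate_retractions:
        # Upsert mode: no state is kept, so empty deletes are passed down.
        yield from messages
        return

    last_value: Dict[str, Optional[int]] = {}  # key -> latest value seen
    for kind, key, value in messages:
        if kind == "upsert":
            if key in last_value:
                # Retract the previous version before adding the new one.
                yield ("retract", key, last_value[key])
            last_value[key] = value
            yield ("add", key, value)
        elif key in last_value:
            # Delete for a known key: retract the stored value.
            yield ("retract", key, last_value.pop(key))
        # Otherwise it is an empty delete and is dropped here.
```

With the item table from the thread, the retraction path emits add, retract,
add for item 2, drops the delete for item 1, and then emits the add for item 1.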

As for the UpsertSink, I think we don't need to filter empty deletes in it,
unless the external system should not receive empty deletes. It would be
good to provide an optional parameter in the StreamQueryConfig to indicate
whether to filter empty deletes in upsert sinks (i.e., this is a job
configuration). In this way, we can also solve the Filter problem
(FLINK-9528). I will create another subtask for the UpsertSink later.
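
A rough Python sketch of what I have in mind for the sink. `QueryConfig` and
its field are hypothetical stand-ins for the proposed StreamQueryConfig
parameter, which does not exist yet, and `written` stands in for the calls
to the external system. State is only kept when the flag is on.

```python
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple

Message = Tuple[str, str, Optional[int]]  # (kind, key, value)


@dataclass
class QueryConfig:
    # Hypothetical stand-in for the proposed StreamQueryConfig parameter.
    filter_empty_deletes_in_upsert_sink: bool = False


class UpsertSinkSketch:
    """Toy upsert sink; `written` stands in for the external system."""

    def __init__(self, config: QueryConfig) -> None:
        self.config = config
        self.live_keys: Set[str] = set()  # only maintained when filtering is on
        self.written: List[Message] = []

    def invoke(self, message: Message) -> None:
        kind, key, _ = message
        if self.config.filter_empty_deletes_in_upsert_sink:
            if kind == "delete" and key not in self.live_keys:
                return  # empty delete: this key was never upserted
            if kind == "upsert":
                self.live_keys.add(key)
            else:
                self.live_keys.discard(key)
        self.written.append(message)
```

With the default configuration every message, including empty deletes, goes
to the external system; with the flag set, the sink pays for a key set and
only forwards deletes for keys it has written.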

Thanks again for all the suggestions. They really help me a lot.
Best, Hequn.


On Tue, Aug 28, 2018 at 9:47 PM Fabian Hueske <fhue...@apache.org> wrote:

> Hi Hequn, hi Piotr,
>
> Thanks for pushing this discussion forward and sorry for not responding
> earlier.
>
> After reading the thread, I agree that we do not *need* to (but may)
> forward empty deletes.
> As I pointed out before, I see empty deletes not as a semantic problem
> that needs to be exactly solved but rather as a performance problem that
> can be optimized (trade-off state costs vs. handling empty delete).
>
> Piotr raised a good point. An upsert delete message may consist only of
> the key fields and the delete flag.
> Having no data but the keys means we *cannot* handle them as regular
> records during a join, filter, projection, or aggregation.
>
> Upsert streams are only present if the stream is received by an operator
> that is able to correctly interpret the upsert messages.
> Right now, there is only the UpsertSink operator that can handle upsert
> streams. Join and Aggregation might support upsert inputs in the future
> as well.
> There are the following cases:
>
> 1. If there is no operator that can handle an upsert stream, upserts need
> to be converted into retractions.
> Filtering out empty deletes while converting to retractions comes for free.
> 2. If the receiving operator is a Join or Aggregation, it has all
> necessary state to check whether the delete is empty or not.
> In case of an empty delete, it is simply dropped.
>
> In both cases (retract stream conversion and stateful upsert operator) we
> can filter empty deletes for free.
>
> The only remaining case is UpsertSinks. These do not have built-in state,
> since it is maintained in an external system.
> As I said before, empty deletes are not a semantic problem. We could
> forward all empty deletes and the result would still be consistent.
> However, I understand that empty deletes can cause severe performance
> issues.
> We can address the performance issue with different measures such as
> best-effort (approximate) filtering or exact state-backed filtering.
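
As one possible reading of "best-effort (approximate) filtering", here is a
rough Python sketch. It is only an assumption about how such a filter could
look, not something agreed in this thread: a bounded LRU cache remembers keys
whose latest message was a delete, so repeated deletes for the same key are
suppressed, while the first delete for an unknown key is always forwarded.
The class name and the `capacity` parameter are hypothetical; capacity 0
forwards everything.

```python
from collections import OrderedDict
from typing import Optional, Tuple

Message = Tuple[str, str, Optional[int]]  # (kind, key, value)


class BestEffortDeleteFilter:
    """Bounded LRU of recently deleted keys; capacity 0 forwards everything."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.deleted = OrderedDict()  # keys whose latest message was a delete

    def should_forward(self, message: Message) -> bool:
        kind, key, _ = message
        if kind == "upsert":
            self.deleted.pop(key, None)  # the key exists again
            return True
        if key in self.deleted:
            self.deleted.move_to_end(key)
            return False  # repeated delete for an already deleted key
        if self.capacity > 0:
            self.deleted[key] = True
            if len(self.deleted) > self.capacity:
                self.deleted.popitem(last=False)  # evict least recently used
        return True  # unknown key: we cannot prove the delete is empty
```

This variant never drops a delete that might be needed, so an eviction only
costs an extra forwarded delete. Exact filtering would additionally need the
full set of live keys in state.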
>
> I think in many cases we can handle empty deletes from upsert sources
> without adding additional state.
> As soon as the upsert messages are converted into retraction messages or
> consumed by a join or aggregation, they can be filtered for free.
> We only need to add state if we have an upsert sink AND if that sink
> wants to remove all empty deletes.
>
> There is one more thing that needs to be discussed: how upsert messages
> are handled by Calc operators.
> A Calc (projection and/or filter) that receives (and produces) an upsert
> stream (because it is in front of a Join, Aggregation, UpsertSink) should
> handle messages as follows:
> - upsert message/flag=true: upsert messages are handled as regular
> messages. If the predicate evaluates to false, all but the key fields are
> set to null and the message is forwarded as a delete message.
> - delete message/flag=false: delete messages are converted to the output
> schema (padded with nulls) and forwarded.
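
The two Calc rules above could be sketched roughly as follows in Python (not
Flink code). The (flag, row) message shape and the function signature are
made up for illustration, and the sketch assumes the key fields are part of
the output schema.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, Optional[int]]
Message = Tuple[bool, Row]  # (flag, row): True = upsert, False = delete


def calc(message: Message, key_fields: List[str], out_fields: List[str],
         predicate: Callable[[Row], bool]) -> Message:
    """Apply a projection/filter to one upsert-stream message (illustrative)."""
    flag, row = message
    if flag and predicate(row):
        # Upsert that passes the filter: project it as a regular record.
        return (True, {f: row[f] for f in out_fields})
    # Upsert failing the predicate, or an incoming delete: keep only the key
    # fields, pad the rest of the output schema with nulls, forward as delete.
    return (False, {f: row.get(f) if f in key_fields else None
                    for f in out_fields})
```

Note that the predicate is never evaluated on delete messages, since they may
carry nothing but the key fields.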
>
> What do you think?
> Fabian
>
>
>
>
>
>
> On Fri, Aug 24, 2018 at 7:33 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Piotrek,
>>
>> Great to see your replies, and many thanks for all your suggestions.
>> Replying inline is a good approach; I will do the same :-)
>>
>> *> I’m assuming that we are talking about event time and that `(delete 1,
>> a, 1)` happened before `(add 1, a, 2)`, right?*
>>
>> We are talking about processing time (FLINK-8577
>> <https://issues.apache.org/jira/browse/FLINK-8577>). Event time is the
>> next topic (FLINK-8578 <https://issues.apache.org/jira/browse/FLINK-8578>).
>> And `(delete 1, a, 1)` is an empty delete message that comes before
>> `(add 1, a, 2)`. However, `(add 2, a, 1)` and `(add 2, a, 2)` may come
>> before or after `(delete 1, a, 1)` and `(add 1, a, 2)`.
>>
>> *> Maybe we should say, that’s unspecified behaviour and Flink can either
>> forward empty deletes or filter them out depending on what’s more
>> efficient/easier. In the end this whole topic is very fishy. Look at it
>> from the Flink’s perspective. User is trying to remove the data, that was
>> never there in the first place. If it wasn’t there, why should we
>> retract/produce deletes for future records/join results? Should Flink
>> “imagine” that there was a record that it hasn’t seen? Should we apply this
>> logic to other records that Flink “hasn’t seen”?*
>>
>> I don't think unspecified behavior is a good approach. Yes, from Flink's
>> perspective, I think we should not retract/produce deletes for future
>> records/join results if the record wasn't there, and this behavior should
>> be consistent across all operators, including the upsert source. We should
>> not imagine that there was a record that Flink hasn't seen, and should
>> therefore throw the empty delete away at the source.
>>
>> *> Another potential issue. Previously I assumed that upsert deletes do
>> not have to carry the data that they are removing, just delete flag and
>> primary key. If that’s the case, if there is an aggregation after the join,
>> how could we even handle empty delete? We wouldn’t be able to produce valid
>> retract message for the follow up aggregation. *
>>
>> Yes, it is true that we can't handle empty deletes in Flink. As I said,
>> downstream operators don't even know how to deal with them.
>>
>> *> If item was an upsert stream with item_id as a primary key, the answer
>> can only be 4. However I do not see any problem here, since aggregations
>> require updates as retractions anyway, thus we can handle empty deletes in
>> UpsertToRetractionsOperator anyway we wish - filtering them out here or
>> marking them as “empty deletes” for forwarding. Marking as “empty deletes”
>> would have to be handled by aggregation in special way - forward it, but do
>> not update the state.*
>>
>> Agreed, the answer should only be 4. As you are in favor of option 3, I
>> think you are in favor of forwarding empty deletes in
>> UpsertToRetractionsOperator, since the operator is part of the upsert
>> source. (UpsertToRetractionsOperator is one way to implement an upsert
>> source; there are other ways.)
>> And yes, we can add a message type to distinguish empty deletes from
>> retractions. My concern is that we would have to adapt all operators to
>> the new message type. What's more, these empty deletes are useless for
>> downstream operators (even for the sink), because the key dimension
>> changes.
>>
>> *> I see also different problems here. What if again, as I mentioned
>> previously, delete message has no content? What if delete message has
>> incorrect content, that doesn’t match to the latest update? Shouldn’t
>> filter always pass delete message without looking into it’s content?*
>>
>> No, Filter should not always pass delete messages without looking into
>> their content. For SQL like:
>>
>>> insert into xxx
>>> select sum(item_v) from item where item_v < 2 and item_id = 1
>>
>> What's the result of sum? -1? -2? PS: the changelog is:
>>
>>> add 1             (item_v=1, item_id=2)   filtered out
>>> retract delete 1  (item_v=1, item_id=2)   not filtered if deletes always pass
>>> add 2             (item_v=2, item_id=2)   filtered out
>>> empty delete 1    (item_v=1, item_id=1)   not filtered if deletes always pass
>>> add 2             (item_v=2, item_id=1)   filtered out
>>
>>
>> *> Again, I fail to see semantics problem here. The data was not there in
>> the first place, so whether we forward delete or not shouldn’t matter from
>> semantics perspective: end result is still the same. If it does matter,
>> user should let Flink ingest the stream from some valid/consistent point,
>> not in the middle of it. For me, the only valid concern regarding empty
>> deletes is performance perspective.*
>>
>> The semantics problem here is: case 1 will pass empty deletes, but case 4
>> won't, just because a filter was added. The result is not the same
>> depending on whether we forward the delete or not. Users can perceive
>> this, since the data in their physical storage is different.
>>
>> *> What you are saying is highly use case specific. If you enforce full
>> empty deletions removal in this case, you will end up with terrible
>> performance in other cases/setups. I think configurable best effort cache
>> is still the way to go for the long run. It could be configured with number
>> of entries/bytes stored, where 0 means no-op, +INFINITY means full/perfect
>> cache. However I’m not even sure how high priority it should get.*
>>
>> I think this is a normal use case for big data analysis: a wide diversity
>> of keys, but a focus on only a small part of them. Taking a step back, we
>> should not let users take this risk. I agree that keeping state in the
>> source will affect performance, but I don't think it is terrible. We can
>> even store only the key in state when the source doesn't generate
>> retractions. The performance of the source is much better than that of
>> the aggregations. Furthermore, there are other ways to improve
>> performance, for example adding a bloom filter.
>>
>> *> If we decide to go this way, why even bother with upsert streams and
>> supporting them inside our pipeline? Why not convert everything to
>> retractions in sources and work always on retractions? It would simplify
>> our code and we wouldn’t need the logic from `DataStreamRetractionRules`.*
>>
>> I think these are two different things. One is how to handle empty
>> deletes, the other is how to handle updates.
>>
>> *> If there was a record there because user started Flink in a middle of
>> a stream, the result is still undefined (the join case that you mentioned),
>> since Flink could skip or ingest extra any number of messages (deletes or
>> not).*
>>
>> I think the result is clear if we clearly define that the upsert source
>> ignores empty deletes. Flink only skips or ingests messages according to
>> the SQL/Table API query provided by users.
>>
>> Thanks, Hequn
>>
>>
>> On Thu, Aug 23, 2018 at 6:04 PM Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the very detailed explanation :) Please check my inlined
>>> responses.
>>>
>>> On 22 Aug 2018, at 14:28, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>> Hi all,
>>> Thanks a lot for your replies, @Piotr Nowojski <pi...@data-artisans.com>
>>>  @Fabian Hueske <fhue...@apache.org> @Xingcan Cui <xingc...@gmail.com>
>>>
>>> Option 3 will send empty deletes to downstream operators, but downstream
>>> operators don't seem to know how to deal with them.
>>> Assume that the upsert source forwards deletes by default. Let's see what
>>> problems we have.
>>>
>>> *Tables*
>>> Suppose we have two upsert tables with keys. One is category (key:
>>> cate_id), the other is item (key: item_id).
>>> -----category-----
>>> cate_id,  cate_v
>>> (delete  a, 1)
>>>
>>> -----item-----
>>> item_id, cate_id, item_v
>>> (add   2, a, 1)
>>> (add   2, a, 2)
>>> (delete 1, a, 1)
>>> (add 1, a, 2)
>>>
>>>
>>> I’m assuming that we are talking about event time and that `(delete 1,
>>> a, 1)` happened before `(add 1, a, 2)`, right?
>>>
>>> *Cases*
>>> case 2: join
>>>
>>>> insert into xxx
>>>> select * from item join category on item.cate_id = category.cate_id;
>>>
>>> As in case 1, would the user want to delete all items of category 'a'? For
>>> the record (delete  a, 1) from category, should it be filtered by the join
>>> or be joined with all items from the item table?
>>> If it is filtered, this semantically conflicts with case 1.
>>> If it is not filtered, we have to store delete messages in the state of
>>> the join, since data from the item table may arrive later to be joined.
>>> Also, how to deal with these deletes in state is a problem. I think the
>>> logic of the join will be very complicated.
>>>
>>>
>>> Maybe we should say, that’s unspecified behaviour and Flink can either
>>> forward empty deletes or filter them out depending on what’s more
>>> efficient/easier. In the end this whole topic is very fishy. Look at it
>>> from the Flink’s perspective. User is trying to remove the data, that was
>>> never there in the first place. If it wasn’t there, why should we
>>> retract/produce deletes for future records/join results? Should Flink
>>> “imagine” that there was a record that it hasn’t seen? Should we apply this
>>> logic to other records that Flink “hasn’t seen”?
>>>
>>> Another potential issue. Previously I assumed that upsert deletes do not
>>> have to carry the data that they are removing, just delete flag and primary
>>> key. If that’s the case, if there is an aggregation after the join, how
>>> could we even handle empty delete? We wouldn’t be able to produce valid
>>> retract message for the follow up aggregation.
>>>
>>>
>>> case 3: aggregate
>>>
>>>> insert into xxx
>>>> select sum(item_v) from item
>>>
>>> What's the result of sum, 4 or 3?
>>> If the answer is 4, how can the downstream group_by tell whether a delete
>>> is an empty delete or a retract delete? PS: the changelog from item table
>>> to group_by is: add(1), retract(1), add(2), empty delete(1), add(2). We
>>> probably can add a message type to solve the problem, but it will
>>> definitely bring large changes and I don't think it is time to do it.
>>> If the answer is 3, the result is definitely wrong.
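
The arithmetic behind the two candidate answers can be replayed with a small
Python sketch. The changelog is the one listed above; the `empty_delete` tag
and the `apply_empty_deletes` switch are hypothetical and only mark which
message the aggregation would have to recognize.

```python
from typing import Iterable, Tuple

# Changelog reaching the aggregation, as listed in the thread.
CHANGELOG = [("add", 1), ("retract", 1), ("add", 2), ("empty_delete", 1), ("add", 2)]


def running_sum(changelog: Iterable[Tuple[str, int]],
                apply_empty_deletes: bool) -> int:
    total = 0
    for kind, value in changelog:
        if kind == "add":
            total += value
        elif kind == "retract" or apply_empty_deletes:
            # An empty delete only reaches this branch when it is (wrongly)
            # treated as a regular retraction.
            total -= value
    return total
```

Ignoring the empty delete gives 1 - 1 + 2 + 2 = 4, while treating it as a
retraction subtracts a value that was never added and gives 3.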
>>>
>>>
>>> If item was an upsert stream with item_id as a primary key, the answer
>>> can only be 4. However I do not see any problem here, since aggregations
>>> require updates as retractions anyway, thus we can handle empty deletes in
>>> UpsertToRetractionsOperator anyway we wish - filtering them out here or
>>> marking them as “empty deletes” for forwarding. Marking as “empty deletes”
>>> would have to be handled by aggregation in special way - forward it, but do
>>> not update the state.
>>>
>>>
>>> case 4: Filter
>>>
>>>> insert into xxx
>>>> select item_id, item_v from item where item_v < 2 and item_id = 1
>>>
>>>
>>> I see also different problems here. What if again, as I mentioned
>>> previously, delete message has no content? What if delete message has
>>> incorrect content, that doesn’t match to the latest update? Shouldn’t
>>> filter always pass delete message without looking into it’s content?
>>>
>>> For item of item_id=1, if Filter also generates empty deletes, these two
>>> kinds of deletes can not be distinguished by the sink. The problem is:
>>> should we filter empty deletes in sink?
>>> If yes, it is semantically conflicting with case 1.
>>> If no, deletes generated by Filter can not be filtered, this can cause
>>> devastating pressure on the physical database.
>>>
>>>
>>> Again, I fail to see semantics problem here. The data was not there in
>>> the first place, so whether we forward delete or not shouldn’t matter from
>>> semantics perspective: end result is still the same. If it does matter,
>>> user should let Flink ingest the stream from some valid/consistent point,
>>> not in the middle of it. For me, the only valid concern regarding empty
>>> deletes is performance perspective.
>>>
>>> Furthermore, I don't think we can use best effort. Consider the
>>> anti-spam scenario: users probably just want to get the top 1% of the
>>> data from the result, and the remaining 99% will all become delete
>>> messages after the Filter. This would be a disaster for the storage,
>>> especially when most of the incoming keys are new ones that cannot be
>>> absorbed by a best-effort cache. We cannot release a version of Flink
>>> that risks bringing a disaster to our users, even if the change is small.
>>>
>>>
>>> What you are saying is highly use case specific. If you enforce full
>>> empty deletions removal in this case, you will end up with terrible
>>> performance in other cases/setups. I think configurable best effort cache
>>> is still the way to go for the long run. It could be configured with number
>>> of entries/bytes stored, where 0 means no-op, +INFINITY means full/perfect
>>> cache. However I’m not even sure how high priority it should get.
>>>
>>> Thus, I don't think we should allow empty deletes output from source.
>>>
>>>
>>> If we decide to go this way, why even bother with upsert streams and
>>> supporting them inside our pipeline? Why not convert everything to
>>> retractions in sources and work always on retractions? It would simplify
>>> our code and we wouldn’t need the logic from `DataStreamRetractionRules`.
>>>
>>> To sum up, I really fail to see any semantic problems here. Regardless
>>> if we forward/filter out empty deletes the end result is always the same:
>>> there was no record there in the first place. In set theory following
>>> operations give the same outcome:
>>>
>>> {1, 2, 3} \ {4}
>>> {1, 2, 3} \ {}
>>> {1, 2, 3} \ {42}
>>>
>>> If there was a record there because user started Flink in a middle of a
>>> stream, the result is still undefined (the join case that you mentioned),
>>> since Flink could skip or ingest extra any number of messages (deletes or
>>> not).
>>>
>>> Piotrek
>>>
>>>
