Hi Hequn,

That's great! Yes, let's go with option 2 (from the source's point of view)
and later extend Join and Aggregation to discard empty deletes.
I agree that the filtering at the sink should be optional and configurable
via the query configuration.
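
To make that concrete, here is a minimal sketch of how such a switch could look
from the user's side. Note that withFilterEmptyDeletes() is purely hypothetical
and does not exist yet (withIdleStateRetentionTime() would be the precedent for
this kind of per-query option); everything else is the existing Table API:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

  StreamQueryConfig qConf = tEnv.queryConfig();
  // hypothetical switch: drop empty deletes before they reach the upsert sink
  qConf.withFilterEmptyDeletes(true);

  Table result = tEnv.sqlQuery("SELECT item_id, item_v FROM item WHERE item_v < 2");
  // "MyUpsertSink" stands for any UpsertStreamTableSink implementation
  result.writeToSink(new MyUpsertSink(), qConf);

The flag would only matter for the translation of the sink; all other operators
can drop empty deletes for free, as discussed below.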

Again, thanks for starting this discussion. I think it helped us all to get
a better understanding of how upserts work.

Best, Fabian

On Wed, Aug 29, 2018 at 5:29 PM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Fabian
>
> Thanks for your update. The opinions on upsert streams are highly
> enlightening. I think I now agree with you that we can choose option 2
> to solve the problem: throw away empty deletes when the source generates
> retractions, otherwise pass empty deletes down.
>
> As for the UpsertSink, I think we don't need to filter empty deletes in it,
> unless the external system should not receive empty deletes. It would be
> good to provide an optional parameter in the StreamQueryConfig to indicate
> whether to filter empty deletes in upsert sinks (i.e., this is a job
> configuration). In this way, we can also solve the Filter
> problem (FLINK-9528). I will create another subtask about UpsertSink later.
>
> Thanks again for all the suggestions. It really helps me a lot.
> Best, Hequn.
>
>
> On Tue, Aug 28, 2018 at 9:47 PM Fabian Hueske <fhue...@apache.org> wrote:
>
> > Hi Hequn, hi Piotr,
> >
> > Thanks for pushing this discussion forward and sorry for not responding
> > earlier.
> >
> > After reading the thread, I agree that we do not *need* to (but may)
> > forward empty deletes.
> > As I pointed out before, I see empty deletes not as a semantic problem
> > that needs to be solved exactly, but rather as a performance problem that
> > can be optimized (trade-off: state costs vs. handling empty deletes).
> >
> > Piotr raised a good point. An upsert delete message may consist only of
> > the key fields and the delete flag.
> > Having no data but the keys means we *cannot* handle them as regular
> > records during a join, filter, projection, or aggregation.
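> >
> > For reference, this is roughly the shape of such a message at the boundary
> > to an UpsertStreamTableSink, which consumes Tuple2<Boolean, Row>
> > (true = upsert, false = delete); the (item_id, cate_id, item_v) schema is
> > the item table used further down in this thread:
> >
> >   // org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.types.Row
> >   Tuple2<Boolean, Row> upsert = Tuple2.of(true, Row.of(1, "a", 2));       // full record
> >   Tuple2<Boolean, Row> delete = Tuple2.of(false, Row.of(1, null, null));  // only the key is reliable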
> >
> > Upsert streams are only present if the stream is received by an operator
> > that is able to correctly interpret the upsert messages.
> > Right now, there is only the UpsertSink operator that can handle upsert
> > streams. Join and Aggregation might support upsert inputs in the future
> > as well.
> > There are the following cases:
> >
> > 1. If there is no operator that can handle an upsert stream, upserts need
> > to be converted into retractions.
> > Filtering out empty deletes while converting to retractions comes for free.
> > 2. If the receiving operator is a Join or Aggregation, it has all
> > necessary state to check whether the delete is empty or not.
> > In case of an empty delete, it is simply dropped.
> >
> > In both cases (retract stream conversion and stateful upsert operator) we
> > can filter empty deletes for free.
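> >
> > For illustration, a minimal sketch of case 1, assuming the conversion is
> > done by a KeyedProcessFunction keyed on the upsert key (this is not the
> > actual internal operator, just the idea):
> >
> >   import org.apache.flink.api.common.state.ValueState;
> >   import org.apache.flink.api.common.state.ValueStateDescriptor;
> >   import org.apache.flink.api.java.tuple.Tuple2;
> >   import org.apache.flink.configuration.Configuration;
> >   import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> >   import org.apache.flink.types.Row;
> >   import org.apache.flink.util.Collector;
> >
> >   // input: (flag, row) with true = upsert, false = delete
> >   // output: retraction messages with true = add, false = retract
> >   public class UpsertToRetractions
> >       extends KeyedProcessFunction<Row, Tuple2<Boolean, Row>, Tuple2<Boolean, Row>> {
> >
> >     private transient ValueState<Row> last;
> >
> >     @Override
> >     public void open(Configuration parameters) {
> >       last = getRuntimeContext().getState(new ValueStateDescriptor<>("last", Row.class));
> >     }
> >
> >     @Override
> >     public void processElement(
> >         Tuple2<Boolean, Row> msg,
> >         Context ctx,
> >         Collector<Tuple2<Boolean, Row>> out) throws Exception {
> >       Row previous = last.value();
> >       if (msg.f0) {                                 // upsert
> >         if (previous != null) {
> >           out.collect(Tuple2.of(false, previous));  // retract the previous version
> >         }
> >         last.update(msg.f1);
> >         out.collect(Tuple2.of(true, msg.f1));
> >       } else if (previous != null) {                // delete for a known key
> >         out.collect(Tuple2.of(false, previous));
> >         last.clear();
> >       }                                             // else: empty delete, dropped for free
> >     }
> >   }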
> >
> > The only case left is UpsertSinks. These do not have built-in state,
> > since it is maintained in an external system.
> > As I said before, empty deletes are not a semantic problem. We could
> > forward all empty deletes and the result would still be consistent.
> > However, I understand that empty deletes can cause severe performance
> > issues.
> > We can address the performance issue with different measures such as
> > best-effort (approximate) filtering or exact state-backed filtering.
> >
> > I think in many cases we can handle empty deletes from upsert sources
> > without adding additional state.
> > As soon as the upsert messages are converted into retraction messages or
> > consumed by a join or aggregation, they can be filtered for free.
> > We only need to add state if we have an upsert sink AND if that sink
> > wants to remove all empty deletes.
> >
> > There is one more thing that needs to be discussed: how upsert messages
> > are handled by Calc operators.
> > A Calc (projection and/or filter) that receives (and produces) an upsert
> > stream (because it is in front of a Join, Aggregation, or UpsertSink) should
> > handle messages as follows:
> > - upsert message/flag=true: upsert messages are handled as regular
> > messages. If the predicate evaluates to false, all but the key fields are
> > set to null and the message is forwarded as a delete message.
> > - delete message/flag=false: delete messages are converted to the output
> > schema (padded with nulls) and forwarded.
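> >
> > Just to illustrate, a minimal sketch of that behaviour as a plain
> > FlatMapFunction (the real Calc is code-generated; the field positions,
> > the key at field 0, and the predicate item_v < 2 are only example
> > assumptions):
> >
> >   import org.apache.flink.api.common.functions.FlatMapFunction;
> >   import org.apache.flink.api.java.tuple.Tuple2;
> >   import org.apache.flink.types.Row;
> >   import org.apache.flink.util.Collector;
> >
> >   // input schema: (item_id, cate_id, item_v), key = item_id (field 0)
> >   // output schema: (item_id, item_v)
> >   public class UpsertCalc implements FlatMapFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, Row>> {
> >
> >     @Override
> >     public void flatMap(Tuple2<Boolean, Row> msg, Collector<Tuple2<Boolean, Row>> out) {
> >       if (msg.f0) {
> >         Object v = msg.f1.getField(2);
> >         if (v != null && (Integer) v < 2) {
> >           // predicate true: project and forward as a regular upsert
> >           out.collect(Tuple2.of(true, Row.of(msg.f1.getField(0), v)));
> >         } else {
> >           // predicate false: keep only the key, null the rest, forward as delete
> >           out.collect(Tuple2.of(false, Row.of(msg.f1.getField(0), null)));
> >         }
> >       } else {
> >         // delete: pad to the output schema and forward without evaluating the predicate
> >         out.collect(Tuple2.of(false, Row.of(msg.f1.getField(0), null)));
> >       }
> >     }
> >   }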
> >
> > What do you think?
> > Fabian
> >
> >
> >
> >
> >
> >
> > On Fri, Aug 24, 2018 at 7:33 AM Hequn Cheng <chenghe...@gmail.com> wrote:
> >
> >> Hi Piotrek,
> >>
> >> Great to see your replies, and thanks a lot for all your suggestions.
> >> Replying inline is a good way, I will do the same as you :-)
> >>
> >> *> I’m assuming that we are talking about event time and that `(delete 1,
> >> a, 1)` happened before `(add 1, a, 2)`, right?*
> >>
> >> We are talking about processing time (FLINK-8577
> >> <https://issues.apache.org/jira/browse/FLINK-8577>). Event time is the
> >> next topic (FLINK-8578 <https://issues.apache.org/jira/browse/FLINK-8578>).
> >> And `(delete 1, a, 1)` is an empty delete message that comes before `(add 1,
> >> a, 2)`. However, (add 2, a, 1) & (add 2, a, 2) may come before or after
> >> (delete 1, a, 1) & (add 1, a, 2).
> >>
> >> *> Maybe we should say, that’s unspecified behaviour and Flink can either
> >> forward empty deletes or filter them out depending on what’s more
> >> efficient/easier. In the end this whole topic is very fishy. Look at it
> >> from the Flink’s perspective. User is trying to remove the data, that was
> >> never there in the first place. If it wasn’t there, why should we
> >> retract/produce deletes for future records/join results? Should Flink
> >> “imagine” that there was a record that it hasn’t seen? Should we apply this
> >> logic to other records that Flink “hasn’t seen”?*
> >>
> >> I don't think unspecified behavior is a good way. Yes, from Flink's
> >> perspective, I think we should not retract/produce deletes for future
> >> records/join results if the data wasn't there, and this behavior should be
> >> consistent among all operators, including the upsert source. We should not
> >> imagine that there was a record that Flink hasn't seen, and should thus
> >> throw the empty delete away at the source.
> >>
> >> *> Another potential issue. Previously I assumed that upsert deletes do
> >> not have to carry the data that they are removing, just delete flag and
> >> primary key. If that’s the case, if there is an aggregation after the join,
> >> how could we even handle empty delete? We wouldn’t be able to produce valid
> >> retract message for the follow up aggregation. *
> >>
> >> Yes, it is true that we can't handle such empty deletes in Flink. As I said,
> >> downstream operators don't even know how to deal with them.
> >>
> >> *> If item was an upsert stream with item_id as a primary key, the answer
> >> can only be 4. However I do not see any problem here, since aggregations
> >> require updates as retractions anyway, thus we can handle empty deletes in
> >> UpsertToRetractionsOperator anyway we wish - filtering them out here or
> >> marking them as “empty deletes” for forwarding. Marking as “empty deletes”
> >> would have to be handled by aggregation in special way - forward it, but do
> >> not update the state.*
> >>
> >> Agreed, the answer can only be 4. As you are in favor of option 3, I
> >> think you are in favor of forwarding empty deletes in the
> >> UpsertToRetractionsOperator, since the operator is part of the upsert
> >> source. (UpsertToRetractionsOperator is one way to implement an upsert
> >> source; there are other ways.)
> >> And yes, we can add a message type to distinguish the empty deletes from
> >> retractions. What concerns me is that we would have to adapt all operators
> >> to the new message type. What's more, these empty deletes are useless for
> >> downstream operators (even for the sink), because of the change of the key
> >> dimension.
> >>
> >> *> I see also different problems here. What if again, as I mentioned
> >> previously, delete message has no content? What if delete message has
> >> incorrect content, that doesn’t match to the latest update? Shouldn’t
> >> filter always pass delete message without looking into it’s content?*
> >>
> >> No, the Filter should not always pass delete messages without looking into
> >> their content. For a SQL query like:
> >>
> >>> insert into xxx
> >>> select sum(item_v) from item where item_v < 2 and item_id = 1
> >>
> >> what's the result of sum? -1? -2? PS, the changelog is:
> >>
> >>> add 1               (item_v=1, item_id=2)   filtered out
> >>> retract delete 1    (item_v=1, item_id=2)   not filtered if deletes always pass
> >>> add 2               (item_v=2, item_id=2)   filtered out
> >>> empty delete 1      (item_v=1, item_id=1)   not filtered if deletes always pass
> >>> add 2               (item_v=2, item_id=1)   filtered out
> >>
> >> If the Filter blindly forwards the two deletes, the sum retracts values
> >> that were never added, which is clearly wrong.
> >>
> >> *> Again, I fail to see semantics problem here. The data was not there in
> >> the first place, so whether we forward delete or not shouldn’t matter from
> >> semantics perspective: end result is still the same. If it does matter,
> >> user should let Flink ingest the stream from some valid/consistent point,
> >> not in the middle of it. For me, the only valid concern regarding empty
> >> deletes is performance perspective.*
> >>
> >> The semantics problem here is: case 1 will pass empty deletes, but case 4
> >> won't, just because a filter was added. Whether we forward deletes or not,
> >> the result is not the same. Users can perceive this, since the data in
> >> their physical storage is different.
> >>
> >> *> What you are saying is highly use case specific. If you enforce full
> >> empty deletions removal in this case, you will end up with terrible
> >> performance in other cases/setups. I think configurable best effort cache
> >> is still the way to go for the long run. It could be configured with number
> >> of entries/bytes stored, where 0 means no-op, +INFINITY means full/perfect
> >> cache. However I’m not even sure how high priority it should get.*
> >>
> >> I think this is a normal use case for big data analysis: keys with high
> >> diversity, but the focus is on a small part of them. To take a step back,
> >> we should not let users take this risk. I agree that keeping state in the
> >> source will influence the performance, but I don't think it is terrible.
> >> We can even store only the key in state when the source doesn't generate
> >> retractions (see the sketch below). The performance of the source is much
> >> better than that of the aggregations. Furthermore, there are other ways to
> >> improve the performance, for example adding a bloom filter.
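> >>
> >> A minimal sketch of the key-only variant I mean (a KeyedProcessFunction in
> >> front of the downstream operators; a bloom filter could be added as a
> >> best-effort pre-check, which is not shown here):
> >>
> >>   import org.apache.flink.api.common.state.ValueState;
> >>   import org.apache.flink.api.common.state.ValueStateDescriptor;
> >>   import org.apache.flink.api.java.tuple.Tuple2;
> >>   import org.apache.flink.configuration.Configuration;
> >>   import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> >>   import org.apache.flink.types.Row;
> >>   import org.apache.flink.util.Collector;
> >>
> >>   // keyed on the upsert key: remembers only whether a key was seen, not the full row
> >>   public class EmptyDeleteFilter
> >>       extends KeyedProcessFunction<Row, Tuple2<Boolean, Row>, Tuple2<Boolean, Row>> {
> >>
> >>     private transient ValueState<Boolean> seen;
> >>
> >>     @Override
> >>     public void open(Configuration parameters) {
> >>       seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));
> >>     }
> >>
> >>     @Override
> >>     public void processElement(
> >>         Tuple2<Boolean, Row> msg,
> >>         Context ctx,
> >>         Collector<Tuple2<Boolean, Row>> out) throws Exception {
> >>       if (msg.f0) {
> >>         seen.update(true);              // upsert: remember the key and forward
> >>         out.collect(msg);
> >>       } else if (seen.value() != null) {
> >>         seen.clear();                   // delete for a known key: forward and forget
> >>         out.collect(msg);
> >>       }                                 // else: empty delete for an unseen key, drop it
> >>     }
> >>   }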
> >>
> >> *> If we decide to go this way, why even bother with upsert streams and
> >> supporting them inside our pipeline? Why not convert everything to
> >> retractions in sources and work always on retractions? It would simplify
> >> our code and we wouldn’t need the logic from `DataStreamRetractionRules`.*
> >>
> >> I think these are two different things. One is how to handle empty
> >> deletes, the other is how to handle updates.
> >>
> >> *> If there was a record there because user started Flink in a middle of
> >> a stream, the result is still undefined (the join case that you mentioned),
> >> since Flink could skip or ingest extra any number of messages (deletes or
> >> not).*
> >>
> >> I think the result is clear if we clearly define that the upsert source
> >> ignores empty deletes. Flink only skips or ingests messages according to
> >> the SQL/Table API provided by users.
> >>
> >> Thanks, Hequn
> >>
> >>
> >> On Thu, Aug 23, 2018 at 6:04 PM Piotr Nowojski <pi...@data-artisans.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thanks for the very detailed explanation :) Please check my inlined
> >>> responses.
> >>>
> >>> On 22 Aug 2018, at 14:28, Hequn Cheng <chenghe...@gmail.com> wrote:
> >>>
> >>> Hi all,
> >>> Thanks a lot for your replies, @Piotr Nowojski <pi...@data-artisans.com>
> >>> @Fabian Hueske <fhue...@apache.org> @Xingcan Cui <xingc...@gmail.com>
> >>>
> >>> Option 3 will send empty deletes to downstream operators, but downstream
> >>> operators don't seem to know how to deal with them.
> >>> Assume that the upsert source forwards deletes by default. Let's see what
> >>> problems we have.
> >>>
> >>> *Tables*
> >>> Suppose we have two upsert tables with keys. One is category (key:
> >>> cate_id), the other is item (key: item_id).
> >>> -----category-----
> >>> cate_id,  cate_v
> >>> (delete  a, 1)
> >>>
> >>> -----item-----
> >>> item_id, cate_id, item_v
> >>> (add   2, a, 1)
> >>> (add   2, a, 2)
> >>> (delete 1, a, 1)
> >>> (add 1, a, 2)
> >>>
> >>>
> >>> I’m assuming that we are talking about event time and that `(delete 1,
> >>> a, 1)` happened before `(add 1, a, 2)`, right?
> >>>
> >>> *Cases*
> >>> case 2: join
> >>>
> >>>> insert into xxx
> >>>> select * from item join category on item.cate_id = category.cate_id;
> >>>
> >>> Same as case 1: would the user want to delete all items of category 'a'?
> >>> For record (delete a, 1) from category, should it be filtered by the join
> >>> or be joined with all items from the item table?
> >>> If it is filtered, it semantically conflicts with case 1.
> >>> If it is not filtered, it means we have to store delete messages in the
> >>> state of the join, since data from the item table may come later to the
> >>> join. Also, how to deal with these deletes in state is a problem. I think
> >>> the logic of the join will become very complicated.
> >>>
> >>>
> >>> Maybe we should say, that’s unspecified behaviour and Flink can either
> >>> forward empty deletes or filter them out depending on what’s more
> >>> efficient/easier. In the end this whole topic is very fishy. Look at it
> >>> from the Flink’s perspective. User is trying to remove the data, that was
> >>> never there in the first place. If it wasn’t there, why should we
> >>> retract/produce deletes for future records/join results? Should Flink
> >>> “imagine” that there was a record that it hasn’t seen? Should we apply this
> >>> logic to other records that Flink “hasn’t seen”?
> >>>
> >>> Another potential issue. Previously I assumed that upsert deletes do not
> >>> have to carry the data that they are removing, just delete flag and primary
> >>> key. If that’s the case, if there is an aggregation after the join, how
> >>> could we even handle empty delete? We wouldn’t be able to produce valid
> >>> retract message for the follow up aggregation.
> >>>
> >>>
> >>> case 3: aggregate
> >>>
> >>>> insert into xxx
> >>>> select sum(item_v) from item
> >>>
> >>> What's the result of sum, 4 or 5?
> >>> If the answer is 4, how can the downstream group_by tell which delete is
> >>> an empty delete or a retract delete? PS: the changelog from item table to
> >>> group_by is: add(1) retract(1) add(2), empty delete(1), add(2). We probably
> >>> can add a message type to solve the problem, but it definitely will bring
> >>> large changes and I don't think it is time to do it.
> >>> If the answer is 3, the result is definitely wrong.
> >>>
> >>>
> >>> If item was an upsert stream with item_id as a primary key, the answer
> >>> can only be 4. However I do not see any problem here, since aggregations
> >>> require updates as retractions anyway, thus we can handle empty deletes in
> >>> UpsertToRetractionsOperator anyway we wish - filtering them out here or
> >>> marking them as “empty deletes” for forwarding. Marking as “empty deletes”
> >>> would have to be handled by aggregation in special way - forward it, but do
> >>> not update the state.
> >>>
> >>>
> >>> case 4: Filter
> >>>
> >>>> insert into xxx
> >>>> select item_id, item_v from item where item_v < 2 and item_id = 1
> >>>
> >>>
> >>> I see also different problems here. What if again, as I mentioned
> >>> previously, delete message has no content? What if delete message has
> >>> incorrect content, that doesn’t match to the latest update? Shouldn’t
> >>> filter always pass delete message without looking into it’s content?
> >>>
> >>> For the item with item_id=1, if the Filter also generates empty deletes,
> >>> these two kinds of deletes cannot be distinguished by the sink. The problem
> >>> is: should we filter empty deletes in the sink?
> >>> If yes, it is semantically conflicting with case 1.
> >>> If no, deletes generated by the Filter cannot be filtered, which can cause
> >>> devastating pressure on the physical database.
> >>>
> >>>
> >>> Again, I fail to see semantics problem here. The data was not there in
> >>> the first place, so whether we forward delete or not shouldn’t matter from
> >>> semantics perspective: end result is still the same. If it does matter,
> >>> user should let Flink ingest the stream from some valid/consistent point,
> >>> not in the middle of it. For me, the only valid concern regarding empty
> >>> deletes is performance perspective.
> >>>
> >>> Furthermore, I don't think we can use best effort. Considering the
> >>> anti-spamming scenario, users probably just want to get the top 1% of data
> >>> from the result; the remaining 99% of the data will all become delete
> >>> messages after the Filter. This would be a disaster for the storage,
> >>> especially for the case where most of the incoming keys are new ones and
> >>> cannot be swallowed by a best-effort cache. We cannot release a version of
> >>> Flink that takes a chance on bringing a disaster to our users, even if the
> >>> change is small.
> >>>
> >>>
> >>> What you are saying is highly use case specific. If you enforce full
> >>> empty deletions removal in this case, you will end up with terrible
> >>> performance in other cases/setups. I think configurable best effort cache
> >>> is still the way to go for the long run. It could be configured with number
> >>> of entries/bytes stored, where 0 means no-op, +INFINITY means full/perfect
> >>> cache. However I’m not even sure how high priority it should get.
> >>>
> >>> Thus, I don't think we should allow empty deletes to be output from the source.
> >>>
> >>>
> >>> If we decide to go this way, why even bother with upsert streams and
> >>> supporting them inside our pipeline? Why not convert everything to
> >>> retractions in sources and work always on retractions? It would simplify
> >>> our code and we wouldn’t need the logic from `DataStreamRetractionRules`.
> >>>
> >>> To sum up, I really fail to see any semantic problems here. Regardless
> >>> if we forward/filter out empty deletes the end result is always the same:
> >>> there was no record there in the first place. In set theory following
> >>> operations give the same outcome:
> >>>
> >>> {1, 2, 3} \ {4}
> >>> {1, 2, 3} \ {}
> >>> {1, 2, 3} \ {42}
> >>>
> >>> If there was a record there because user started Flink in a middle of a
> >>> stream, the result is still undefined (the join case that you mentioned),
> >>> since Flink could skip or ingest extra any number of messages (deletes or
> >>> not).
> >>>
> >>> Piotrek
> >>>
> >>>
>
