Hi Hequn,

That's great! Yes, let's go with option 2 (from the source's point of view) and later extend Join and Aggregation to discard empty deletes. I agree that the filtering at the sink should be optional and configurable via the query configuration.
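As a rough illustration of that idea, here is a minimal, self-contained sketch. The classes and the method name withFilterEmptyDeletesInUpsertSink are hypothetical stand-ins (Flink's StreamQueryConfig exposes no such option today); the key set inside the sink corresponds to the exact, state-backed flavor of filtering mentioned further down the thread.

```java
// Hypothetical sketch only: Flink's StreamQueryConfig has no such switch.
// It illustrates the kind of per-query flag discussed here, which an upsert
// sink could consult to decide whether to drop deletes for keys it never wrote.
public class FilterEmptyDeletesSketch {

    /** Stand-in for a query configuration object carrying the proposed flag. */
    static class QueryConfigSketch {
        private boolean filterEmptyDeletesInUpsertSink = false;

        QueryConfigSketch withFilterEmptyDeletesInUpsertSink(boolean enabled) {
            this.filterEmptyDeletesInUpsertSink = enabled;
            return this;
        }

        boolean filterEmptyDeletesInUpsertSink() {
            return filterEmptyDeletesInUpsertSink;
        }
    }

    /** Very reduced upsert sink: tracks which keys it has written to the external system. */
    static class UpsertSinkSketch {
        private final java.util.Set<String> writtenKeys = new java.util.HashSet<>();
        private final boolean filterEmptyDeletes;

        UpsertSinkSketch(QueryConfigSketch config) {
            this.filterEmptyDeletes = config.filterEmptyDeletesInUpsertSink();
        }

        void invoke(boolean isUpsert, String key) {
            if (isUpsert) {
                writtenKeys.add(key);
                System.out.println("UPSERT " + key);
            } else if (!filterEmptyDeletes || writtenKeys.remove(key)) {
                System.out.println("DELETE " + key);
            } else {
                System.out.println("dropped empty delete for " + key);
            }
        }
    }

    public static void main(String[] args) {
        UpsertSinkSketch sink =
                new UpsertSinkSketch(new QueryConfigSketch().withFilterEmptyDeletesInUpsertSink(true));
        sink.invoke(true, "key-1");   // upsert -> key is now known
        sink.invoke(false, "key-2");  // delete for a never-written key -> dropped
        sink.invoke(false, "key-1");  // real delete -> forwarded
    }
}
```

A best-effort variant of the same idea is sketched at the very end of the thread.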
Again, thanks for starting this discussion. I think it helped us all to get a better understanding of how upserts work.

Best, Fabian

On Wed, Aug 29, 2018 at 5:29 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
> Hi Fabian,
>
> Thanks for your update. The opinions on upsert streams are highly enlightening. I think I now agree with you that we can choose option 2 to solve the problem: throw away empty deletes when the source generates retractions, otherwise pass empty deletes down.
>
> As for the UpsertSink, I think we don't need to filter empty deletes in it, unless the external system should not receive empty deletes. It would be good to provide an optional parameter in the StreamQueryConfig to indicate whether to filter empty deletes in upsert sinks (i.e., this is a job configuration). In this way, we can also solve the Filter problems (FLINK-9528). I will create another subtask about the UpsertSink later.
>
> Thanks again for all the suggestions. It really helps me a lot.
> Best, Hequn.
>
>
> On Tue, Aug 28, 2018 at 9:47 PM Fabian Hueske <fhue...@apache.org> wrote:
>
> > Hi Hequn, hi Piotr,
> >
> > Thanks for pushing this discussion forward and sorry for not responding earlier.
> >
> > After reading the thread, I agree that we do not *need* to (but may) forward empty deletes.
> > As I pointed out before, I see empty deletes not as a semantic problem that needs to be exactly solved but rather as a performance problem that can be optimized (trade-off: state costs vs. handling empty deletes).
> >
> > Piotr raised a good point. An upsert delete message may consist only of the key fields and the delete flag.
> > Having no data but the keys means we *cannot* handle them as regular records during a join, filter, projection, or aggregation.
> >
> > Upsert streams are only present if the stream is received by an operator that is able to correctly interpret the upsert messages.
> > Right now, there is only the UpsertSink operator that can handle upsert streams. Join and Aggregation might support upsert inputs in the future as well.
> > There are the following cases:
> >
> > 1. If there is no operator that can handle an upsert stream, upserts need to be converted into retractions. Filtering out empty deletes while converting to retractions comes for free.
> > 2. If the receiving operator is a Join or Aggregation, it has all the necessary state to check whether the delete is empty or not. In case of an empty delete, it is simply dropped.
> >
> > In both cases (retract stream conversion and stateful upsert operator) we can filter empty deletes for free.
> >
> > The only case left is UpsertSinks. These do not have built-in state, since it is maintained in an external system.
> > As I said before, empty deletes are not a semantic problem. We could forward all empty deletes and the result would still be consistent.
> > However, I understand that empty deletes can cause severe performance issues.
> > We can address the performance issue with different measures such as best-effort (approximate) filtering or exact state-backed filtering.
> >
> > I think in many cases we can handle empty deletes from upsert sources without adding additional state.
> > As soon as the upsert messages are converted into retraction messages or consumed by a join or aggregation, they can be filtered for free.
> > We only need to add state if we have an upsert sink AND if that sink wants to remove all empty deletes.
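Both "for free" cases hinge on the operator already keeping the last value per key. The following sketch shows the idea only; it is not the actual UpsertToRetractionsOperator discussed below, and the Tuple2<Boolean, String> encoding (f0 = true for upsert, false for delete) and String payload are simplifying assumptions.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Sketch: converts a keyed upsert stream (f0 = true for upsert, false for delete)
 * into a retraction stream. Empty deletes (deletes for keys that were never seen)
 * are dropped here "for free", because the last value per key is kept in state anyway.
 */
public class UpsertToRetractionsSketch
        extends KeyedProcessFunction<String, Tuple2<Boolean, String>, Tuple2<Boolean, String>> {

    private transient ValueState<String> lastValue;

    @Override
    public void open(Configuration parameters) {
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastValue", String.class));
    }

    @Override
    public void processElement(Tuple2<Boolean, String> msg, Context ctx,
                               Collector<Tuple2<Boolean, String>> out) throws Exception {
        String previous = lastValue.value();
        if (msg.f0) { // upsert
            if (previous != null) {
                out.collect(Tuple2.of(false, previous)); // retract the old value
            }
            out.collect(Tuple2.of(true, msg.f1));        // add the new value
            lastValue.update(msg.f1);
        } else {      // delete
            if (previous == null) {
                return; // empty delete: no state for this key, nothing to retract
            }
            out.collect(Tuple2.of(false, previous));     // retract the last value
            lastValue.clear();
        }
    }
}
```

A Join or Aggregation that consumes upserts directly could perform the same check against the state it maintains anyway, which is why no extra state is needed in those cases either.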
> >
> > There is one more thing that needs to be discussed: how upsert messages are handled by Calc operators.
> > A Calc (projection and/or filter) that receives (and produces) an upsert stream (because it is in front of a Join, Aggregation, or UpsertSink) should handle messages as follows:
> > - upsert message / flag=true: upsert messages are handled as regular messages. If the predicate evaluates to false, all but the key fields are set to null and the message is forwarded as a delete message.
> > - delete message / flag=false: delete messages are converted to the output schema (padded with nulls) and forwarded.
> >
> > What do you think?
> > Fabian
> >
> > On Fri, Aug 24, 2018 at 7:33 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
> >
> >> Hi Piotrek,
> >>
> >> Great to see your replies, and really thanks for all your suggestions. Inline is a good way, I will do it the same as you :-)
> >>
> >> *> I'm assuming that we are talking about event time and that `(delete 1, a, 1)` happened before `(add 1, a, 2)`, right?*
> >>
> >> We are talking about processing time (FLINK-8577 <https://issues.apache.org/jira/browse/FLINK-8577>). Event time is the next topic (FLINK-8578 <https://issues.apache.org/jira/browse/FLINK-8578>). And `(delete 1, a, 1)` is an empty delete message that comes before `(add 1, a, 2)`. However, (add 2, a, 1) & (add 2, a, 2) may come before or after (delete 1, a, 1) & (add 1, a, 2).
> >>
> >> *> Maybe we should say that's unspecified behaviour and Flink can either forward empty deletes or filter them out depending on what's more efficient/easier. In the end this whole topic is very fishy. Look at it from Flink's perspective. The user is trying to remove data that was never there in the first place. If it wasn't there, why should we retract/produce deletes for future records/join results? Should Flink "imagine" that there was a record that it hasn't seen? Should we apply this logic to other records that Flink "hasn't seen"?*
> >>
> >> I don't think unspecified behaviour is a good way to go. Yes, from Flink's perspective, I think we should not retract/produce deletes for future records/join results if the record wasn't there, and this behavior should be consistent among all operators, including the upsert source. We should not imagine that there was a record that Flink hasn't seen, and thus should throw the empty delete away at the source.
> >>
> >> *> Another potential issue. Previously I assumed that upsert deletes do not have to carry the data that they are removing, just the delete flag and the primary key. If that's the case, if there is an aggregation after the join, how could we even handle an empty delete? We wouldn't be able to produce a valid retract message for the follow-up aggregation.*
> >>
> >> Yes, it is true that we can't handle empty deletes in Flink. As I said, downstream operators don't even know how to deal with them.
> >>
> >> *> If item was an upsert stream with item_id as a primary key, the answer can only be 4. However, I do not see any problem here, since aggregations require updates as retractions anyway, thus we can handle empty deletes in UpsertToRetractionsOperator anyway we wish - filtering them out here or marking them as "empty deletes" for forwarding.
> >> Marking as "empty deletes" would have to be handled by the aggregation in a special way - forward it, but do not update the state.*
> >>
> >> Agree, the answer should only be 4. As you are in favor of option 3, I think you are in favor of forwarding empty deletes in the UpsertToRetractionsOperator, since the operator is part of the upsert source. (UpsertToRetractionsOperator is one way to implement an upsert source; there are other ways.)
> >> And yes, we can add a message type to distinguish the empty deletes from retractions. What concerns me is that we would have to adapt all operators to the new message type. What's more, these empty deletes are useless for downstream operators (even for the sink), because of the change of the key dimension.
> >>
> >> *> I also see different problems here. What if, again, as I mentioned previously, the delete message has no content? What if the delete message has incorrect content that doesn't match the latest update? Shouldn't the filter always pass a delete message without looking into its content?*
> >>
> >> No, the Filter should not always pass delete messages without looking into their content. For SQL like:
> >>
> >>> insert into xxx
> >>> select sum(item_v) from item where item_v < 2 and item_id = 1
> >>
> >> what's the result of sum? -1? -2? PS, the changelog is:
> >>
> >>> add 1 (item_v=1, item_id = 2) -> filtered out
> >>> retract delete 1 (item_v=1, item_id = 2) -> not filtered out if deletes always pass
> >>> add 2 (item_v=2, item_id = 2) -> filtered out
> >>> empty delete 1 (item_v=1, item_id = 1) -> not filtered out if deletes always pass
> >>> add 2 (item_v=2, item_id = 1) -> filtered out
> >>
> >> *> Again, I fail to see a semantics problem here. The data was not there in the first place, so whether we forward the delete or not shouldn't matter from a semantics perspective: the end result is still the same. If it does matter, the user should let Flink ingest the stream from some valid/consistent point, not in the middle of it. For me, the only valid concern regarding empty deletes is the performance perspective.*
> >>
> >> The semantics problem here is: case 1 will pass empty deletes, but case 4 won't, just because a filter was added. Whether we forward the delete or not, the result is not the same. Users can perceive this, since the data in their physical storage is different.
> >>
> >> *> What you are saying is highly use-case specific. If you enforce full empty-delete removal in this case, you will end up with terrible performance in other cases/setups. I think a configurable best-effort cache is still the way to go for the long run. It could be configured with the number of entries/bytes stored, where 0 means no-op and +INFINITY means a full/perfect cache. However, I'm not even sure how high a priority it should get.*
> >>
> >> I think this is a normal use case for big data analysis: diverse keys, but the focus is on a small part of them. To take a step back, we should not let users take this risk. I agree that keeping state in the source will influence the performance, but I don't think it is terrible. We can even store only the key in state when the source doesn't generate retractions. The performance of the source is much better than that of the aggregations. Furthermore, there are other ways to improve the performance, for example adding a bloom filter.
> >>
> >> *> If we decide to go this way, why even bother with upsert streams and supporting them inside our pipeline?
> >> Why not convert everything to retractions in the sources and always work on retractions? It would simplify our code and we wouldn't need the logic from `DataStreamRetractionRules`.*
> >>
> >> I think these are two different things. One is how to handle empty deletes, the other is how to handle updates.
> >>
> >> *> If there was a record there because the user started Flink in the middle of a stream, the result is still undefined (the join case that you mentioned), since Flink could skip or ingest any number of extra messages (deletes or not).*
> >>
> >> I think the result is clear if we clearly define that the upsert source ignores empty deletes. Flink only skips or ingests messages according to the SQL/Table API query provided by users.
> >>
> >> Thanks, Hequn
> >>
> >>
> >> On Thu, Aug 23, 2018 at 6:04 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thanks for the very detailed explanation :) Please check my inlined responses.
> >>>
> >>> On 22 Aug 2018, at 14:28, Hequn Cheng <chenghe...@gmail.com> wrote:
> >>>
> >>> Hi all,
> >>> Thanks a lot for your replies, @Piotr Nowojski <pi...@data-artisans.com> @Fabian Hueske <fhue...@apache.org> @Xingcan Cui <xingc...@gmail.com>
> >>>
> >>> Option 3 will send empty deletes to downstream operators, but downstream operators don't seem to know how to deal with them.
> >>> Assume that the upsert source forwards deletes by default. Let's see what problems we have.
> >>>
> >>> *Tables*
> >>> Suppose we have two upsert tables with keys. One is category (key: cate_id), the other is item (key: item_id).
> >>> -----category-----
> >>> cate_id, cate_v
> >>> (delete a, 1)
> >>>
> >>> -----item-----
> >>> item_id, cate_id, item_v
> >>> (add 2, a, 1)
> >>> (add 2, a, 2)
> >>> (delete 1, a, 1)
> >>> (add 1, a, 2)
> >>>
> >>>
> >>> I'm assuming that we are talking about event time and that `(delete 1, a, 1)` happened before `(add 1, a, 2)`, right?
> >>>
> >>> *Cases*
> >>> case 2: join
> >>>
> >>>> insert into xxx
> >>>> select * from item join category on item.cate_id = category.cate_id;
> >>>
> >>> Same as case 1: would the user want to delete all items of category 'a'? For the record (delete a, 1) from category, should it be filtered by the join or be joined with all items from the item table?
> >>> If it is filtered, it semantically conflicts with case 1.
> >>> If it is not filtered, it means we have to store delete messages in the state of the join, since data from the item table may arrive later. Also, how to deal with these deletes in state is a problem. I think the logic of the join will be very complicated.
> >>>
> >>>
> >>> Maybe we should say that's unspecified behaviour and Flink can either forward empty deletes or filter them out depending on what's more efficient/easier. In the end this whole topic is very fishy. Look at it from Flink's perspective. The user is trying to remove data that was never there in the first place. If it wasn't there, why should we retract/produce deletes for future records/join results? Should Flink "imagine" that there was a record that it hasn't seen? Should we apply this logic to other records that Flink "hasn't seen"?
> >>>
> >>> Another potential issue. Previously I assumed that upsert deletes do not have to carry the data that they are removing, just the delete flag and the primary key.
> >>> If that's the case, if there is an aggregation after the join, how could we even handle an empty delete? We wouldn't be able to produce a valid retract message for the follow-up aggregation.
> >>>
> >>>
> >>> case 3: aggregate
> >>>
> >>>> insert into xxx
> >>>> select sum(item_v) from item
> >>>
> >>> What's the result of sum, 4 or 5?
> >>> If the answer is 4, how can the downstream group_by tell whether a delete is an empty delete or a retract delete? PS: the changelog from the item table to the group_by is: add(1), retract(1), add(2), empty delete(1), add(2). We can probably add a message type to solve the problem, but it will definitely bring large changes and I don't think it is time to do it.
> >>> If the answer is 3, the result is definitely wrong.
> >>>
> >>>
> >>> If item was an upsert stream with item_id as a primary key, the answer can only be 4. However, I do not see any problem here, since aggregations require updates as retractions anyway, thus we can handle empty deletes in UpsertToRetractionsOperator anyway we wish - filtering them out here or marking them as "empty deletes" for forwarding. Marking as "empty deletes" would have to be handled by the aggregation in a special way - forward it, but do not update the state.
> >>>
> >>>
> >>> case 4: Filter
> >>>
> >>>> insert into xxx
> >>>> select item_id, item_v from item where item_v < 2 and item_id = 1
> >>>
> >>>
> >>> I also see different problems here. What if, again, as I mentioned previously, the delete message has no content? What if the delete message has incorrect content that doesn't match the latest update? Shouldn't the filter always pass a delete message without looking into its content?
> >>>
> >>> For the item with item_id=1, if the Filter also generates empty deletes, these two kinds of deletes cannot be distinguished by the sink. The problem is: should we filter empty deletes in the sink?
> >>> If yes, it is semantically conflicting with case 1.
> >>> If no, deletes generated by the Filter cannot be filtered, and this can cause devastating pressure on the physical database.
> >>>
> >>>
> >>> Again, I fail to see a semantics problem here. The data was not there in the first place, so whether we forward the delete or not shouldn't matter from a semantics perspective: the end result is still the same. If it does matter, the user should let Flink ingest the stream from some valid/consistent point, not in the middle of it. For me, the only valid concern regarding empty deletes is the performance perspective.
> >>>
> >>> Furthermore, I don't think we can use best effort. Considering the anti-spamming scenario, users probably just want to get the top 1% of data from the result; the rest 99% of the data will all become delete messages after the Filter. This would be a disaster for the storage, especially in this case, where most of the incoming keys are new ones and cannot be absorbed by a best-effort cache. We cannot release a version of Flink that takes a chance on bringing a disaster to our users, even if the change is small.
> >>>
> >>>
> >>> What you are saying is highly use-case specific. If you enforce full empty-delete removal in this case, you will end up with terrible performance in other cases/setups. I think a configurable best-effort cache is still the way to go for the long run.
> >>> It could be configured with the number of entries/bytes stored, where 0 means no-op and +INFINITY means a full/perfect cache. However, I'm not even sure how high a priority it should get.
> >>>
> >>> Thus, I don't think we should allow empty deletes to be output from the source.
> >>>
> >>>
> >>> If we decide to go this way, why even bother with upsert streams and supporting them inside our pipeline? Why not convert everything to retractions in the sources and always work on retractions? It would simplify our code and we wouldn't need the logic from `DataStreamRetractionRules`.
> >>>
> >>> To sum up, I really fail to see any semantic problems here. Regardless of whether we forward or filter out empty deletes, the end result is always the same: there was no record there in the first place. In set theory, the following operations give the same outcome:
> >>>
> >>> {1, 2, 3} \ {4}
> >>> {1, 2, 3} \ {}
> >>> {1, 2, 3} \ {42}
> >>>
> >>> If there was a record there because the user started Flink in the middle of a stream, the result is still undefined (the join case that you mentioned), since Flink could skip or ingest any number of extra messages (deletes or not).
> >>>
> >>> Piotrek
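To make the best-effort cache idea a bit more concrete, here is one possible reading of it. This is not an existing Flink component; the policy below (remember the last message forwarded per key and drop a delete only when the previous message forwarded for that key was already a delete) and all names are assumptions, chosen so that eviction only costs efficiency and never drops a delete the external system still needs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of a bounded, best-effort filter in front of an upsert sink.
 * It remembers the last message forwarded per key (up to maxEntries keys, LRU-evicted)
 * and drops a delete only if the previous message forwarded for that key was already
 * a delete, i.e. when the new delete is provably a no-op for the external system.
 * Evicted keys simply lose the optimization; no needed delete is ever dropped.
 */
public class BestEffortEmptyDeleteFilter {

    private enum Last { UPSERT, DELETE }

    private final int maxEntries;
    private final Map<String, Last> lastForwarded;

    public BestEffortEmptyDeleteFilter(int maxEntries) {
        this.maxEntries = maxEntries;
        // access-ordered LinkedHashMap used as a simple LRU cache
        this.lastForwarded = new LinkedHashMap<String, Last>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Last> eldest) {
                return size() > BestEffortEmptyDeleteFilter.this.maxEntries;
            }
        };
    }

    /** Returns true if the message should be forwarded to the sink. */
    public boolean shouldForward(boolean isUpsert, String key) {
        if (maxEntries == 0) {
            return true; // filtering disabled (the "no-op" end of the range)
        }
        if (isUpsert) {
            lastForwarded.put(key, Last.UPSERT);
            return true;
        }
        if (lastForwarded.get(key) == Last.DELETE) {
            return false; // repeated (empty) delete: the sink already saw a delete for this key
        }
        lastForwarded.put(key, Last.DELETE);
        return true;
    }
}
```

maxEntries = 0 corresponds to the no-op end of the range described above, while an unbounded map approximates the full/perfect cache. Note that this conservative variant only suppresses repeated deletes per key, which matches the concern raised earlier in the thread: first-time empty deletes for ever-new keys would still reach the sink.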