Hi Piotrek,

Great to see your replies, and thanks a lot for all your suggestions. Replying inline is a good way, so I will do the same :-)

*> I’m assuming that we are talking about event time and that `(delete 1, a, 1)` happened before `(add 1, a, 2)`, right?*

We are talking about processing time (FLINK-8577 <https://issues.apache.org/jira/browse/FLINK-8577>). Event time is the next topic (FLINK-8578 <https://issues.apache.org/jira/browse/FLINK-8578>). `(delete 1, a, 1)` is an empty delete message that comes before `(add 1, a, 2)`. However, (add 2, a, 1) and (add 2, a, 2) may come before or after (delete 1, a, 1) and (add 1, a, 2).

*> Maybe we should say, that’s unspecified behaviour and Flink can either forward empty deletes or filter them out depending on what’s more efficient/easier. In the end this whole topic is very fishy. Look at it from the Flink’s perspective. User is trying to remove the data, that was never there in the first place. If it wasn’t there, why should we retract/produce deletes for future records/join results? Should Flink “imagine” that there was a record that it hasn’t seen? Should we apply this logic to other records that Flink “hasn’t seen”?*

I don't think unspecified behavior is a good way to go. Yes, from Flink's perspective, I agree we should not retract/produce deletes for future records/join results if the record was never there, and this behavior should be consistent among all operators, including the upsert source. We should not imagine that there was a record that Flink hasn't seen, and should therefore throw the empty delete away at the source.

*> Another potential issue. Previously I assumed that upsert deletes do not have to carry the data that they are removing, just delete flag and primary key. If that’s the case, if there is an aggregation after the join, how could we even handle empty delete? We wouldn’t be able to produce valid retract message for the follow up aggregation.*

Yes, it is true that we can't handle empty deletes in Flink. As I said, downstream operators don't even know how to deal with them.

*> If item was an upsert stream with item_id as a primary key, the answer can only be 4. However I do not see any problem here, since aggregations require updates as retractions anyway, thus we can handle empty deletes in UpsertToRetractionsOperator anyway we wish - filtering them out here or marking them as “empty deletes” for forwarding. Marking as “empty deletes” would have to be handled by aggregation in special way - forward it, but do not update the state.*

Agreed, the answer can only be 4. As you are in favor of option 3, I think you are in favor of forwarding empty deletes in UpsertToRetractionsOperator, since the operator is part of the upsert source (UpsertToRetractionsOperator is one way to implement the upsert source; there are other ways). And yes, we could add a message type to distinguish empty deletes from retractions. What concerns me is that we would have to adapt all operators to the new message type. What's more, these empty deletes are useless for downstream operators (even for the sink), because the key dimension changes.

*> I see also different problems here. What if again, as I mentioned previously, delete message has no content? What if delete message has incorrect content, that doesn’t match to the latest update? Shouldn’t filter always pass delete message without looking into it’s content?*

No, the Filter should not always pass delete messages without looking into their content. For a SQL statement like:

> insert into xxx
> select sum(item_v) from item where item_v < 2 and item_id = 1

what is the result of the sum? -1? -2?

PS, the changelog is:

> add 1 (item_v=1, item_id = 2)              -> filtered out
> retract delete 1 (item_v=1, item_id = 2)   -> not filtered out if we always pass deletes
> add 2 (item_v=2, item_id = 2)              -> filtered out
> empty delete 1 (item_v=1, item_id = 1)     -> not filtered out if we always pass deletes
> add 2 (item_v=2, item_id = 1)              -> filtered out

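To make this concrete, below is a minimal, self-contained sketch in plain Java (no Flink APIs are used; the Msg class, the emptyDelete flag and the class name are made up for illustration, under my reading of the changelog above). It replays the changelog through the WHERE predicate `item_v < 2 AND item_id = 1` and a downstream sum, under three assumptions: the upsert source drops the empty delete, the filter evaluates the predicate on delete messages, or the filter blindly forwards every delete.

import java.util.ArrayList;
import java.util.List;

public class EmptyDeleteSumSketch {

    enum Op { ADD, DELETE }

    static class Msg {
        final Op op;
        final int itemId;
        final int itemValue;
        final boolean emptyDelete; // delete for a key that never had an add before it

        Msg(Op op, int itemId, int itemValue, boolean emptyDelete) {
            this.op = op;
            this.itemId = itemId;
            this.itemValue = itemValue;
            this.emptyDelete = emptyDelete;
        }
    }

    // The changelog from the PS above, after converting the upsert stream to retractions:
    // item_id=2 is upserted twice, item_id=1 sees an empty delete and then an add.
    static List<Msg> changelog() {
        List<Msg> log = new ArrayList<>();
        log.add(new Msg(Op.ADD, 2, 1, false));
        log.add(new Msg(Op.DELETE, 2, 1, false)); // retraction of the first upsert of item_id=2
        log.add(new Msg(Op.ADD, 2, 2, false));
        log.add(new Msg(Op.DELETE, 1, 1, true));  // the empty delete for item_id=1
        log.add(new Msg(Op.ADD, 1, 2, false));
        return log;
    }

    // WHERE clause of the query: item_v < 2 AND item_id = 1.
    static boolean predicate(Msg m) {
        return m.itemValue < 2 && m.itemId == 1;
    }

    static long sum(boolean sourceDropsEmptyDeletes, boolean filterAlwaysPassesDeletes) {
        long sum = 0;
        for (Msg m : changelog()) {
            if (sourceDropsEmptyDeletes && m.emptyDelete) {
                continue; // the upsert source swallows the empty delete
            }
            boolean passesFilter =
                    predicate(m) || (filterAlwaysPassesDeletes && m.op == Op.DELETE);
            if (!passesFilter) {
                continue;
            }
            sum += (m.op == Op.ADD ? m.itemValue : -m.itemValue);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sum(true, false));  // 0  -> empty delete dropped at the source
        System.out.println(sum(false, false)); // -1 -> the empty delete itself matches the predicate
        System.out.println(sum(false, true));  // -2 -> every delete leaks through the filter
    }
}

Only the first variant gives the result a user would expect, since no matching row was ever added, which is why I think empty deletes should be swallowed before they ever reach the filter.
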
*> Again, I fail to see semantics problem here. The data was not there in the first place, so whether we forward delete or not shouldn’t matter from semantics perspective: end result is still the same. If it does matter, user should let Flink ingest the stream from some valid/consistent point, not in the middle of it. For me, the only valid concern regarding empty deletes is performance perspective.*

The semantics problem here is: case 1 will pass empty deletes, but case 4 won't, just because a filter was added. Whether we forward the delete or not, the result is not the same. Users can perceive this, since the data in their physical storage is different.

*> What you are saying is highly use case specific. If you enforce full empty deletions removal in this case, you will end up with terrible performance in other cases/setups. I think configurable best effort cache is still the way to go for the long run. It could be configured with number of entries/bytes stored, where 0 means no-op, +INFINITY means full/perfect cache. However I’m not even sure how high priority it should get.*

I think this is a normal use case for big data analysis: a wide diversity of keys, but the focus is on a small part of them. To take a step back, we should not let users take this risk. I agree that keeping state in the source will influence performance, but I don't think it is terrible. We can even store only the key in state when the source doesn't generate retractions. The performance of the source is much better than that of the aggregations. Furthermore, there are other ways to improve performance, for example adding a bloom filter; see the sketch below.

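To illustrate the key-only state idea, here is a rough sketch in plain Java rather than real Flink code; the class and method names (EmptyDeleteSuppressor, Upsert, process) are hypothetical. Per key, the source-side operator only remembers whether an add has been emitted, and swallows deletes for keys it has never seen.

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

public class EmptyDeleteSuppressor<K, V> {

    // Key-only state: which keys have produced at least one add/upsert so far.
    private final Set<K> seenKeys = new HashSet<>();
    private final Consumer<Upsert<K, V>> downstream;

    public EmptyDeleteSuppressor(Consumer<Upsert<K, V>> downstream) {
        this.downstream = downstream;
    }

    public void process(Upsert<K, V> msg) {
        if (msg.isDelete) {
            // A bloom filter in front of this lookup could answer "definitely never seen"
            // for most brand-new keys without touching the state at all.
            if (!seenKeys.remove(msg.key)) {
                return; // empty delete: the key was never added, swallow it at the source
            }
        } else {
            seenKeys.add(msg.key);
        }
        downstream.accept(msg);
    }

    public static final class Upsert<K, V> {
        final K key;
        final V value;          // may be null for deletes that carry no content
        final boolean isDelete;

        public Upsert(K key, V value, boolean isDelete) {
            this.key = key;
            this.value = value;
            this.isDelete = isDelete;
        }
    }

    public static void main(String[] args) {
        EmptyDeleteSuppressor<Integer, String> source =
                new EmptyDeleteSuppressor<>(m ->
                        System.out.println((m.isDelete ? "delete " : "add ") + m.key));
        source.process(new Upsert<>(1, null, true)); // empty delete: dropped, nothing printed
        source.process(new Upsert<>(1, "a", false)); // prints "add 1"
        source.process(new Upsert<>(1, null, true)); // prints "delete 1" and clears the key
    }
}

In a real implementation this logic would live in the upsert source or UpsertToRetractionsOperator and use Flink keyed state instead of a HashSet.
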
*> If we decide to go this way, why even bother with upsert streams and supporting them inside our pipeline? Why not convert everything to retractions in sources and work always on retractions? It would simplify our code and we wouldn’t need the logic from `DataStreamRetractionRules`.*

I think these are two different things. One is how to handle empty deletes, the other is how to handle updates.

*> If there was a record there because user started Flink in a middle of a stream, the result is still undefined (the join case that you mentioned), since Flink could skip or ingest extra any number of messages (deletes or not).*

I think the result is clear if we clearly define that the upsert source ignores empty deletes. Flink only skips or ingests messages according to the SQL/Table API provided by users.

Thanks,
Hequn

On Thu, Aug 23, 2018 at 6:04 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> Thanks for very detailed explanation :) Please check my inlined responses.
>
> On 22 Aug 2018, at 14:28, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi all,
> Thanks a lot for your replies, @Piotr Nowojski <pi...@data-artisans.com> @Fabian Hueske <fhue...@apache.org> @Xingcan Cui <xingc...@gmail.com>
>
> Option 3 will send empty deletes to downstream operators, but downstream operators seems don't know how to deal with it.
> Assume that upsert source forward deletes by default. Let's see what problems we have.
>
> *Tables*
> Suppose we have two upsert tables with key. One is category (key: cate_id), the other is item (key: item_id).
>
> -----category-----
> cate_id, cate_v
> (delete a, 1)
>
> -----item-----
> item_id, cate_id, item_v
> (add 2, a, 1)
> (add 2, a, 2)
> (delete 1, a, 1)
> (add 1, a, 2)
>
>
> I’m assuming that we are talking about event time and that `(delete 1, a, 1)` happened before `(add 1, a, 2)`, right?
>
> *Cases*
> case 2: join
>
>> insert into xxx
>> select * from item join category on item.cate_id = category.cate_id;
>
> Same to case1, would user want to delete all items of category 'a'? For record (delete a, 1) from category, should it be filtered by join or be joined with all items from item table?
> If be filtered, it is semantically conflict with case 1.
> If not be filtered, it means we have to storage delete messages in state of join since data from item table may come later to join. Also, how to deal with these deletes in state is a problem. I think the logic of join will be very complicated.
>
>
> Maybe we should say, that’s unspecified behaviour and Flink can either forward empty deletes or filter them out depending on what’s more efficient/easier. In the end this whole topic is very fishy. Look at it from the Flink’s perspective. User is trying to remove the data, that was never there in the first place. If it wasn’t there, why should we retract/produce deletes for future records/join results? Should Flink “imagine” that there was a record that it hasn’t seen? Should we apply this logic to other records that Flink “hasn’t seen”?
>
> Another potential issue. Previously I assumed that upsert deletes do not have to carry the data that they are removing, just delete flag and primary key. If that’s the case, if there is an aggregation after the join, how could we even handle empty delete? We wouldn’t be able to produce valid retract message for the follow up aggregation.
>
>
> case 3: aggregate
>
>> insert into xxx
>> select sum(item_v) from item
>
> What's the result of sum, 4 or 5?
> If the answer is 4, how can the downstream group_by tell which delete is an empty delete or a retract delete? PS: the changelog from item table to group_by is: add(1) retract(1) add(2), empty delete(1), add(2). We probably can add a message type to solve the problem, but it definitely will bring large changes and I don't think it is time to do it.
> If the answer is 3, the result is definitely wrong.
>
>
> If item was an upsert stream with item_id as a primary key, the answer can only be 4. However I do not see any problem here, since aggregations require updates as retractions anyway, thus we can handle empty deletes in UpsertToRetractionsOperator anyway we wish - filtering them out here or marking them as “empty deletes” for forwarding. Marking as “empty deletes” would have to be handled by aggregation in special way - forward it, but do not update the state.
>
>
> case 4: Filter
>
>> insert into xxx
>> select item_id, item_v from item where item_v < 2 and item_id = 1
>
>
> I see also different problems here. What if again, as I mentioned previously, delete message has no content? What if delete message has incorrect content, that doesn’t match to the latest update? Shouldn’t filter always pass delete message without looking into it’s content?
>
> For item of item_id=1, if Filter also generates empty deletes, these two kinds of deletes can not be distinguished by the sink. The problem is: should we filter empty deletes in sink?
> If yes, it is semantically conflicting with case 1.
> If no, deletes generated by Filter can not be filtered, this can cause devastating pressure on the physical database.
>
>
> Again, I fail to see semantics problem here. The data was not there in the first place, so whether we forward delete or not shouldn’t matter from semantics perspective: end result is still the same. If it does matter, user should let Flink ingest the stream from some valid/consistent point, not in the middle of it. For me, the only valid concern regarding empty deletes is performance perspective.
>
> Furthermore, I don't think we can use best effort. Considering the anti-spamming scenario, users probably just want to get top 1% data from the result, the rest 99% of the data will all become delete messages after the Filter. This would be a disaster for a storage. Especially for the case, most of the coming keys are new ones and can not be swallowed by a cache with best effort. We can not release a version of flink that take a chance to bring a disaster to our user, even the change is small.
>
>
> What you are saying is highly use case specific. If you enforce full empty deletions removal in this case, you will end up with terrible performance in other cases/setups. I think configurable best effort cache is still the way to go for the long run. It could be configured with number of entries/bytes stored, where 0 means no-op, +INFINITY means full/perfect cache. However I’m not even sure how high priority it should get.
>
> Thus, I don't think we should allow empty deletes output from source.
>
>
> If we decide to go this way, why even bother with upsert streams and supporting them inside our pipeline? Why not convert everything to retractions in sources and work always on retractions? It would simplify our code and we wouldn’t need the logic from `DataStreamRetractionRules`.
>
> To sum up, I really fail to see any semantic problems here. Regardless if we forward/filter out empty deletes the end result is always the same: there was no record there in the first place. In set theory following operations give the same outcome:
>
> {1, 2, 3} \ {4}
> {1, 2, 3} \ {}
> {1, 2, 3} \ {42}
>
> If there was a record there because user started Flink in a middle of a stream, the result is still undefined (the join case that you mentioned), since Flink could skip or ingest extra any number of messages (deletes or not).
>
> Piotrek