Hi,

Thanks for the very detailed explanation :) Please check my inlined responses.
> On 22 Aug 2018, at 14:28, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi all,
> Thanks a lot for your replies, @Piotr Nowojski @Fabian Hueske @Xingcan Cui
>
> Option 3 will send empty deletes to downstream operators, but downstream
> operators don't seem to know how to deal with them.
> Assume that the upsert source forwards deletes by default. Let's see what
> problems we have.
>
> Tables
> Suppose we have two upsert tables with keys. One is category (key: cate_id),
> the other is item (key: item_id).
> -----category-----
> cate_id, cate_v
> (delete a, 1)
>
> -----item-----
> item_id, cate_id, item_v
> (add 2, a, 1)
> (add 2, a, 2)
> (delete 1, a, 1)
> (add 1, a, 2)

I'm assuming that we are talking about event time and that `(delete 1, a, 1)` happened before `(add 1, a, 2)`, right?

> Cases
> case 2: join
> insert into xxx
> select * from item join category on item.cate_id = category.cate_id;
> Same as case 1, would the user want to delete all items of category 'a'? For
> the record (delete a, 1) from category, should it be filtered out by the join
> or be joined with all items from the item table?
> If it is filtered out, it semantically conflicts with case 1.
> If it is not filtered out, we have to store delete messages in the state of
> the join, since data from the item table may arrive at the join later. Also,
> how to deal with these deletes in the state is a problem. I think the logic
> of the join will become very complicated.

Maybe we should say that this is unspecified behaviour and Flink can either forward empty deletes or filter them out, depending on what's more efficient/easier.

In the end this whole topic is very fishy. Look at it from Flink's perspective. The user is trying to remove data that was never there in the first place. If it wasn't there, why should we retract/produce deletes for future records/join results? Should Flink "imagine" that there was a record that it hasn't seen? Should we apply this logic to other records that Flink "hasn't seen"?

Another potential issue: previously I assumed that upsert deletes do not have to carry the data that they are removing, just a delete flag and the primary key. If that's the case and there is an aggregation after the join, how could we even handle an empty delete? We wouldn't be able to produce a valid retract message for the follow-up aggregation.

> case 3: aggregate
> insert into xxx
> select sum(item_v) from item
> What's the result of sum, 4 or 5?
> If the answer is 4, how can the downstream group_by tell whether a delete is
> an empty delete or a retract delete? PS: the changelog from the item table to
> group_by is: add(1), retract(1), add(2), empty delete(1), add(2). We could
> probably add a message type to solve the problem, but it would definitely
> bring large changes and I don't think it is time to do that.
> If the answer is 3, the result is definitely wrong.

If item was an upsert stream with item_id as a primary key, the answer can only be 4. However, I do not see any problem here: aggregations require updates as retractions anyway, so we can handle empty deletes in the UpsertToRetractionsOperator however we wish, either filtering them out there or marking them as "empty deletes" for forwarding. Marking them as "empty deletes" would have to be handled by the aggregation in a special way: forward the delete, but do not update the state.
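
To make this concrete, here is a rough, self-contained sketch in plain Scala of the upsert-to-retraction conversion I have in mind. This is not the actual operator or any Flink API, and all names are made up; it only illustrates that the conversion keeps the last emitted value per key, so a delete for an unknown key can either be dropped or be forwarded with a marker, without touching the state.

sealed trait Change[K, V]
case class Add[K, V](key: K, value: V) extends Change[K, V]
case class Retract[K, V](key: K, value: V) extends Change[K, V]
case class EmptyDelete[K, V](key: K) extends Change[K, V]  // only emitted if we decide to forward

class UpsertToRetractions[K, V](forwardEmptyDeletes: Boolean) {
  // last value emitted downstream for every key
  private val lastValue = scala.collection.mutable.Map.empty[K, V]

  /** key + Some(value) is an upsert, key + None is a delete (no content needed). */
  def convert(key: K, value: Option[V]): Seq[Change[K, V]] = value match {
    case Some(v) =>
      val out = lastValue.get(key) match {
        case Some(prev) => Seq(Retract(key, prev), Add(key, v))  // update: retract old, add new
        case None       => Seq(Add(key, v))                      // first value for this key
      }
      lastValue(key) = v
      out
    case None =>
      lastValue.remove(key) match {
        case Some(prev)                  => Seq(Retract(key, prev))  // regular delete
        case None if forwardEmptyDeletes => Seq(EmptyDelete(key))    // forward, no state change
        case None                        => Seq.empty                // or simply drop it
      }
  }
}

With the marker variant, a downstream aggregation would forward the EmptyDelete but skip its own state update, as described above; with the dropping variant, the question disappears before the aggregation even sees it.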
> case 4: Filter
> insert into xxx
> select item_id, item_v from item where item_v < 2 and item_id = 1

I also see different problems here. What if, again, as I mentioned previously, the delete message has no content? What if the delete message has incorrect content that doesn't match the latest update? Shouldn't a filter always pass a delete message through without looking into its content?

> For the item with item_id = 1, if the Filter also generates empty deletes, these
> two kinds of deletes cannot be distinguished by the sink. The problem is: should
> we filter empty deletes in the sink?
> If yes, it is semantically conflicting with case 1.
> If no, deletes generated by the Filter cannot be filtered, which can put
> devastating pressure on the physical database.

Again, I fail to see a semantics problem here. The data was not there in the first place, so whether we forward the delete or not shouldn't matter from a semantics perspective: the end result is still the same. If it does matter, the user should let Flink ingest the stream from some valid/consistent point, not in the middle of it. For me, the only valid concern regarding empty deletes is the performance perspective.

> Furthermore, I don't think we can use best effort. Considering the anti-spamming
> scenario, users probably just want to get the top 1% of the data from the result;
> the remaining 99% of the data will all become delete messages after the Filter.
> This would be a disaster for the storage, especially when most of the incoming
> keys are new ones and cannot be swallowed by a cache with best effort. We cannot
> release a version of Flink that takes a chance on bringing a disaster to our
> users, even if the change is small.

What you are saying is highly use case specific. If you enforce full removal of empty deletes in this case, you will end up with terrible performance in other cases/setups. I think a configurable best-effort cache is still the way to go in the long run. It could be configured with the number of entries/bytes stored, where 0 means no-op and +INFINITY means a full/perfect cache (a rough sketch of what I have in mind is in the PS below). However, I'm not even sure how high a priority it should get.

> Thus, I don't think we should allow empty deletes to be output from the source.

If we decide to go this way, why even bother with upsert streams and supporting them inside our pipeline? Why not convert everything to retractions in the sources and always work on retractions? It would simplify our code and we wouldn't need the logic from `DataStreamRetractionRules`.

To sum up, I really fail to see any semantic problems here. Regardless of whether we forward or filter out empty deletes, the end result is always the same: there was no record there in the first place. In set theory the following operations all give the same outcome, namely {1, 2, 3}:

{1, 2, 3} \ {4}
{1, 2, 3} \ {}
{1, 2, 3} \ {42}

If there was a record there because the user started Flink in the middle of a stream, the result is still undefined (the join case that you mentioned), since Flink could have skipped or ingested any number of extra messages (deletes or not).

Piotrek
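
PS: To make the best-effort cache idea a bit more concrete, here is a rough, self-contained sketch in plain Scala. Again, this is not Flink code and all names are made up; it only illustrates the behaviour I have in mind: swallow deletes for keys that, as far as we know, were never upserted, and stay correct by simply forwarding everything else.

import scala.collection.mutable

// A configurable best-effort filter for empty deletes behind an upsert source.
// Correctness never depends on the cache: a forwarded empty delete is a semantic
// no-op downstream, and a real delete is never dropped.
class EmptyDeleteFilter[K](maxEntries: Int) {
  // key -> true if the last forwarded record for this key was an upsert,
  //        false if it was a delete (the key does not exist downstream)
  private val seen = mutable.LinkedHashMap.empty[K, Boolean]

  private def remember(key: K, exists: Boolean): Unit = {
    if (maxEntries > 0) {
      seen.remove(key)                                       // re-insert to refresh LRU position
      seen.put(key, exists)
      if (seen.size > maxEntries) seen.remove(seen.head._1)  // evict the oldest entry
    }
  }

  /** Returns true if the record should be forwarded downstream. */
  def onUpsert(key: K): Boolean = { remember(key, exists = true); true }

  def onDelete(key: K): Boolean = seen.get(key) match {
    case Some(false) => false                                 // known empty/duplicate delete: swallow it
    case Some(true)  => remember(key, exists = false); true   // real delete: forward it
    case None        => remember(key, exists = false); true   // unknown (new or evicted) key:
                                                              // forward to stay on the safe side
  }
}

maxEntries = 0 turns this into a no-op where every delete is forwarded. An unbounded map that has never evicted anything could additionally treat an unknown key as "never upserted" and drop its deletes too, which is the perfect cache. Everything in between is best effort, so in the anti-spamming example the deletes for mostly-new keys would indeed just be forwarded, as you point out.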