Hi,

Thanks fort starting this discussion Hequn!
These are a tricky questions.

1) Handling empty deletes in UpsertSource.
I think forwarding empty deletes would only have a semantic difference if
the output is persisted in a non-empty external table, e.g., a Cassandra
table with entries.
If we would forward, the delete, we might remove data from the sink table.
This could be a desired effect.
Therefore, I think we should be able to forward empty deleted and filtering
them out could be a configurable option.

2) Handling upsert messages in Filters.
It think the problem is a bit better described with the following query
which writes to an upsert sink:

SELECT user, count(*) FROM clicks GROUP BY user HAVING count(*) < 10

As long as the count for a user is smaller than 10, an update is passed to
the upsert sink.
When the count reaches 10, the Filter would need to convert the update
message into a delete message (right now, the filter just removes the
message completely).
This would happen for every update of the count, i.e., the upsert sink
would need to handle many delete messages for data that is not longer in
the external storage (repeated deleted).
There are multiple ways to handle this issue:
* Make the Filter stateful and only convert the first message into a delete
and filter all subsequent updates. (Could also be a best effort cache,
LRU...)
* Make the UpsertSink opterator (optionally) stateful and track all deleted
entries. (Could also be a best effort cache, LRU...)
* For the (common?) special case of Aggregation -> Filter, we could offer a
dedicated operator that applies the filter within the aggregation.

---

So, we are dealing with two cases here:
1) Deleting what has not been ingested yet (although the result might be in
the external sink).
I would forward deletes and filter them optionally, i.e., approach 3 by
default and having approach 1 as an option

2) Deleting what has been deleted already.
I think having a best-effort cache in the Filter might be the best
approach. The GROUP-BY-HAVING operator might be a nice addition.
IMO, we should not give guarantees that an UpsertSink won't receive
repeated deletes.
If this is a problem for certain sink system, we could give an option for
an exact filter based on state.

What do you think?

Best, Fabian




2018-08-20 13:51 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:

> Hi,
>
> Thanks for bringing up this issue here.
>
> I’m not sure whether sometimes swallowing empty deletes could be a problem
> or always swallowing/forwarding them is better. I guess for most use cases
> it doesn't matter. Maybe the best for now would be to always forward them,
> since if they are a problem, user could handle them somehow, either in
> custom sink wrapper or in system that’s downstream from Flink. Also maybe
> we could have this configurable in the future.
>
> However this thing seems to me like a much lower priority compared to
> performance implications. Forcing upsert source to always keep all of the
> keys on the state is not only costly, but in many cases it can be a blocker
> from executing a query at all. Not only for the UpsertSource -> Calc ->
> UpsertSink, but also for example in the future for joins or ORDER BY
> (especially with LIMIT) as well.
>
> I would apply same reasoning to FLINK-9528.
>
> Piotrek
>
> > On 19 Aug 2018, at 08:21, Hequn Cheng <chenghe...@gmail.com> wrote:
> >
> > Hi all,
> >
> > Currently, I am working on FLINK-8577 Implement proctime DataStream to
> > Table upsert conversion <https://issues.apache.org/
> jira/browse/FLINK-8577>.
> > And a design doc can be found here
> > <https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVez
> aWe0y7Xqd0c1zE/edit?usp=sharing>.
> > It received many valuable suggestions. Many thanks to all of you.
> > However there are some problems I think may need more discussion.
> >
> > *Terms*
> >
> >   1. *Upsert Stream:* Stream that include a key definition and will be
> >   updated. Message types include insert, update and delete.
> >   2. *Upsert Source:* Source that ingest Upsert Stream.
> >   3. *Empty Delete:* For a specific key, the first message is a delete
> >   message.
> >
> > *Problem to be discussed*
> > How to handle empty deletes for UpsertSource?
> >
> > *Ways to solve the problem*
> >
> >   1. Throw away empty delete messages in the UpsertSource(personally in
> >   favor of this option)
> >      - advantages
> >      - This makes sense in semantics. An empty table + delete message is
> >         still an empty table. Losing deletes does not affect the final
> results.
> >         - At present, the operators or functions in flink are assumed to
> >         process the add message first and then delete. Throw away
> > empty deletes in
> >         source, so that the downstream operators do not need to
> > consider the empty
> >         deletes.
> >         - disadvantages
> >      - Maintaining the state in source is expensive, especially for some
> >         simple sql like: UpsertSource -> Calc -> UpsertSink.
> >         2. Throw away empty delete messages when source generate
> >   retractions, otherwise pass empty delete messages down
> >      - advantages
> >      - Downstream operator does not need to consider empty delete
> messages
> >         when the source generates retraction.
> >         - Performance is better since source don't have to maintain state
> >         if it doesn't generate retractions.
> >         - disadvantages
> >      - The judgment that whether the downstream operator will receive
> >         empty delete messages is complicated. Not only take source into
> >         consideration, but also should consider the operators that
> > are followed by
> >         source. Take join as an example, for the sql: upsert_source
> > -> upsert_join,
> >         the join receives empty deletes while in sql(upsert_source ->
> > group_by ->
> >         upsert_join), the join doesn't since empty deletes are ingested
> by
> >         group_by.
> >         - The semantics of how to process empty deletes are not clear.
> >         Users may be difficult to understand, because sometimes empty
> > deletes are
> >         passed down, but sometimes don't.
> >         3. Pass empty delete messages down always
> >      - advantages
> >      - Performance is better since source don't have to maintain state if
> >         it doesn't generate retractions.
> >         - disadvantages
> >      - All table operators and functions in flink need to consider empty
> >         deletes.
> >
> > *Another related problem*
> > Another related problem is FLINK-9528 Incorrect results: Filter does not
> > treat Upsert messages correctly
> > <https://issues.apache.org/jira/browse/FLINK-9528> which I think should
> be
> > considered together.
> > The problem in FLINK-9528 is, for sql like upsert_source -> filter ->
> > upsert_sink, when the data of a key changes from non-filtering to
> > filtering, the filter only removes the upsert message such that the
> > previous version remains in the result.
> >
> >   1. One way to solve the problem is to make UpserSource generates
> >   retractions.
> >   2. Another way is to make a filter aware of the update semantics
> >   (retract or upsert) and convert the upsert message into a delete
> message if
> >   the predicate evaluates to false.
> >
> > The second way will also generate many empty delete messages. To avoid
> too
> > many empty deletes, the solution is to maintain a filter state at sink to
> > prevent the empty deletes from causing devastating pressure on the
> physical
> > database. However, if UpsertSource can also output empty deletes, these
> > empty deletes will be more difficult to control. We don't know where
> these
> > deletes come from, and whether should be filtered out. The ambiguity of
> the
> > semantics of processing empty deletes makes the user unable to judge
> > whether there will be empty deletes output.
> >
> > *My personal opinion*
> > From my point of view, I think the first option(Throw away empty delete
> > messages in the UpsertSource) is the best, not only because the semantics
> > are more clear but also the processing logic of the entire table layer
> can
> > be more simple thus more efficient. Furthermore the performance loss is
> > acceptable (We can even only store key in state when source doesn't
> > generate retraction).
> >
> > Any suggestions are greatly appreciated!
> >
> > Best, Hequn
>
>

Reply via email to