Hi all,

Currently, I am working on FLINK-8577 Implement proctime DataStream to
Table upsert conversion <https://issues.apache.org/jira/browse/FLINK-8577>.
And a design doc can be found here
<https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing>.
It has received many valuable suggestions. Many thanks to all of you.
However, there are some problems that I think need more discussion.

*Terms*

   1. *Upsert Stream:* A stream that includes a key definition and will be
   updated. Message types include insert, update and delete.
   2. *Upsert Source:* A source that ingests an Upsert Stream.
   3. *Empty Delete:* For a specific key, the first message is a delete
   message.

*Problem to be discussed*
How to handle empty deletes for UpsertSource?

*Ways to solve the problem*

   1. Throw away empty delete messages in the UpsertSource (personally in
   favor of this option)
      - advantages
         - This makes sense semantically. An empty table + a delete message
         is still an empty table. Losing empty deletes does not affect the
         final results.
         - At present, the operators and functions in Flink assume that they
         process the add message first and then the delete. If empty deletes
         are thrown away in the source, the downstream operators do not need
         to consider them.
      - disadvantages
         - Maintaining state in the source is expensive, especially for
         simple SQL like: UpsertSource -> Calc -> UpsertSink.
   2. Throw away empty delete messages when the source generates
   retractions, otherwise pass empty delete messages down
      - advantages
         - Downstream operators do not need to consider empty delete
         messages when the source generates retractions.
         - Performance is better since the source doesn't have to maintain
         state if it doesn't generate retractions.
      - disadvantages
         - Judging whether a downstream operator will receive empty delete
         messages is complicated. We have to consider not only the source but
         also the operators that follow the source. Take join as an example:
         for the SQL upsert_source -> upsert_join, the join receives empty
         deletes, while for the SQL upsert_source -> group_by -> upsert_join,
         the join doesn't, since the empty deletes are consumed by group_by.
         - The semantics of how empty deletes are processed are not clear.
         It may be difficult for users to understand, because sometimes empty
         deletes are passed down and sometimes they are not.
   3. Always pass empty delete messages down
      - advantages
         - Performance is better since the source doesn't have to maintain
         state if it doesn't generate retractions.
      - disadvantages
         - All table operators and functions in Flink need to consider empty
         deletes.
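
To make the first option more concrete, here is a minimal sketch in plain Python (not Flink code). The Message type and the drop_empty_deletes function are hypothetical names used only for illustration. The sketch keeps only the set of live keys as state, and it assumes that a delete for a key that was already deleted is also treated as an empty delete.

```python
from typing import Hashable, Iterable, Iterator, NamedTuple, Set


class Message(NamedTuple):
    """Hypothetical upsert-stream message; not a Flink type."""
    kind: str              # "upsert" (insert or update) or "delete"
    key: Hashable
    value: object = None


def drop_empty_deletes(messages: Iterable[Message]) -> Iterator[Message]:
    """Forward messages, dropping deletes for keys that are not live."""
    live_keys: Set[Hashable] = set()   # key-only state, no row values stored
    for msg in messages:
        if msg.kind == "delete":
            if msg.key not in live_keys:
                continue               # empty delete: nothing to remove downstream
            live_keys.discard(msg.key)
        else:
            live_keys.add(msg.key)
        yield msg


stream = [
    Message("delete", "a"),       # empty delete, dropped
    Message("upsert", "b", 1),
    Message("delete", "b"),       # regular delete, forwarded
    Message("delete", "b"),       # key no longer live, dropped (assumption)
    Message("upsert", "a", 2),
]
forwarded = list(drop_empty_deletes(stream))
```

With this, downstream operators only ever see a delete after an add for the same key, at the cost of one state entry per live key.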

*Another related problem*
Another related problem is FLINK-9528 Incorrect results: Filter does not
treat Upsert messages correctly
<https://issues.apache.org/jira/browse/FLINK-9528> which I think should be
considered together.
The problem in FLINK-9528 is that, for SQL like upsert_source -> filter ->
upsert_sink, when the data of a key changes from passing the filter to
being filtered out, the filter only drops the upsert message, so the
previous version remains in the result.

   1. One way to solve the problem is to make the UpsertSource generate
   retractions.
   2. Another way is to make a filter aware of the update semantics
   (retract or upsert) and convert the upsert message into a delete message if
   the predicate evaluates to false.
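
The second way can be illustrated with a small sketch in plain Python (again, not Flink code). The tuple encoding (kind, key, value) and the function names naive_filter, upsert_aware_filter and materialize are assumptions made for this example only.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, Optional, Tuple

# (kind, key, value) with kind in {"upsert", "delete"}; hypothetical encoding
Msg = Tuple[str, Hashable, Optional[int]]


def naive_filter(msgs: Iterable[Msg], pred: Callable[[int], bool]) -> Iterator[Msg]:
    """Drops upserts that fail the predicate (the FLINK-9528 behaviour)."""
    for kind, key, value in msgs:
        if kind == "delete" or pred(value):
            yield (kind, key, value)


def upsert_aware_filter(msgs: Iterable[Msg], pred: Callable[[int], bool]) -> Iterator[Msg]:
    """Converts an upsert that fails the predicate into a delete."""
    for kind, key, value in msgs:
        if kind == "upsert" and not pred(value):
            yield ("delete", key, None)
        else:
            yield (kind, key, value)


def materialize(msgs: Iterable[Msg]) -> Dict[Hashable, int]:
    """Applies the messages to an initially empty table, like an upsert sink."""
    table: Dict[Hashable, int] = {}
    for kind, key, value in msgs:
        if kind == "upsert":
            table[key] = value
        else:
            table.pop(key, None)
    return table


source = [("upsert", "k", 10), ("upsert", "k", 1), ("upsert", "j", 2)]
keep_large = lambda v: v > 5

stale = materialize(naive_filter(source, keep_large))         # {"k": 10}
fixed = materialize(upsert_aware_filter(source, keep_large))  # {}
converted = list(upsert_aware_filter(source, keep_large))
```

In this sketch the naive filter leaves the outdated row for key "k" in the result, while the upsert-aware filter removes it. Note that the converted stream also contains a delete for key "j", which never had a row downstream, i.e. an empty delete.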

The second way will also generate many empty delete messages. To avoid too
many empty deletes, the solution is to maintain a filter state at the sink
to prevent the empty deletes from putting devastating pressure on the
physical database. However, if the UpsertSource can also output empty
deletes, these empty deletes will be more difficult to control. We don't
know where these deletes come from or whether they should be filtered out.
The ambiguous semantics of processing empty deletes leave users unable to
judge whether empty deletes will be output.

*My personal opinion*
From my point of view, the first option (throw away empty delete
messages in the UpsertSource) is the best, not only because the semantics
are clearer but also because the processing logic of the entire table layer
can be simpler and thus more efficient. Furthermore, the performance loss is
acceptable (we can even store only the key in state when the source doesn't
generate retractions).

Any suggestions are greatly appreciated!

Best, Hequn
