[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973659#comment-15973659
 ] 

ASF GitHub Bot commented on FLINK-6091:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112060500
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
    @@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
                                fieldSerializers[i].serialize(o, target);
                        }
                }
    +           commandSerializer.serialize(record.command, target);
    --- End diff --
    
    I'm afraid we cannot change the serialization of `Row`. `Row` is a public 
class in `flink-core`, not an internal `flink-table` class. Hence, it is used 
in other places and might also be part of user applications. If we change the 
serialization, users might not be able to restore a job on 1.3 from a 
savepoint taken with 1.2. This restriction rules out simply adding a field to 
`Row`, which would have avoided major refactoring.
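    
    To illustrate the compatibility concern, here is a minimal sketch in plain Java. It does not use Flink's classes: `RowFormatSketch` and its methods are hypothetical stand-ins, and rows are reduced to `int` fields. It only shows that bytes written without the trailing command flag cannot be read by a reader that expects it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a row serializer; this is NOT Flink's RowSerializer.
final class RowFormatSketch {

    // "Old" format: only the row's fields (ints, for simplicity).
    static byte[] writeOldFormat(int[] fields) {
        return write(fields, false, false);
    }

    // "New" format with the proposed change: fields plus a trailing command flag.
    static byte[] writeNewFormat(int[] fields, boolean command) {
        return write(fields, true, command);
    }

    private static byte[] write(int[] fields, boolean withCommand, boolean command) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int field : fields) {
                out.writeInt(field);
            }
            if (withCommand) {
                out.writeBoolean(command);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads with the new format; returns false if the bytes end before the command flag.
    static boolean readableAsNewFormat(byte[] data, int arity) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (int i = 0; i < arity; i++) {
                in.readInt();
            }
            in.readBoolean(); // present only in bytes written with the new format
            return true;
        } catch (EOFException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    In this sketch the mismatch surfaces as an `EOFException` because a single record is read in isolation. With several records written back to back, the reader would instead consume the first byte of the next record as the command flag and silently misread everything after it.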
    
    I see two options to add the command field to the data streams in 
`flink-table`:
    
    1. Use a regular field in `Row`. This would mean that the physical layout 
of the `Row` is no longer the same as the logical layout, i.e., the one 
expected by Calcite. However, we will probably change this anyway for the 
upcoming changes related to the time indicators. For these, the physical layout 
will have fewer fields than the logical layout (we will remove time fields 
that are in the metadata of Flink's records or taken as processing time). By 
adding the command field, we would add a field that is not in the logical 
layout. The problem with this approach is that the command field would be at 
a different position depending on the `Row` (probably the last one). We could 
leverage the changes introduced by the time indicator work (or the other way 
round). @twalthr is working on this. You can have a look at the current status 
here: https://github.com/twalthr/flink/tree/FLINK-5884
    2. The other option is to wrap the rows in a custom data type similar to a 
`Tuple2[Row, Command]`. The data type could be named `Change` or `CRow` and 
would have its own `TypeInformation`, `TypeSerializer`, and `TypeComparator`, 
which forward most calls to the type info, serializer, and comparator of `Row`. 
The problem with this approach is that we need to change the return types of 
all functions. For some functions this might not be a big issue if we can 
extract the `Row` object before passing it to the code-generated functions. The 
command field could be set when the result `Row` is returned or in a wrapping 
`Collector`. 
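    
    A rough sketch of what the wrapper type in option 2 could look like, in plain Java without Flink dependencies. Everything here is an assumption for illustration: `Row`, `RowSerializer`, `CRow`, and `CRowSerializer` are simplified stand-ins nested in a hypothetical `CRowSketch` class, rows hold only `int` fields, and the command is reduced to a boolean retract flag. The point is only that the wrapper's serializer forwards to the row serializer and appends the command, so the format of `Row` itself stays untouched.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class CRowSketch {

    // Simplified stand-in for Row: a fixed array of int fields.
    static final class Row {
        final int[] fields;
        Row(int[] fields) { this.fields = fields; }
    }

    // Hypothetical wrapper, analogous to Tuple2[Row, Command].
    static final class CRow {
        final Row row;
        final boolean retract; // the "command": false = add, true = delete
        CRow(Row row, boolean retract) { this.row = row; this.retract = retract; }
    }

    // Stand-in for the existing row serializer; its byte format is not changed.
    static final class RowSerializer {
        void serialize(Row row, DataOutputStream out) throws IOException {
            out.writeInt(row.fields.length);
            for (int field : row.fields) {
                out.writeInt(field);
            }
        }

        Row deserialize(DataInputStream in) throws IOException {
            int[] fields = new int[in.readInt()];
            for (int i = 0; i < fields.length; i++) {
                fields[i] = in.readInt();
            }
            return new Row(fields);
        }
    }

    // Forwards the row part to RowSerializer and handles only the command itself.
    static final class CRowSerializer {
        private final RowSerializer rowSerializer = new RowSerializer();

        void serialize(CRow value, DataOutputStream out) throws IOException {
            rowSerializer.serialize(value.row, out);
            out.writeBoolean(value.retract);
        }

        CRow deserialize(DataInputStream in) throws IOException {
            Row row = rowSerializer.deserialize(in);
            return new CRow(row, in.readBoolean());
        }
    }

    // Serializes and deserializes a CRow in memory.
    static CRow roundTrip(CRow value) {
        try {
            CRowSerializer serializer = new CRowSerializer();
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serializer.serialize(value, new DataOutputStream(bytes));
            return serializer.deserialize(
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    A real implementation would extend Flink's `TypeSerializer` and would also need the matching `TypeInformation` and `TypeComparator`; those are omitted here.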
    
    My gut feeling is that the second approach is easier to implement because 
we (hopefully) do not need to touch the generated code and "just" need to wrap 
all `Row` objects in `CRow` objects.
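
    The wrapping idea could be sketched roughly as follows, again in plain Java with hypothetical stand-ins (`CollectorSketch`, its nested `Collector`, `Row`, `CRow`, `CRowWrappingCollector`, and `generatedFunction` are all assumptions, not Flink or `flink-table` classes). The `Row` is taken out of the `CRow` before the Row-only function is called, and a wrapping collector re-attaches the command to every result.

```java
import java.util.ArrayList;
import java.util.List;

final class CollectorSketch {

    // Minimal stand-in for a collector interface.
    interface Collector<T> {
        void collect(T record);
    }

    // Simplified stand-in for Row.
    static final class Row {
        final Object[] fields;
        Row(Object[] fields) { this.fields = fields; }
    }

    // Hypothetical wrapper carrying the row plus the command.
    static final class CRow {
        final Row row;
        final boolean retract;
        CRow(Row row, boolean retract) { this.row = row; this.retract = retract; }
    }

    // Accepts plain Rows and forwards them downstream wrapped in a CRow.
    static final class CRowWrappingCollector implements Collector<Row> {
        private final Collector<CRow> out;
        private boolean retract;

        CRowWrappingCollector(Collector<CRow> out) { this.out = out; }

        void setRetract(boolean retract) { this.retract = retract; }

        @Override
        public void collect(Row row) {
            out.collect(new CRow(row, retract));
        }
    }

    // Stand-in for a code-generated function that only knows about Row.
    // Here it simply projects the first field.
    static void generatedFunction(Row input, Collector<Row> out) {
        out.collect(new Row(new Object[] {input.fields[0]}));
    }

    // Unwraps the CRow, runs the Row-only function, and re-attaches the command.
    static List<CRow> process(CRow input) {
        List<CRow> results = new ArrayList<>();
        CRowWrappingCollector collector = new CRowWrappingCollector(results::add);
        collector.setRetract(input.retract);
        generatedFunction(input.row, collector);
        return results;
    }
}
```

    With this shape, `generatedFunction` never sees the command field, which is why the generated code would (hopefully) not need to change.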


> Implement and turn on the retraction for aggregates
> ---------------------------------------------------
>
>                 Key: FLINK-6091
>                 URL: https://issues.apache.org/jira/browse/FLINK-6091
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupBy generates retractions, and it works 
> in unbounded, processing-time mode. Hence, retraction is only supported for 
> unbounded and processing-time aggregations so far. We can add more 
> retraction support later.
> Supported now: unbounded groupBy, unbounded and processing-time over window.
> Unsupported now: group window, event-time or bounded over window.



