Leonard Xu created FLINK-20405:
----------------------------------

             Summary: The LAG function in over window is not implements 
correctly
                 Key: FLINK-20405
                 URL: https://issues.apache.org/jira/browse/FLINK-20405
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.12.0
            Reporter: Leonard Xu


The LAG(input, offset, default) function in an over window always returns 
the current row's input, no matter how the offset is set.
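
For comparison, here is a minimal sketch of the semantics LAG is expected to have over an ordered partition. `LagReference` and its `lag` method are hypothetical names used only for illustration and are not Flink code; the sketch assumes a non-negative offset and string inputs. Calling it with offset 0 reproduces the behaviour reported here, where every row gets its own input back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Flink): reference semantics of
// LAG(input, offset, default) over the rows of one ordered partition.
final class LagReference {

    // For row i, return the input of row (i - offset); if no such row
    // exists, return defaultValue instead.
    static List<String> lag(List<String> inputs, int offset, String defaultValue) {
        List<String> result = new ArrayList<>(inputs.size());
        for (int i = 0; i < inputs.size(); i++) {
            int source = i - offset;
            boolean inRange = source >= 0 && source < inputs.size();
            result.add(inRange ? inputs.get(source) : defaultValue);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c", "d");
        // Expected result of LAG(input, 1, 'x'): each row sees the previous row.
        System.out.println(lag(rows, 1, "x"));
        // Offset 0 mimics the reported behaviour: each row sees itself.
        System.out.println(lag(rows, 0, "x"));
    }
}
```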

After looking at the generated code for the function, I think the implementation 
is incorrect and needs to be fixed.
{code:java}
// the offset and default value are never used
public UnboundedOverAggregateHelper$24(java.lang.Object[] references) throws Exception {
    constant$14 = ((int) 1);
    constant$14isNull = false;
    constant$15 = ((org.apache.flink.table.data.binary.BinaryStringData) str$13);
    constant$15isNull = false;
    typeSerializer$19 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
}

public void accumulate(org.apache.flink.table.data.RowData accInput) throws Exception {
    org.apache.flink.table.data.binary.BinaryStringData field$21;
    boolean isNull$21;
    org.apache.flink.table.data.binary.BinaryStringData field$22;
    isNull$21 = accInput.isNullAt(2);
    field$21 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
    if (!isNull$21) {
        field$21 = ((org.apache.flink.table.data.binary.BinaryStringData) accInput.getString(2));
    }
    field$22 = field$21;
    if (!isNull$21) {
        field$22 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$19.copy(field$22));
    }
    if (agg0_leadlag != field$22) {
        agg0_leadlag = ((org.apache.flink.table.data.binary.BinaryStringData) typeSerializer$19.copy(field$22));
    }
    ;
    agg0_leadlagIsNull = isNull$21;
}
{code}
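
The generated accumulate method above only copies the current input into agg0_leadlag. As a rough illustration of what honouring the offset and default value could look like, here is a sketch that buffers the last offset + 1 inputs. `LagAccumulatorSketch` is a hypothetical class, not the actual Flink fix; it assumes a non-negative offset and plain `String` values instead of `BinaryStringData`.

```java
import java.util.LinkedList;

// Hypothetical accumulator (not the actual Flink implementation): keeps the
// last (offset + 1) inputs so the value `offset` rows back can be returned.
final class LagAccumulatorSketch {
    private final int offset;
    private final String defaultValue;
    // LinkedList is used because, unlike ArrayDeque, it accepts null inputs.
    private final LinkedList<String> buffer = new LinkedList<>();

    LagAccumulatorSketch(int offset, String defaultValue) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be non-negative");
        }
        this.offset = offset;
        this.defaultValue = defaultValue;
    }

    // Called once per row, in window order.
    void accumulate(String input) {
        buffer.addLast(input);
        if (buffer.size() > offset + 1) {
            buffer.removeFirst(); // drop rows older than `offset` rows back
        }
    }

    // Value for the most recently accumulated row.
    String getValue() {
        // Fewer than (offset + 1) rows seen so far: fall back to the default.
        return buffer.size() > offset ? buffer.getFirst() : defaultValue;
    }
}
```

With offset 1 and default "x", accumulating "a", "b", "c" in order and reading the value after each row gives "x", "a", "b", whereas the generated code above would give "a", "b", "c".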
 

This issue was originally raised on the user mailing list [1].

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkSQL-kafka-gt-dedup-gt-kafka-td39335.html



