[https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Zhuoluo Yang updated FLINK-6039:
--------------------------------
    Description: 
In the real world, especially when processing logs with a TableFunction, log 
formats are flexible. Thus, the number of fields in an emitted Row should not 
be fixed. 

For example, we should make the following three types of TableFunction work.
{code}
// Test for incomplete row: the emitted Row already has the declared arity (3),
// but only its first field is populated; fields 1 and 2 are left unset.
class TableFunc4 extends TableFunction[Row] {
  def eval(str: String): Unit = {
    // Inputs without a '#' separator emit nothing.
    if (str.contains("#")) {
      for (token <- str.split("#")) {
        val incomplete = new Row(3)
        incomplete.setField(0, token) // only column 0 receives a value
        collect(incomplete)
      }
    }
  }

  override def getResultType: TypeInformation[Row] =
    new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)
}

// Test for incomplete row: the declared result type has three columns, but each
// emitted Row has only one field. This is the case FLINK-6039 intends to support.
class TableFunc5 extends TableFunction[Row] {
  def eval(str: String): Unit = {
    // Inputs without a '#' separator emit nothing.
    if (str.contains("#")) {
      str.split("#").foreach { s =>
        val row = new Row(1) // ResultType has three columns; we have only one here
        row.setField(0, s)
        collect(row)
      }
    }
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO)
  }
}

// Test for overflow row: the declared result type has two columns, but each
// emitted Row has five fields. Per the issue text, this case already works.
class TableFunc6 extends TableFunction[Row] {
  def eval(str: String): Unit = {
    // Inputs without a '#' separator emit nothing.
    if (str.contains("#")) {
      str.split("#").foreach { s =>
        val row = new Row(5) // ResultType has two columns; we have five columns here
        row.setField(0, s)
        // Fields 1 through 4 all carry the token's length.
        (1 to 4).foreach(i => row.setField(i, s.length))
        collect(row)
      }
    }
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO)
  }
}
{code}

Actually, TableFunc4 and TableFunc6 already work correctly with the current 
version. This issue will make TableFunc5 work.

  was:
In actual world, especially while processing log with TableFunction. The format 
of log in actual world is flexible. Thus, the number of fields should not be 
fixed. 

For examples, we should make the three following types of of TableFunction 
works.
{code}
// Test for incomplete row: the emitted Row already has the declared arity (3),
// but only its first field is populated; fields 1 and 2 are left unset.
class TableFunc4 extends TableFunction[Row] {
  def eval(str: String): Unit = {
    // Inputs without a '#' separator emit nothing.
    if (str.contains("#")) {
      for (token <- str.split("#")) {
        val incomplete = new Row(3)
        incomplete.setField(0, token) // only column 0 receives a value
        collect(incomplete)
      }
    }
  }

  override def getResultType: TypeInformation[Row] =
    new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)
}

// Test for incomplete row: the declared result type has three columns, but each
// emitted Row has only one field. This is the case FLINK-6039 intends to support.
class TableFunc5 extends TableFunction[Row] {
  def eval(str: String): Unit = {
    // Inputs without a '#' separator emit nothing.
    if (str.contains("#")) {
      str.split("#").foreach { s =>
        val row = new Row(1) // ResultType has three columns; we have only one here
        row.setField(0, s)
        collect(row)
      }
    }
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO)
  }
}

// Test for overflow row: the declared result type has two columns, but each
// emitted Row has five fields. Per the issue text, this case already works.
class TableFunc6 extends TableFunction[Row] {
  def eval(str: String): Unit = {
    // Inputs without a '#' separator emit nothing.
    if (str.contains("#")) {
      str.split("#").foreach { s =>
        val row = new Row(5) // ResultType has two columns; we have five columns here
        row.setField(0, s)
        // Fields 1 through 4 all carry the token's length.
        (1 to 4).foreach(i => row.setField(i, s.length))
        collect(row)
      }
    }
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO)
  }
}
{code}

Actually, the TableFunc4 and TableFunc6 has already worked correctly with 
current version. This issue will make TableFunc5 works.


> Row of TableFunction should support flexible number of fields
> -------------------------------------------------------------
>
>                 Key: FLINK-6039
>                 URL: https://issues.apache.org/jira/browse/FLINK-6039
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Zhuoluo Yang
>            Assignee: Zhuoluo Yang
>
> In the real world, especially when processing logs with a TableFunction, log 
> formats are flexible. Thus, the number of fields in an emitted Row should not 
> be fixed. 
> For example, we should make the following three types of TableFunction 
> work.
> {code}
> // Test for incomplete row
> class TableFunc4 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
>     if (str.contains("#")) {
>       str.split("#").foreach({ s =>
>         val row = new Row(3)
>         row.setField(0, s)  // And we only set values for one column
>         collect(row)
>       })
>     }
>   }
>   override def getResultType: TypeInformation[Row] = {
>     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>                     BasicTypeInfo.INT_TYPE_INFO,
>                     BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> // Test for incomplete row
> class TableFunc5 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
>     if (str.contains("#")) {
>       str.split("#").foreach({ s =>
>         val row = new Row(1)  // ResultType is three columns, we have only 
> one here
>         row.setField(0, s)
>         collect(row)
>       })
>     }
>   }
>   override def getResultType: TypeInformation[Row] = {
>     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> // Test for overflow row
> class TableFunc6 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
>     if (str.contains("#")) {
>       str.split("#").foreach({ s =>
>         val row = new Row(5)  // ResultType is two columns, we have five 
> columns here
>         row.setField(0, s)
>         row.setField(1, s.length)
>         row.setField(2, s.length)
>         row.setField(3, s.length)
>         row.setField(4, s.length)
>         collect(row)
>       })
>     }
>   }
>   override def getResultType: TypeInformation[Row] = {
>     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>                     BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> {code}
> Actually, TableFunc4 and TableFunc6 already work correctly with the 
> current version. This issue will make TableFunc5 work.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to