PrabhuJoseph commented on PR #4506:
URL: https://github.com/apache/zeppelin/pull/4506#issuecomment-1350801311

   @zjffdu @zenozeng Thanks for this patch, this is very useful to us.
   
   We are facing 
[FLINK-13645](https://issues.apache.org/jira/browse/FLINK-13645) again after 
upgrading to Flink 1.16; it works fine on Flink 1.15. It looks like 
[FLINK-15635](https://issues.apache.org/jira/browse/FLINK-15635) is causing the 
regression.
   Do you know how to fix this issue? I have tried the FLINK-13645 fix, which 
picks java.net.URLClassLoader, but it does not help.
   
   **Repro: Flink 1.16 (running on YARN) with this Zeppelin patch**
   
   ```
   %flink.bsql
   
   -- Raw source table: a filesystem CSV table over /tmp/bank.csv with a single STRING column.
   -- NOTE(review): the newline field delimiter presumably keeps each whole input line in that one column - confirm.
   DROP TABLE IF EXISTS bank_raw;
   CREATE TABLE bank_raw (
      content STRING
   ) WITH (
   'format.field-delimiter'='\n',
   'connector.type'='filesystem',
   'format.derive-schema'='true',
   'connector.path'='/tmp/bank.csv',
   'format.type'='csv'
   );
   
   -- Cleaned target table: 17 typed columns stored as comma-delimited CSV under /tmp/bank_cleaned.
   -- It is filled by the insert statement at the end of this repro.
   DROP TABLE IF EXISTS bank;
   CREATE TABLE bank (
       age int, 
       job string,
       marital string,
       education string,
       `default` string,
       balance string,
       housing string,
       loan string,
       contact string, 
       `day` string,
       `month` string,
       duration int,
       campaign int,
       pdays int,
       previous int,
       poutcome string,
       y string
   ) WITH (
   'format.field-delimiter'=',',
   'connector.type'='filesystem',
   'format.derive-schema'='true',
   'connector.path'='/tmp/bank_cleaned',
   'format.type'='csv'
   );
   
   %flink
   
   import org.apache.flink.api.java.typeutils.RowTypeInfo
   import org.apache.flink.api.common.typeinfo.Types
   import org.apache.flink.api.java.typeutils._
   import org.apache.flink.api.scala.typeutils._
   import org.apache.flink.api.scala._
   
   /**
    * Plain value holder for one bank record.
    *
    * The constructor parameters carry the same names, order and Int/String split
    * as the columns of the `bank` table declared in this notebook.
    */
   class Person(
       val age: Int,
       val job: String,
       val marital: String,
       val education: String,
       val default: String,
       val balance: String,
       val housing: String,
       val loan: String,
       val contact: String,
       val day: String,
       val month: String,
       val duration: Int,
       val campaign: Int,
       val pdays: Int,
       val previous: Int,
       val poutcome: String,
       val y: String
   )
   
   /**
    * Table function that turns one raw, semicolon-delimited text line into a
    * 17-field [[Row]].
    *
    * A line that starts with `"age"` (the header line) produces no row. For every
    * other line, fields 0 and 11-14 are parsed as integers and all remaining
    * fields are emitted as strings with their surrounding double quotes removed.
    */
   class ParseFunction extends TableFunction[Row] {

     /**
      * Parses `line` and collects at most one row.
      *
      * @param line raw input line with fields separated by `;`
      * @throws NumberFormatException if one of the integer fields is not numeric
      * @throws ArrayIndexOutOfBoundsException if the line has fewer than 17 fields
      */
     def eval(line: String): Unit = {
       // skip the header line; only data lines are split and emitted
       if (!line.startsWith("\"age\"")) {
         val tokens = line.split(";")
         val intColumns = Set(0, 11, 12, 13, 14)
         // Row.of takes Object varargs, so integers are boxed explicitly
         // (Int.box instead of the deprecated `new Integer(int)` constructor).
         val fields: Seq[AnyRef] = (0 until 17).map { i =>
           (if (intColumns(i)) Int.box(tokens(i).toInt) else normalize(tokens(i))): AnyRef
         }
         collect(Row.of(fields: _*))
       }
     }

     /** Row type of the emitted records: INT, 10 x STRING, 4 x INT, 2 x STRING. */
     override def getResultType() = {
       Types.ROW(
         Types.INT,
         Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
         Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
         Types.INT, Types.INT, Types.INT, Types.INT,
         Types.STRING, Types.STRING)
     }

     /**
      * Removes one pair of surrounding double quotes from `token`.
      *
      * Tokens that are not wrapped in a matching pair of quotes (including a lone
      * `"`) are returned unchanged, so no character is dropped and `substring`
      * is never called with an end index smaller than its start index.
      */
     private def normalize(token: String): String =
       if (token.length >= 2 && token.startsWith("\"") && token.endsWith("\"")) {
         token.substring(1, token.length - 1)
       } else {
         token
       }
   }
   
   // Register the table function under the name "parse"; the insert statement below calls parse(content).
   // NOTE(review): btenv is presumably the batch table environment provided by the Zeppelin Flink interpreter - confirm.
   btenv.registerFunction("parse", new ParseFunction())
   
   %flink.bsql
   
   -- Fill bank by applying the parse table function to the content column of every bank_raw record
   -- (LATERAL TABLE) and naming its 17 output fields after the bank columns.
   insert into bank select T.* from bank_raw, LATERAL TABLE(parse(content)) as 
T(age, job,  marital, education, `default`, balance, housing, loan, contact, 
`day`, `month`, duration, campaign, pdays,  previous,  poutcome, y) 
   ```
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@zeppelin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to