PrabhuJoseph commented on PR #4506: URL: https://github.com/apache/zeppelin/pull/4506#issuecomment-1350801311
@zjffdu @zenozeng Thanks for this patch; it is very useful to us. We are facing [FLINK-13645](https://issues.apache.org/jira/browse/FLINK-13645) again after the Flink-1.16 upgrade. It works fine on Flink-1.15. It looks like [FLINK-15635](https://issues.apache.org/jira/browse/FLINK-15635) is causing the regression. Do you know how to fix this issue? I have tried the FLINK-13645 fix, which picks java.net.URLClassLoader, but it does not help. **Repro: Flink1.16 (runs on YARN) + With this Zeppelin Patch** ``` %flink.bsql DROP TABLE IF EXISTS bank_raw; CREATE TABLE bank_raw ( content STRING ) WITH ( 'format.field-delimiter'='\n', 'connector.type'='filesystem', 'format.derive-schema'='true', 'connector.path'='/tmp/bank.csv', 'format.type'='csv' ); DROP TABLE IF EXISTS bank; CREATE TABLE bank ( age int, job string, marital string, education string, `default` string, balance string, housing string, loan string, contact string, `day` string, `month` string, duration int, campaign int, pdays int, previous int, poutcome string, y string ) WITH ( 'format.field-delimiter'=',', 'connector.type'='filesystem', 'format.derive-schema'='true', 'connector.path'='/tmp/bank_cleaned', 'format.type'='csv' ); %flink import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils._ import org.apache.flink.api.scala._ class Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String, val y: String) class ParseFunction extends TableFunction[Row] { def eval(line: String) { val tokens = line.split(";") // parse the line if (!line.startsWith("\"age\"")) { collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), 
normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new Integer(tokens(11).toInt), new Integer(tokens(12).toInt), new Integer(tokens(13).toInt), new Integer(tokens(14).toInt), normalize(tokens(15)), normalize(tokens(16)))) } } override def getResultType() = { val cls = classOf[Person] Types.ROW(Types.INT, Types.STRING, Types.STRING, Types.STRING,Types.STRING,Types.STRING,Types.STRING,Types.STRING,Types.STRING,Types.STRING,Types.STRING, Types.INT, Types.INT, Types.INT, Types.INT, Types.STRING, Types.STRING) } // remove the quote private def normalize(token: String) = { if (token.startsWith("\"")) { token.substring(1, token.length - 1) } else { token } } } btenv.registerFunction("parse", new ParseFunction()) %flink.bsql insert into bank select T.* from bank_raw, LATERAL TABLE(parse(content)) as T(age, job, marital, education, `default`, balance, housing, loan, contact, `day`, `month`, duration, campaign, pdays, previous, poutcome, y) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@zeppelin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org