I try to make the question-model simple, such as the code below:




```

  val s = env.fromCollection(List(

    ("Book", 1, "<How to be a Flink Committer>")

  ))

  tableEnv.registerDataStream("tableA", s, 'a, 'b, 'c)

  class TestFunction extends ScalarFunction {

    def eval(data: String) = {

      println(s"test: ${data}")

      data

    }

  }

  tableEnv.registerFunction("my_test", new TestFunction)

  val tableB = tableEnv.sqlQuery(

    """

      |SELECT Row(A, C) as body FROM (

      | SELECT my_test(a) as A, my_test(c) as C from tableA

      |)

      |""".stripMargin)

  tableB.printSchema()

  tableEnv.registerTable("tableB", tableB)

  tableEnv.sqlQuery(

    """

      |SELECT body.EXPR$0, body.EXPR$1

      |FROM tableB

      |""".stripMargin).toAppendStream[Row].print()

```




the type of column `body` is Row, and it comes from tableA, part of the plan is 




Calc(select=[CAST((my_test(a) ROW my_test(c))).EXPR$0 AS EXPR$0, 
CAST((my_test(a) ROW my_test(c))).EXPR$1 AS EXPR$1])




the column `body` will be "generated" twice.




In my real case, the column `body` has many columns, and if the sql try to 
SELECT body.EXPR$0, body.EXPR$1, ..body.EXPR$n, then the plan come bigger, and 
job failed.




Maybe this is the reason?

And Is there any way to make `body` generated only one times?




Thanks for your reply.







At 2020-04-23 20:32:07, "Caizhi Weng" <tsreape...@gmail.com> wrote:

This plan looks indeed complicated, however it is hard to see what the SQL is 
doing as the plan is too long... Could you provide your SQL to us? Also, what 
version of Flink are you using? It seems that there is a very long method in 
the generated code, but Flink should have split it into many shorter methods 
(see TableConfig#maxGeneratedCodeLength). By default Flink will split methods 
longer than 64KB into shorter ones.


izual <izual...@163.com> 于2020年4月23日周四 下午6:34写道:

Hi,Community:
  I add 4 complicated sqls in one job, and the job looks running well.
  But when I try to add 5th sql,the job failed at the beginning。
  And throws errors info below:
java.lang.RuntimeException: Could not instantiate generated class 
'StreamExecCalc$23166'
at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
at 
org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:47)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:428)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:373)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:65)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
... 10 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.HashMap.newNode(HashMap.java:1750)
at java.util.HashMap.putVal(HashMap.java:642)
at java.util.HashMap.putMapEntries(HashMap.java:515)
at java.util.HashMap.putAll(HashMap.java:785)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3658)
at org.codehaus.janino.UnitCompiler.access$5800(UnitCompiler.java:215)
at 
org.codehaus.janino.UnitCompiler$12.visitLocalVariableDeclarationStatement(UnitCompiler.java:3543)
at 
org.codehaus.janino.UnitCompiler$12.visitLocalVariableDeclarationStatement(UnitCompiler.java:3511)
at 
org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3510)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3499)


As the warning shows OOM,Then I try to set -yjm -ytm to a big value(1024 -> 
4096),but this does not help.


Thanks for your reply.




 

Reply via email to