[ https://issues.apache.org/jira/browse/FLINK-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17065296#comment-17065296 ]

jinhai commented on FLINK-16662:
--------------------------------

similar issue: https://issues.apache.org/jira/browse/FLINK-16585

> Blink Planner failed to generate JobGraph for POJO DataStream converting to Table (Cannot determine simple type name)
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16662
>                 URL: https://issues.apache.org/jira/browse/FLINK-16662
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: chenxyz
>            Priority: Major
>
> When the Blink planner converts a POJO DataStream to a Table, it generates
> and compiles the SourceConversion$1 class. If the job is submitted to Flink
> as a Jar, the UserCodeClassLoader is not used when generating the JobGraph,
> so the ClassLoader used to compile the generated code (the AppClassLoader)
> cannot load the POJO class from the Jar, and the following error is reported:
>
> {code:java}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 27, Column 174: Cannot determine simple type name "net"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
>     at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7009)
>     at org.codehaus.janino.UnitCompiler.access$15200(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6425)
>     at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6403)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>     at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9150)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9036)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8938)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
>     at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
>     at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
>     at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
>     ... 20 more
>
> // generated class
> /* 1 */
> /* 2 */ public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> /* 3 */     implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */   private final Object[] references;
> /* 6 */   private transient org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter converter$0;
> /* 7 */   private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */   public SourceConversion$1(
> /* 10 */       Object[] references,
> /* 11 */       org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */       org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */       org.apache.flink.streaming.api.operators.Output output) throws Exception {
> /* 14 */     this.references = references;
> /* 15 */     converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter) references[0]));
> /* 16 */     this.setup(task, config, output);
> /* 17 */   }
> /* 18 */
> /* 19 */   @Override
> /* 20 */   public void open() throws Exception {
> /* 21 */     super.open();
> /* 22 */
> /* 23 */   }
> /* 24 */
> /* 25 */   @Override
> /* 26 */   public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 27 */     org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((net.xxxxxxxxxx.Student) element.getValue());
> /* 28 */
> /* 29 */
> /* 30 */
> /* 31 */     output.collect(outElement.replace(in1));
> /* 32 */   }
> /* 33 */
> /* 34 */
> /* 35 */
> /* 36 */   @Override
> /* 37 */   public void close() throws Exception {
> /* 38 */     super.close();
> /* 39 */
> /* 40 */   }
> /* 41 */
> /* 42 */
> /* 43 */ }
> /* 44 */
> {code}
>
> I think the UserCodeClassLoader should be used when generating the JobGraph,
> just as it is when generating the Pipeline (StreamGraph).
>
> The test code is as follows:
>
> {code:java}
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings envSet = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSet);
>         env.enableCheckpointing(2 * 60 * 1000);
>         TableConfig config = tableEnv.getConfig();
>         config.setIdleStateRetentionTime(Time.hours(24), Time.hours(25));
>         DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {
>             @Override
>             public void run(SourceContext<Student> ctx) throws Exception {
>                 ctx.collect(new Student(1, "Tom"));
>             }
>             @Override
>             public void cancel() {
>             }
>         });
>         tableEnv.createTemporaryView("student", source, "id, name");
>         Table table = tableEnv.sqlQuery("select id, name from student");
>         CsvTableSink sink = new CsvTableSink("/data/student", ",", 10, FileSystem.WriteMode.OVERWRITE);
>         String[] fieldNames = {"id", "name"};
>         TypeInformation[] fieldTypes = {Types.INT, Types.STRING};
>         tableEnv.registerTableSink("student_sink", fieldNames, fieldTypes, sink);
>         table.insertInto("student_sink");
>         env.execute("Test_Jar");
>     }
>     @Getter
>     @Setter
>     @NoArgsConstructor
>     @AllArgsConstructor
>     public static class Student {
>         private Integer id;
>         private String name;
>     }
> }{code}
>
> To reproduce this bug, the following conditions must be met:
> 1. A POJO DataStream is converted to a Table.
> 2. Checkpointing is enabled (StreamingJobGraphGenerator#preValidate() checks
> whether checkpointing is enabled).
> 3. The program is packaged into a Jar and submitted to Flink, or
> PackagedProgramUtils.createJobGraph is invoked directly to create the
> JobGraph from the Jar program.
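
The class-loading argument in the issue can be illustrated with a minimal, JDK-only sketch of the usual pattern: run a piece of work with a chosen loader installed as the thread context class loader, then restore the previous one. ContextClassLoaderScope and callWith are hypothetical names, not Flink APIs, and it is an assumption here that the code-generation path resolves user classes through the thread context class loader; this is not necessarily how the issue was or should be fixed.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

/** Hypothetical helper for illustration only; not part of Flink. */
final class ContextClassLoaderScope {

    private ContextClassLoaderScope() {}

    /**
     * Runs the action with the given loader installed as the thread context
     * class loader and restores the previous loader afterwards, even if the
     * action throws.
     */
    static <T> T callWith(ClassLoader loader, Callable<T> action) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the user-code class loader of a submitted Jar. It has
        // no URLs here, so it only delegates to its parent.
        ClassLoader userCodeLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderScope.class.getClassLoader());

        // Inside the scope, code that consults the context class loader sees
        // the user-code loader rather than the application class loader.
        ClassLoader seen =
                callWith(userCodeLoader, () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == userCodeLoader);
    }
}
```

Under the stated assumption, wrapping JobGraph generation in such a scope would let the generated SourceConversion$1 code resolve net.xxxxxxxxxx.Student from the Jar instead of failing in the AppClassLoader.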