chenxyz created FLINK-16662:
-------------------------------

             Summary: Blink Planner failed to generate JobGraph for POJO DataStream converting to Table (Cannot determine simple type name)
                 Key: FLINK-16662
                 URL: https://issues.apache.org/jira/browse/FLINK-16662
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.10.0
            Reporter: chenxyz


When the Blink Planner converts a POJO DataStream to a Table, it generates and 
compiles the SourceConversion$1 code. If the job is submitted to Flink as a 
Jar, the UserCodeClassLoader is not used while the JobGraph is generated, so 
the ClassLoader of the compiled code (AppClassLoader) cannot load the POJO 
class from the Jar package, and the following error is reported:

 
{code:java}
Caused by: org.codehaus.commons.compiler.CompileException: Line 27, Column 174: Cannot determine simple type name "net"
Caused by: org.codehaus.commons.compiler.CompileException: Line 27, Column 174: Cannot determine simple type name "net"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7009)
    at org.codehaus.janino.UnitCompiler.access$15200(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6425)
    at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9150)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9036)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8938)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
    at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
    at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
    at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
    at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
    at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
    at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
    at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
    ... 20 more


// generated class
/* 1 */
/* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
/* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
/* 4 */
/* 5 */        private final Object[] references;
/* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter converter$0;
/* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
/* 8 */
/* 9 */        public SourceConversion$1(
/* 10 */            Object[] references,
/* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
/* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
/* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
/* 14 */          this.references = references;
/* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter) references[0]));
/* 16 */          this.setup(task, config, output);
/* 17 */        }
/* 18 */
/* 19 */        @Override
/* 20 */        public void open() throws Exception {
/* 21 */          super.open();
/* 22 */
/* 23 */        }
/* 24 */
/* 25 */        @Override
/* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
/* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((net.xxxxxxxxxx.Student) element.getValue());
/* 28 */
/* 29 */
/* 30 */
/* 31 */          output.collect(outElement.replace(in1));
/* 32 */        }
/* 33 */
/* 34 */
/* 35 */
/* 36 */        @Override
/* 37 */        public void close() throws Exception {
/* 38 */           super.close();
/* 39 */
/* 40 */        }
/* 41 */
/* 42 */
/* 43 */      }
/* 44 */
{code}
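
The class-loader visibility problem described above can be reproduced in miniature without Flink or Janino. The following is only a sketch under stated assumptions: the class name net.example.Student, the temporary directory standing in for the user Jar, and the helper names are all hypothetical, and it needs a JDK (not a bare JRE) because it compiles the stand-in POJO at run time. It illustrates that a class reachable only through a child URLClassLoader cannot be resolved through the parent application class loader, which is the situation the generated SourceConversion$1 code is in.

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ClassLoaderVisibilityDemo {

    /** Returns true if the given loader (or one of its parents) can resolve the class name. */
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Compiles a tiny POJO into a temp directory that stands in for the user Jar (hypothetical). */
    static Path compileUserPojo() throws Exception {
        Path root = Files.createTempDirectory("user-jar");
        Path pkg = Files.createDirectories(root.resolve("net/example"));
        Path src = pkg.resolve("Student.java");
        String code = "package net.example; public class Student { public Integer id; public String name; }";
        Files.write(src, code.getBytes(StandardCharsets.UTF_8));

        JavaCompiler javac = ToolProvider.getSystemJavaCompiler(); // null when only a JRE is available
        if (javac == null) {
            throw new IllegalStateException("A JDK is required to run this sketch");
        }
        int rc = javac.run(null, null, null, "-d", root.toString(), src.toString());
        if (rc != 0) {
            throw new IllegalStateException("Compiling the stand-in POJO failed");
        }
        return root;
    }

    public static void main(String[] args) throws Exception {
        Path root = compileUserPojo();
        // Plays the role of the AppClassLoader that compiled the generated code in the report.
        ClassLoader appLoader = ClassLoaderVisibilityDemo.class.getClassLoader();
        // Plays the role of the UserCodeClassLoader: a child loader that also sees the "Jar".
        try (URLClassLoader userLoader =
                 new URLClassLoader(new URL[] {root.toUri().toURL()}, appLoader)) {
            System.out.println("app loader sees POJO:  " + canLoad("net.example.Student", appLoader));
            System.out.println("user loader sees POJO: " + canLoad("net.example.Student", userLoader));
        }
    }
}
```

The point of the sketch is the direction of delegation: the child loader can see everything its parent sees, but not the other way around, so any code compiled against the parent alone cannot reference classes that only exist in the user Jar.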

I think the UserCodeClassLoader should be used when generating the JobGraph, 
just as it is when generating the Pipeline (StreamGraph).
The test code is as follows:

 
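{code:java}
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;

public class App {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSet = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSet);

        // Condition 2: checkpointing must be enabled to hit the bug.
        env.enableCheckpointing(2 * 60 * 1000);

        TableConfig config = tableEnv.getConfig();
        config.setIdleStateRetentionTime(Time.hours(24),
                Time.hours(25));

        DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {
            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                ctx.collect(new Student(1, "Tom"));
            }

            @Override
            public void cancel() {

            }
        });

        // Condition 1: a POJO DataStream is converted to a Table.
        tableEnv.createTemporaryView("student", source, "id, name");
        Table table = tableEnv.sqlQuery("select id, name from student");

        CsvTableSink sink = new CsvTableSink("/data/student", ",", 10, FileSystem.WriteMode.OVERWRITE);

        String[] fieldNames = {"id", "name"};
        TypeInformation[] fieldTypes = {Types.INT, Types.STRING};
        tableEnv.registerTableSink("student_sink", fieldNames, fieldTypes, sink);
        table.insertInto("student_sink");

        env.execute("Test_Jar");
    }

    // Lombok generates the getters, setters and constructors of the POJO.
    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
    }
}
{code}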
To reproduce this bug, the following conditions must be met:

1. A POJO DataStream is converted to a Table.
2. Checkpointing is enabled; StreamingJobGraphGenerator#preValidate() checks 
whether checkpointing is enabled.
3. The program is packaged into a Jar and submitted to Flink, or 
PackagedProgramUtils.createJobGraph is invoked directly to create the JobGraph 
from the Jar program.
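
Regarding the suggestion that the UserCodeClassLoader should also be in effect while the JobGraph is generated, one generic way to scope a class loader to a block of work is to swap the thread context class loader and restore it afterwards. The sketch below shows only that pattern using the JDK; the class and method names are hypothetical, the anonymous ClassLoader is a stand-in for a real user-code loader, and whether Flink's code generation consults the context class loader at this point is an assumption, not something confirmed by this report.

```java
import java.util.concurrent.Callable;

public class ContextClassLoaderScope {

    /**
     * Runs the action with the given loader installed as the thread context
     * class loader and restores the previous loader afterwards, even on failure.
     */
    static <T> T callWith(ClassLoader loader, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for the user-code class loader of a submitted Jar.
        ClassLoader userLoader =
            new ClassLoader(ContextClassLoaderScope.class.getClassLoader()) {};

        // In a real client, the action would be the JobGraph generation step.
        ClassLoader seen =
            callWith(userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("user loader active inside scope: " + (seen == userLoader));
        System.out.println("previous loader restored: "
            + (Thread.currentThread().getContextClassLoader() != userLoader));
    }
}
```

Restoring the previous loader in a finally block keeps the swap from leaking into unrelated work on the same thread.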



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
