Hi, Benchao, Thanks for the reply.
Could you provide us more information? 1. what planner are you using? blink or legacy planner? I am using Blink Planner. Not test with legacy planner because my program depend a lot of new feature based on blink planner. 2. how do you register your UDF? Just use the code : tableEnv.registerFunction ("ts2Date", new ts2Date()); tableEnv is a StreamTableEnvironment. 3. does this has a relation with checkpointing? what if you enable checkpointing and not use your udf? and disable checkpointing and use udf? I don't think this is related with checkpoint. If I enable checkpointing and not use my udf, I did not see any exception and submit job successfully. If I disable checkpointing and use udf, the job can submit successfully too. I dive a lot with this exception. Maybe it is related with some classloader issue. Hope for your suggestion. 在 2020-03-01 17:54:03,"Benchao Li" <libenc...@gmail.com> 写道: Hi fulin, It seems like a bug in the code generation. Could you provide us more information? 1. what planner are you using? blink or legacy planner? 2. how do you register your UDF? 3. does this has a relation with checkpointing? what if you enable checkpointing and not use your udf? and disable checkpointing and use udf? sunfulin <sunfulin0...@163.com> 于2020年3月1日周日 下午5:41写道: Hi, guys I am running Flink 1.10 SQL job with UpsertStreamSink to ElasticSearch. In my sql logic, I am using a UDF like ts2Date to handle date format stream fields. However, when I add the `env.enableCheckpointing(time)`, my job failed to submit and throws exception like following. This is really weird, cause when I remove the UDF, the job can submit successfully. Any suggestion is highly appreciated. Besides, my sql logic is like : INSERT INTO realtime_product_sell select U.sor_pty_id, U.entrust_date, U.entrust_time, U.product_code, U.business_type, sum(cast(U.balance as double)) as balance, COALESCE(C.cust_name, '--') as cust_name, COALESCE(C.open_comp_name, '--') AS open_comp_name, COALESCE(C.open_comp_id, '--') as open_comp_id, COALESCE(C.org_name,'--') as org_name, COALESCE(C.org_id,'--') as comp_name, COALESCE(C.comp_name, '--') AS comp_name, COALESCE(C.comp_id,'--') as comp_id, COALESCE(C.mng_name,'--') as mng_name, COALESCE(C.mng_id,'--') as mng_id, COALESCE(C.is_tg,'--') as is_tg, COALESCE(C.cust_type,'--') as cust_type, COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365, COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y from (select customerNumber as sor_pty_id, ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF fundCode as product_code, businessType as business_type, balance, proctime from lscsp_sc_order_all where fundCode in ('007118','007117') and businessType in ('5') ) as U left join dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C on U.sor_pty_id = C.cust_id group by sor_pty_id, entrust_date, entrust_time, product_code, business_type, COALESCE(C.cust_name, '--'), COALESCE(C.open_comp_name, '--'), COALESCE(C.open_comp_id, '--'), COALESCE(C.org_name,'--'), COALESCE(C.org_id,'--'), COALESCE(C.comp_name, '--'), COALESCE(C.comp_id,'--'), COALESCE(C.mng_name,'--'), COALESCE(C.mng_id,'--'), COALESCE(C.is_tg,'--'), COALESCE(C.cust_type,'--'), COALESCE(C.avg_tot_aset_y365, 0.00), COALESCE(C.avg_aset_create_y, 0.00) 2020-03-01 17:22:06,504 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception. org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96) at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104) at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777) at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52) at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57) at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) ... 16 more Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ... 19 more Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 30: Cannot determine simple type name "com" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486) at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394) at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917) at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389) at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786) at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412) at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407) at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403) at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382) at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768) at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410) at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407) at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403) at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382) at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781) at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) ... 25 more -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn