Could you show how your UDF `ts2Date` is implemented?

sunfulin <sunfulin0...@163.com> wrote on Sunday, March 1, 2020 at 6:05 PM:
> Hi, Benchao,
> Thanks for the reply.
>
> Could you provide us more information?
> 1. what planner are you using? blink or legacy planner?
> I am using the Blink planner. I have not tested with the legacy planner because my program depends on many new features of the Blink planner.
> 2. how do you register your UDF?
> I just use this code: tableEnv.registerFunction("ts2Date", new ts2Date()); tableEnv is a StreamTableEnvironment.
> 3. does this have a relation with checkpointing? what if you enable checkpointing and not use your udf? and disable checkpointing and use udf?
> I don't think this is related to checkpointing. If I enable checkpointing and do not use my UDF, I see no exception and the job submits successfully. If I disable checkpointing and use the UDF, the job also submits successfully.
>
> I have dug into this exception a lot. Maybe it is related to some classloader issue. I hope for your suggestions.
>
> At 2020-03-01 17:54:03, "Benchao Li" <libenc...@gmail.com> wrote:
>
> Hi fulin,
>
> It seems like a bug in the code generation.
>
> Could you provide us more information?
> 1. what planner are you using? blink or legacy planner?
> 2. how do you register your UDF?
> 3. does this have a relation with checkpointing? what if you enable checkpointing and not use your udf? and disable checkpointing and use udf?
>
> sunfulin <sunfulin0...@163.com> wrote on Sunday, March 1, 2020 at 5:41 PM:
>
>> Hi, guys
>> I am running a Flink 1.10 SQL job with an UpsertStreamSink to Elasticsearch. In my SQL logic, I use a UDF, ts2Date, to format date fields in the stream. However, when I add `env.enableCheckpointing(time)`, the job fails to submit and throws the exception below. This is really weird, because when I remove the UDF, the job submits successfully. Any suggestion is highly appreciated.
>> Besides, my sql logic is like:
>>
>> INSERT INTO realtime_product_sell
>> select U.sor_pty_id,
>>        U.entrust_date,
>>        U.entrust_time,
>>        U.product_code,
>>        U.business_type,
>>        sum(cast(U.balance as double)) as balance,
>>        COALESCE(C.cust_name, '--') as cust_name,
>>        COALESCE(C.open_comp_name, '--') AS open_comp_name,
>>        COALESCE(C.open_comp_id, '--') as open_comp_id,
>>        COALESCE(C.org_name,'--') as org_name,
>>        COALESCE(C.org_id,'--') as comp_name,
>>        COALESCE(C.comp_name, '--') AS comp_name,
>>        COALESCE(C.comp_id,'--') as comp_id,
>>        COALESCE(C.mng_name,'--') as mng_name,
>>        COALESCE(C.mng_id,'--') as mng_id,
>>        COALESCE(C.is_tg,'--') as is_tg,
>>        COALESCE(C.cust_type,'--') as cust_type,
>>        COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
>>        COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
>>
>> from (select customerNumber as sor_pty_id,
>>              ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF
>>              ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
>>              fundCode as product_code,
>>              businessType as business_type,
>>              balance,
>>              proctime
>>       from lscsp_sc_order_all
>>       where fundCode in ('007118', '007117') and businessType in ('5') ) as U
>>
>> left join dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
>> on U.sor_pty_id = C.cust_id
>> group by sor_pty_id,
>>          entrust_date,
>>          entrust_time,
>>          product_code,
>>          business_type,
>>          COALESCE(C.cust_name, '--'),
>>          COALESCE(C.open_comp_name, '--'),
>>          COALESCE(C.open_comp_id, '--'),
>>          COALESCE(C.org_name,'--'),
>>          COALESCE(C.org_id,'--'),
>>          COALESCE(C.comp_name, '--'),
>>          COALESCE(C.comp_id,'--'),
>>          COALESCE(C.mng_name,'--'),
>>          COALESCE(C.mng_id,'--'),
>>          COALESCE(C.is_tg,'--'),
>>          COALESCE(C.cust_type,'--'),
>>          COALESCE(C.avg_tot_aset_y365, 0.00),
>>          COALESCE(C.avg_aset_create_y, 0.00)
>>
>> 2020-03-01 17:22:06,504 ERROR
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
>> org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
>>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
>>     at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
>>     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
>>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
>>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
>>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
>>     at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
>>     at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>>     at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
>>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
>>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
>>     ... 16 more
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
>>     at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
>>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
>>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
>>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
>>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>>     ... 19 more
>> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 30: Cannot determine simple type name "com"
>>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
>>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
>>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
>>     at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
>>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
>>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
>>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
>>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
>>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
>>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
>>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>>     at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
>>     at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
>>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
>>     at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
>>     at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
>>     at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
>>     at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
>>     at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
>>     at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
>>     at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
>>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
>>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
>>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
>>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
>>     at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
>>     at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
>>     at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
>>     at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
>>     at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
>>     at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
>>     at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
>>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
>>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
>>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
>>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
>>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
>>     at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
>>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
>>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
>>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
>>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>>     at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>>     at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
>>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
>>     ... 25 more
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn