Hi, guys I am running Flink 1.10 SQL job with UpsertStreamSink to ElasticSearch. In my sql logic, I am using a UDF like ts2Date to handle date format stream fields. However, when I add the `env.enableCheckpointing(time)`, my job failed to submit and throws exception like following. This is really weird, cause when I remove the UDF, the job can submit successfully. Any suggestion is highly appreciated. Besides, my sql logic is like :
INSERT INTO realtime_product_sell select U.sor_pty_id, U.entrust_date, U.entrust_time, U.product_code, U.business_type, sum(cast(U.balance as double)) as balance, COALESCE(C.cust_name, '--') as cust_name, COALESCE(C.open_comp_name, '--') AS open_comp_name, COALESCE(C.open_comp_id, '--') as open_comp_id, COALESCE(C.org_name,'--') as org_name, COALESCE(C.org_id,'--') as comp_name, COALESCE(C.comp_name, '--') AS comp_name, COALESCE(C.comp_id,'--') as comp_id, COALESCE(C.mng_name,'--') as mng_name, COALESCE(C.mng_id,'--') as mng_id, COALESCE(C.is_tg,'--') as is_tg, COALESCE(C.cust_type,'--') as cust_type, COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365, COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y from (select customerNumber as sor_pty_id, ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF fundCode as product_code, businessType as business_type, balance, proctime from lscsp_sc_order_all where fundCode in ('007118','007117') and businessType in ('5') ) as U left join dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C on U.sor_pty_id = C.cust_id group by sor_pty_id, entrust_date, entrust_time, product_code, business_type, COALESCE(C.cust_name, '--'), COALESCE(C.open_comp_name, '--'), COALESCE(C.open_comp_id, '--'), COALESCE(C.org_name,'--'), COALESCE(C.org_id,'--'), COALESCE(C.comp_name, '--'), COALESCE(C.comp_id,'--'), COALESCE(C.mng_name,'--'), COALESCE(C.mng_id,'--'), COALESCE(C.is_tg,'--'), COALESCE(C.cust_type,'--'), COALESCE(C.avg_tot_aset_y365, 0.00), COALESCE(C.avg_aset_create_y, 0.00) 2020-03-01 17:22:06,504 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception. org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96) at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104) at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777) at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52) at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57) at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) ... 16 more Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ... 19 more Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 30: Cannot determine simple type name "com" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486) at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394) at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917) at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389) at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786) at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412) at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407) at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403) at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382) at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768) at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410) at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407) at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403) at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382) at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781) at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) ... 25 more