Could you also provide us the DDL for lscsp_sc_order_all
and dim_app_cust_info ?

sunfulin <sunfulin0...@163.com> 于2020年3月1日周日 下午9:22写道:

>
> *CREATE TABLE **realtime_product_sell *(
>   sor_pty_id *varchar*,
>   entrust_date *varchar*,
>   entrust_time *varchar*,
>   product_code *varchar *,
>   business_type *varchar *,
>   balance *double *,
>   cust_name *varchar *,
>   open_comp_name *varchar *,
>   open_comp_id *varchar *,
>   org_name *varchar *,
>   org_id *varchar *,
>   comp_name *varchar *,
>   comp_id *varchar *,
>   mng_name *varchar *,
>   mng_id *varchar *,
>   is_tg *varchar *,
>   cust_type *varchar *,
>   avg_tot_aset_y365 *double *,
>   avg_aset_create_y
> *double*) *WITH *(
> *'connector.type' *= *'elasticsearch'*,
> *'connector.version' *= *'<version>'*,
> *'connector.hosts' *= *'<host_port>'*,
> *'connector.index' *= *'realtime_product_sell_007118'*,
> *'connector.document-type' *= *'_doc'*,
> *'update-mode' *= *'upsert'*,
> *'connector.key-delimiter' *= *'$'*,
> *'connector.key-null-literal' *= *'n/a'*,
> *'connector.bulk-flush.interval' *= *'1000'*,
> *'format.type' *=
> *'json'*)
>
>
>
>
>
> At 2020-03-01 21:08:08, "Benchao Li" <libenc...@gmail.com> wrote:
> >The UDF looks good. Could you also paste your DDL? Then we can produce your
> >bug easily.
> >
> >sunfulin <sunfulin0...@163.com> 于2020年3月1日周日 下午6:39写道:
> >
> >> Below is the code. The function trans origin field timeStr "2020-03-01
> >> 12:01:00.234" to target timeStr accroding to dayTag.
> >>
> >> *public class *ts2Date *extends *ScalarFunction {
> >>     *public *ts2Date() {
> >>
> >>     }
> >>
> >>
> >>     *public *String eval (String timeStr, *boolean *dayTag) {
> >>
> >>     *if*(timeStr == *null*) {
> >>         *return null*;
> >>     }
> >>     SimpleDateFormat ortSf = *new *SimpleDateFormat(*"yyyy-MM-dd
> >> HH:mm:ss.SSS"*);
> >>     Date date = *new *Date();
> >>     *try *{
> >>         date = ortSf.parse(timeStr);
> >>     } *catch *(ParseException e) {
> >>         e.printStackTrace();
> >>         *return null*;
> >>     }
> >>     *if *(dayTag) {
> >>         String format = *"yyyy-MM-dd"*;
> >>         SimpleDateFormat sf = *new *SimpleDateFormat(format);
> >>         *return *sf.format(date);
> >>     } *else *{
> >>         String format = *"yyyy-MM-dd**\'**T**\'**HH:mm:00.000+0800"*;
> >>         SimpleDateFormat sf = *new *SimpleDateFormat(format);
> >>         *return *sf.format(date);
> >>     }
> >> }
> >> }
> >>
> >>
> >>
> >> At 2020-03-01 18:14:30, "Benchao Li" <libenc...@gmail.com> wrote:
> >>
> >> Could you show how your UDF `ts2Date` is implemented?
> >>
> >> sunfulin <sunfulin0...@163.com> 于2020年3月1日周日 下午6:05写道:
> >>
> >>> Hi, Benchao,
> >>> Thanks for the reply.
> >>>
> >>> Could you provide us more information?
> >>> 1. what planner are you using? blink or legacy planner?
> >>> I am using Blink Planner. Not test with legacy planner because my program
> >>> depend a lot of new feature based on blink planner.
> >>> 2. how do you register your UDF?
> >>> Just use the code :  tableEnv.registerFunction ("ts2Date", new
> >>> ts2Date());    tableEnv is a StreamTableEnvironment.
> >>> 3. does this has a relation with checkpointing? what if you enable
> >>> checkpointing and not use your udf? and disable checkpointing and use udf?
> >>> I don't think this is related with checkpoint. If I enable checkpointing
> >>> and not use my udf, I did not see any exception and submit job
> >>> successfully. If I disable checkpointing and use udf, the job can submit
> >>> successfully too.
> >>>
> >>> I dive a lot with this exception. Maybe it is related with some
> >>> classloader issue. Hope for your suggestion.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> 在 2020-03-01 17:54:03,"Benchao Li" <libenc...@gmail.com> 写道:
> >>>
> >>> Hi fulin,
> >>>
> >>> It seems like a bug in the code generation.
> >>>
> >>> Could you provide us more information?
> >>> 1. what planner are you using? blink or legacy planner?
> >>> 2. how do you register your UDF?
> >>> 3. does this has a relation with checkpointing? what if you enable
> >>> checkpointing and not use your udf? and disable checkpointing and use udf?
> >>>
> >>> sunfulin <sunfulin0...@163.com> 于2020年3月1日周日 下午5:41写道:
> >>>
> >>>> Hi, guys
> >>>> I am running Flink 1.10 SQL job with UpsertStreamSink to ElasticSearch.
> >>>> In my sql logic, I am using a UDF like ts2Date to handle date format 
> >>>> stream
> >>>> fields. However, when I add the `env.enableCheckpointing(time)`, my job
> >>>> failed to submit and throws exception like following. This is really 
> >>>> weird,
> >>>> cause when I remove the UDF, the job can submit successfully. Any
> >>>> suggestion is highly appreciated. Besides, my sql logic is like :
> >>>>
> >>>> *INSERT INTO *realtime_product_sell
> >>>> *select *U.sor_pty_id,
> >>>>        U.entrust_date,
> >>>>        U.entrust_time,
> >>>>        U.product_code,
> >>>>        U.business_type,
> >>>>        sum(*cast*(U.balance *as double*)) *as *balance,
> >>>>        COALESCE(C.cust_name, *'--'*) *as *cust_name,
> >>>>        COALESCE(C.open_comp_name, *'--'*) *AS *open_comp_name,
> >>>>        COALESCE(C.open_comp_id, *'--'*) *as *open_comp_id,
> >>>>        COALESCE(C.org_name,*'--'*) *as *org_name,
> >>>>        COALESCE(C.org_id,*'--'*) *as *comp_name,
> >>>>        COALESCE(C.comp_name, *'--'*) *AS *comp_name,
> >>>>        COALESCE(C.comp_id,*'--'*) *as *comp_id,
> >>>>        COALESCE(C.mng_name,*'--'*) *as *mng_name,
> >>>>        COALESCE(C.mng_id,*'--'*) *as *mng_id,
> >>>>        COALESCE(C.is_tg,*'--'*) *as *is_tg,
> >>>>        COALESCE(C.cust_type,*'--'*) *as *cust_type,
> >>>>        COALESCE(C.avg_tot_aset_y365, 0.00) *as *avg_tot_aset_y365,
> >>>>        COALESCE(C.avg_aset_create_y, 0.00) *as *avg_aset_create_y
> >>>>
> >>>> *from*(*select *customerNumber *as *sor_pty_id,
> >>>>         ts2Date(`lastUpdateTime`, *true*) *as *entrust_date,     -- the
> >>>> UDF
> >>>>        ts2Date(`lastUpdateTime`, *false*) *as *entrust_time,     -- the
> >>>> UDF
> >>>>         fundCode *as *product_code,
> >>>>         businessType *as *business_type,
> >>>>         balance,
> >>>>         proctime
> >>>>       *from **lscsp_sc_order_all **where *fundCode *in *(*'007118'*,
> >>>> *'007117'*) *and *businessType *in *(*'5'*) ) *as *U
> >>>>
> >>>> *left join**dim_app_cust_info **FOR *SYSTEM_TIME *AS OF *U.proctime *AS
> >>>> *C
> >>>> *on **U*.sor_pty_id = *C*.cust_id
> >>>> *group by *sor_pty_id,
> >>>>         entrust_date,
> >>>>         entrust_time,
> >>>>         product_code,
> >>>>         business_type,
> >>>>         COALESCE(C.cust_name, *'--'*),
> >>>>         COALESCE(C.open_comp_name, *'--'*),
> >>>>         COALESCE(C.open_comp_id, *'--'*),
> >>>>         COALESCE(C.org_name,*'--'*),
> >>>>         COALESCE(C.org_id,*'--'*),
> >>>>         COALESCE(C.comp_name, *'--'*),
> >>>>         COALESCE(C.comp_id,*'--'*),
> >>>>         COALESCE(C.mng_name,*'--'*),
> >>>>         COALESCE(C.mng_id,*'--'*),
> >>>>         COALESCE(C.is_tg,*'--'*),
> >>>>         COALESCE(C.cust_type,*'--'*),
> >>>>         COALESCE(C.avg_tot_aset_y365, 0.00),
> >>>>         COALESCE(C.avg_aset_create_y, 0.00)
> >>>>
> >>>> 2020-03-01 17:22:06,504 ERROR
> >>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled
> >>>> exception.
> >>>> org.apache.flink.util.FlinkRuntimeException:
> >>>> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> >>>> be compiled. This is a bug. Please file an issue.
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
> >>>> at
> >>>> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> >>>> at
> >>>> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> >>>> at
> >>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
> >>>> at
> >>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
> >>>> at
> >>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
> >>>> at
> >>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> >>>> at
> >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>>> at
> >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>>> at java.lang.Thread.run(Thread.java:748)
> >>>> Caused by:
> >>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> >>>> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> >>>> be compiled. This is a bug. Please file an issue.
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> >>>> ... 16 more
> >>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> >>>> program cannot be compiled. This is a bug. Please file an issue.
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> >>>> ... 19 more
> >>>> Caused by: org.codehaus.commons.compiler.CompileException: Line 17,
> >>>> Column 30: Cannot determine simple type name "com"
> >>>> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> >>>> at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> >>>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> >>>> at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> >>>> at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> >>>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> >>>> at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
> >>>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
> >>>> at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
> >>>> at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
> >>>> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
> >>>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
> >>>> at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
> >>>> at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
> >>>> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
> >>>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
> >>>> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
> >>>> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
> >>>> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> >>>> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
> >>>> at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
> >>>> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
> >>>> at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
> >>>> at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> >>>> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> >>>> at
> >>>> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> >>>> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> >>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> >>>> at
> >>>> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> >>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> >>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> >>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> >>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> >>>> ... 25 more
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>
> >>>
> >>> --
> >>>
> >>> Benchao Li
> >>> School of Electronics Engineering and Computer Science, Peking University
> >>> Tel:+86-15650713730
> >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >>>
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >> --
> >>
> >> Benchao Li
> >> School of Electronics Engineering and Computer Science, Peking University
> >> Tel:+86-15650713730
> >> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >>
> >>
> >>
> >>
> >>
> >
> >
> >--
> >
> >Benchao Li
> >School of Electronics Engineering and Computer Science, Peking University
> >Tel:+86-15650713730
> >Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

Reply via email to