-- Sink table realtime_product_sell: rows are written through the Elasticsearch
-- connector into index 'realtime_product_sell_007118' in upsert mode, with
-- JSON serialization. '<version>' and '<host_port>' are placeholders that must
-- be replaced with the real cluster settings before use.
CREATE TABLE realtime_product_sell (
    sor_pty_id varchar,
    entrust_date varchar,
    entrust_time varchar,
    product_code varchar,
    business_type varchar,
    balance double,
    cust_name varchar,
    open_comp_name varchar,
    open_comp_id varchar,
    org_name varchar,
    org_id varchar,
    comp_name varchar,
    comp_id varchar,
    mng_name varchar,
    mng_id varchar,
    is_tg varchar,
    cust_type varchar,
    avg_tot_aset_y365 double,
    avg_aset_create_y double
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '<version>',
    'connector.hosts' = '<host_port>',
    'connector.index' = 'realtime_product_sell_007118',
    'connector.document-type' = '_doc',
    'update-mode' = 'upsert',
    -- delimiter and null placeholder used when composing the document key
    'connector.key-delimiter' = '$',
    'connector.key-null-literal' = 'n/a',
    -- bulk flush interval; the connector documentation gives this in milliseconds
    'connector.bulk-flush.interval' = '1000',
    'format.type' = 'json'
)
At 2020-03-01 21:08:08, "Benchao Li" <> wrote:
>The UDF looks good. Could you also paste your DDL? Then we can reproduce your
>bug easily.
>sunfulin <> 于2020年3月1日周日 下午6:39写道:
>> Below is the code. The function transforms the original field timeStr "2020-03-01
>> 12:01:00.234" into the target timeStr according to dayTag.
>> /**
>>  * Scalar UDF that reformats a timestamp string such as
>>  * "2020-03-01 12:01:00.234" into either a day string or a minute-level
>>  * timestamp string, depending on {@code dayTag}.
>>  */
>> public class ts2Date extends ScalarFunction {
>>
>>     public ts2Date() {
>>     }
>>
>>     /**
>>      * Reformats {@code timeStr}.
>>      *
>>      * @param timeStr timestamp text in the pattern "yyyy-MM-dd HH:mm:ss.SSS"; may be null
>>      * @param dayTag  true to return "yyyy-MM-dd"; false to return
>>      *                "yyyy-MM-dd'T'HH:mm:00.000+0800"
>>      * @return the reformatted string, or null when {@code timeStr} is null or
>>      *         cannot be parsed
>>      */
>>     public String eval(String timeStr, boolean dayTag) {
>>         if (timeStr == null) {
>>             return null;
>>         }
>>         // The pattern must be a single-line literal; the date and time parts
>>         // are separated by one space.
>>         SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>>         Date date;
>>         try {
>>             date = ortSf.parse(timeStr);
>>         } catch (ParseException e) {
>>             e.printStackTrace();
>>             return null;
>>         }
>>         // In the second pattern, "00.000+0800" contains no pattern letters, so
>>         // it is emitted verbatim: seconds and millis are zeroed and the offset
>>         // text is fixed rather than derived from a time zone.
>>         String format = dayTag ? "yyyy-MM-dd" : "yyyy-MM-dd'T'HH:mm:00.000+0800";
>>         return new SimpleDateFormat(format).format(date);
>>     }
>> }
>> At 2020-03-01 18:14:30, "Benchao Li" <> wrote:
>> Could you show how your UDF `ts2Date` is implemented?
>> sunfulin <> 于2020年3月1日周日 下午6:05写道:
>>> Hi, Benchao,
>>> Thanks for the reply.
>>> Could you provide us more information?
>>> 1. what planner are you using? blink or legacy planner?
>>> I am using the Blink planner. I have not tested with the legacy planner because my
>>> program depends on a lot of new features based on the Blink planner.
>>> 2. how do you register your UDF?
>>> Just use the code : tableEnv.registerFunction ("ts2Date", new
>>> ts2Date()); tableEnv is a StreamTableEnvironment.
>>> 3. does this have any relation to checkpointing? what if you enable
>>> checkpointing and not use your udf? and disable checkpointing and use udf?
>>> I don't think this is related to checkpointing. If I enable checkpointing
>>> and do not use my UDF, I do not see any exception and the job is submitted
>>> successfully. If I disable checkpointing and use the UDF, the job can be
>>> submitted successfully too.
>>> I have dug into this exception a lot. Maybe it is related to some
>>> classloader issue. I hope for your suggestions.
>>> 在 2020-03-01 17:54:03,"Benchao Li" <> 写道:
>>> Hi fulin,
>>> It seems like a bug in the code generation.
>>> Could you provide us more information?
>>> 1. what planner are you using? blink or legacy planner?
>>> 2. how do you register your UDF?
>>> 3. does this have any relation to checkpointing? what if you enable
>>> checkpointing and not use your udf? and disable checkpointing and use udf?
>>> sunfulin <> 于2020年3月1日周日 下午5:41写道:
>>>> Hi, guys
>>>> I am running a Flink 1.10 SQL job with an UpsertStreamSink to Elasticsearch.
>>>> In my SQL logic, I am using a UDF, ts2Date, to handle date-format stream
>>>> fields. However, when I add `env.enableCheckpointing(time)`, my job
>>>> fails to submit and throws the exception shown below. This is really weird,
>>>> because when I remove the UDF, the job can be submitted successfully. Any
>>>> suggestion is highly appreciated. My SQL logic is as follows:
>>>> -- Sums order balance per customer / date / time / product / business type,
>>>> -- enriched with customer attributes from dim_app_cust_info joined
>>>> -- FOR SYSTEM_TIME AS OF the order's proctime. Missing customer attributes
>>>> -- default to '--' (text columns) or 0.00 (numeric columns).
>>>> INSERT INTO realtime_product_sell
>>>> SELECT
>>>>     U.sor_pty_id,
>>>>     U.entrust_date,
>>>>     U.entrust_time,
>>>>     U.product_code,
>>>>     U.business_type,
>>>>     SUM(CAST(U.balance AS DOUBLE)) AS balance,
>>>>     COALESCE(C.cust_name, '--') AS cust_name,
>>>>     COALESCE(C.open_comp_name, '--') AS open_comp_name,
>>>>     COALESCE(C.open_comp_id, '--') AS open_comp_id,
>>>>     COALESCE(C.org_name, '--') AS org_name,
>>>>     COALESCE(C.org_id, '--') AS org_id,
>>>>     COALESCE(C.comp_name, '--') AS comp_name,
>>>>     COALESCE(C.comp_id, '--') AS comp_id,
>>>>     COALESCE(C.mng_name, '--') AS mng_name,
>>>>     COALESCE(C.mng_id, '--') AS mng_id,
>>>>     COALESCE(C.is_tg, '--') AS is_tg,
>>>>     COALESCE(C.cust_type, '--') AS cust_type,
>>>>     COALESCE(C.avg_tot_aset_y365, 0.00) AS avg_tot_aset_y365,
>>>>     COALESCE(C.avg_aset_create_y, 0.00) AS avg_aset_create_y
>>>> FROM (
>>>>     SELECT
>>>>         customerNumber AS sor_pty_id,
>>>>         ts2Date(`lastUpdateTime`, TRUE) AS entrust_date,  -- the UDF
>>>>         ts2Date(`lastUpdateTime`, FALSE) AS entrust_time, -- the UDF
>>>>         fundCode AS product_code,
>>>>         businessType AS business_type,
>>>>         balance,
>>>>         proctime
>>>>     FROM lscsp_sc_order_all
>>>>     WHERE fundCode IN ('007118', '007117')
>>>>         AND businessType IN ('5')
>>>> ) AS U
>>>> LEFT JOIN dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
>>>>     ON U.sor_pty_id = C.cust_id
>>>> GROUP BY
>>>>     sor_pty_id,
>>>>     entrust_date,
>>>>     entrust_time,
>>>>     product_code,
>>>>     business_type,
>>>>     COALESCE(C.cust_name, '--'),
>>>>     COALESCE(C.open_comp_name, '--'),
>>>>     COALESCE(C.open_comp_id, '--'),
>>>>     COALESCE(C.org_name, '--'),
>>>>     COALESCE(C.org_id, '--'),
>>>>     COALESCE(C.comp_name, '--'),
>>>>     COALESCE(C.comp_id, '--'),
>>>>     COALESCE(C.mng_name, '--'),
>>>>     COALESCE(C.mng_id, '--'),
>>>>     COALESCE(C.is_tg, '--'),
>>>>     COALESCE(C.cust_type, '--'),
>>>>     COALESCE(C.avg_tot_aset_y365, 0.00),
>>>>     COALESCE(C.avg_aset_create_y, 0.00)
>>>> 2020-03-01 17:22:06,504 ERROR
>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled
>>>> exception.
>>>> org.apache.flink.util.FlinkRuntimeException:
>>>> org.apache.flink.api.common.InvalidProgramException: Table program cannot
>>>> be compiled. This is a bug. Please file an issue.
>>>> at
>>>> org.apache.flink.table.runtime.generated.CompileUtils.compile(
>>>> at
>>>> org.apache.flink.table.runtime.generated.GeneratedClass.compile(
>>>> at
>>>> org.apache.flink.table.runtime.generated.GeneratedClass.getClass(
>>>> at
>>>> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(
>>>> at
>>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(
>>>> at
>>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(
>>>> at
>>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(
>>>> at
>>>> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(
>>>> at
>>>> org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(
>>>> at
>>>> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(
>>>> at
>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(
>>>> at
>>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(
>>>> at
>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(
>>>> at
>>>> java.util.concurrent.CompletableFuture$
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$
>>>> at
>>>> Caused by:
>>>> org.apache.flink.api.common.InvalidProgramException: Table program cannot
>>>> be compiled. This is a bug. Please file an issue.
>>>> at
>>>> at
>>>> at
>>>> at
>>>> org.apache.flink.table.runtime.generated.CompileUtils.compile(
>>>> ... 16 more
>>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
>>>> program cannot be compiled. This is a bug. Please file an issue.
>>>> at
>>>> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(
>>>> at
>>>> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(
>>>> at
>>>> at
>>>> at
>>>> at
>>>> at
>>>> ... 19 more
>>>> Caused by: org.codehaus.commons.compiler.CompileException: Line 17,
>>>> Column 30: Cannot determine simple type name "com"
>>>> at org.codehaus.janino.UnitCompiler.compileError(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.getReferenceType(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.getReferenceType(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.getReferenceType(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.getReferenceType(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.getReferenceType(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.getReferenceType(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.getReferenceType(
>>>> at org.codehaus.janino.UnitCompiler.getType2(
>>>> at org.codehaus.janino.UnitCompiler.access$13800(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(
>>>> at org.codehaus.janino.Java$ReferenceType.accept(
>>>> at org.codehaus.janino.UnitCompiler$21.visitType(
>>>> at org.codehaus.janino.UnitCompiler$21.visitType(
>>>> at org.codehaus.janino.Java$ReferenceType.accept(
>>>> at org.codehaus.janino.UnitCompiler.getType(
>>>> at org.codehaus.janino.UnitCompiler.access$1300(
>>>> at org.codehaus.janino.UnitCompiler$24.getType(
>>>> at org.codehaus.janino.UnitCompiler.getType2(
>>>> at org.codehaus.janino.UnitCompiler.access$14300(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(
>>>> at org.codehaus.janino.Java$FieldAccess.accept(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(
>>>> at org.codehaus.janino.Java$Lvalue.accept(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(
>>>> at org.codehaus.janino.Java$Rvalue.accept(
>>>> at org.codehaus.janino.UnitCompiler.getType(
>>>> at org.codehaus.janino.UnitCompiler.getType2(
>>>> at org.codehaus.janino.UnitCompiler.access$14100(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(
>>>> at org.codehaus.janino.Java$AmbiguousName.accept(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(
>>>> at org.codehaus.janino.Java$Lvalue.accept(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(
>>>> at org.codehaus.janino.Java$Rvalue.accept(
>>>> at org.codehaus.janino.UnitCompiler.getType(
>>>> at org.codehaus.janino.UnitCompiler.findIMethod(
>>>> at org.codehaus.janino.UnitCompiler.compileGet2(
>>>> at org.codehaus.janino.UnitCompiler.access$9100(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(
>>>> at org.codehaus.janino.Java$MethodInvocation.accept(
>>>> at org.codehaus.janino.UnitCompiler.compileGet(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.compileGetValue(
>>>> at org.codehaus.janino.UnitCompiler.compile2(
>>>> at org.codehaus.janino.UnitCompiler.access$5900(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(
>>>> at org.codehaus.janino.Java$MethodInvocation.accept(
>>>> at org.codehaus.janino.UnitCompiler.compile(
>>>> at org.codehaus.janino.UnitCompiler.compile2(
>>>> at org.codehaus.janino.UnitCompiler.access$1800(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(
>>>> at org.codehaus.janino.Java$ExpressionStatement.accept(
>>>> at org.codehaus.janino.UnitCompiler.compile(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.compileStatements(
>>>> at org.codehaus.janino.UnitCompiler.compile(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(
>>>> at
>>>> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(
>>>> at org.codehaus.janino.UnitCompiler.compile2(
>>>> at org.codehaus.janino.UnitCompiler.compile2(
>>>> at org.codehaus.janino.UnitCompiler.access$400(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(
>>>> at
>>>> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(
>>>> at
>>>> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(
>>>> at org.codehaus.janino.UnitCompiler.compile(
>>>> at org.codehaus.janino.UnitCompiler.compileUnit(
>>>> at org.codehaus.janino.SimpleCompiler.cook(
>>>> at
>>>> org.codehaus.janino.SimpleCompiler.compileToClassLoader(
>>>> at org.codehaus.janino.SimpleCompiler.cook(
>>>> at org.codehaus.janino.SimpleCompiler.cook(
>>>> at org.codehaus.commons.compiler.Cookable.cook(
>>>> at org.codehaus.commons.compiler.Cookable.cook(
>>>> at
>>>> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(
>>>> ... 25 more
