[ 
https://issues.apache.org/jira/browse/FLINK-34112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17952797#comment-17952797
 ] 

Juliusz Nadberezny commented on FLINK-34112:
--------------------------------------------

Flink 1.20 seems to be affected as well.

> JavaCodeSplitter OOM
> --------------------
>
>                 Key: FLINK-34112
>                 URL: https://issues.apache.org/jira/browse/FLINK-34112
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Junning Liang
>            Priority: Critical
>
> I wrote a SQL statement that contains many CASE WHEN expressions and ran it 
> on the Flink 1.17 release. Even when the client is given 8GB of memory, it 
> still fails with an OOM exception:
> {code:java}
> 16:38:51,975 ERROR org.apache.flink.client.didi.job.FlinkKubernetesJobClient  
>   [] - Failed to execute flink 
> job.org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error: JavaCodeSplitter failed. This is a bug. Please file 
> an issue.    at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.client.didi.job.FlinkKubernetesJobClient.internalSubmitJob(FlinkKubernetesJobClient.java:474)
>  ~[flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> org.apache.flink.client.didi.job.FlinkKubernetesJobClient.submitStreamSQL(FlinkKubernetesJobClient.java:184)
>  [flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> com.didichuxing.bigdata.flink.client.executor.k8s.core.component.FlinkJobK8sClientComponent.startStreamSql(FlinkJobK8sClientComponent.java:107)
>  [flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartFlinkJob(FlinkK8sClientExecutor.java:230)
>  [flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartJob(FlinkK8sClientExecutor.java:130)
>  [flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.main(FlinkK8sClientExecutor.java:55)
>  [flink-client-executor-core-1.17.0-011_pre.jar:?]Caused by: 
> java.lang.RuntimeException: JavaCodeSplitter failed. This is a bug. Please 
> file an issue.    at 
> org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:37)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:130)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:60)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:100)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion.translateToPlanInternal(CommonExecUnion.java:61)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:145)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.Iterator.foreach(Iterator.scala:937) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.Iterator.foreach$(Iterator.scala:937) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.IterableLike.foreach(IterableLike.scala:70) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:69) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.TraversableLike.map(TraversableLike.scala:233) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:226) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1873)
>  ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:891)
>  ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:2053)
>  ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018]    at 
> com.didichuxing.flink.executor.CommunitySqlExecutor.execute(CommunitySqlExecutor.java:34)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> com.didichuxing.flink.executor.FlinkSqlExecutor.execute(FlinkSqlExecutor.java:51)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> com.didichuxing.flink.FlinkSqlApp.main(FlinkSqlApp.java:35) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252]    
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_252]    at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_252]    at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_252]    at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    ... 8 moreCaused by: 
> java.lang.OutOfMemoryError: Java heap space    at 
> org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addAnyChild(ParserRuleContext.java:133)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addChild(ParserRuleContext.java:139)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.addContextToParseTree(Parser.java:617)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.enterRule(Parser.java:629)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.primary(JavaParser.java:7760) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:6954) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:7020) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.parExpression(JavaParser.java:6653)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5521) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]16:38:51,997 ERROR 
> com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor 
> [] - start job 
> fail!org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error: JavaCodeSplitter failed. This is a bug. Please file 
> an issue.    at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.client.didi.job.FlinkKubernetesJobClient.internalSubmitJob(FlinkKubernetesJobClient.java:474)
>  ~[flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> org.apache.flink.client.didi.job.FlinkKubernetesJobClient.submitStreamSQL(FlinkKubernetesJobClient.java:184)
>  ~[flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> com.didichuxing.bigdata.flink.client.executor.k8s.core.component.FlinkJobK8sClientComponent.startStreamSql(FlinkJobK8sClientComponent.java:107)
>  ~[flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartFlinkJob(FlinkK8sClientExecutor.java:230)
>  ~[flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartJob(FlinkK8sClientExecutor.java:130)
>  [flink-client-executor-core-1.17.0-011_pre.jar:?]    at 
> com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.main(FlinkK8sClientExecutor.java:55)
>  [flink-client-executor-core-1.17.0-011_pre.jar:?]Caused by: 
> java.lang.RuntimeException: JavaCodeSplitter failed. This is a bug. Please 
> file an issue.    at 
> org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:37)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:130)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:60)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:100)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion.translateToPlanInternal(CommonExecUnion.java:61)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:145)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.Iterator.foreach(Iterator.scala:937) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.Iterator.foreach$(Iterator.scala:937) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.IterableLike.foreach(IterableLike.scala:70) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:69) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.TraversableLike.map(TraversableLike.scala:233) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:226) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197)
>  ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1873)
>  ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:891)
>  ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:2053)
>  ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018]    at 
> com.didichuxing.flink.executor.CommunitySqlExecutor.execute(CommunitySqlExecutor.java:34)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> com.didichuxing.flink.executor.FlinkSqlExecutor.execute(FlinkSqlExecutor.java:51)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> com.didichuxing.flink.FlinkSqlApp.main(FlinkSqlApp.java:35) 
> ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252]    
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_252]    at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_252]    at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_252]    at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>  ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]    ... 8 moreCaused by: 
> java.lang.OutOfMemoryError: Java heap space    at 
> org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addAnyChild(ParserRuleContext.java:133)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addChild(ParserRuleContext.java:139)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.addContextToParseTree(Parser.java:617)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.enterRule(Parser.java:629)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.primary(JavaParser.java:7760) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:6954) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:7020) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.parExpression(JavaParser.java:6653)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5521) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190)
>  ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]    at 
> org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) 
> ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] {code}
> The SQL is as follows:
> {code:java}
> add jar 'hdfs://xxxx/realtime-netcar-udf-lastest-v1.1.3-SNAPSHOT.jar';
> CREATE FUNCTION RemoveMapKeySort as 
> 'org.didichuxing.flink.udf.RemoveMapKey2ArraySort';set 
> `table.exec.resource.default-parallelism` = `1`;
> set `table.generated-code.max-length`=`7000`;
> set `table.generated-code.allow` = `true`;-- Source schema
> CREATE TABLE source_table (
> binlog_table                  string,
> binlog_time                   string,
> binlog_event                  string,
> order_id                      string,
> driver_id                     string,
> passenger_id                  string,
> channel                       string,
> product_id                    string,
> area                          string,
> to_area                       string,
> county                        string,
> order_status                  string,
> _status                       string,
> t_order_status                string,
> f_order_status                string,
> _birth_time                   string,
> cancelled_time                string,
> new_time                      string,
> assigned_time                 string,
> prepared_time                 string,
> completed_time                string,
> departure_time                string,
> begun_time                    string,
> consult_time                  string,
> finished_time                 string,
> estimate_time                 string,
> _modify_time                  string,
> start_dest_distance           string,
> driver_start_distance         string,
> distance                      string,
> pre_total_fee                 string,
> `type`                        string,
> combo_type                    string,
> complete_type                 string,
> airport_type                  string,
> carpool_type                  string,
> compound_type                 string,
> source_type                   string,
> driver_type                   string,
> capacity_level                string,
> level_type                    string,
> require_level                 string,
> route_type                    string,
> strive_car_level              string,
> extra_type                    string,
> extend_feature                string,
> is_special_price              string,
> carpool_price_type            string,
> estimate_id                   string, 
> new_lng                       string, 
> new_lat                       string,
> assigned_lng                  string, 
> assigned_lat                  string,
> prepared_lng                  string, 
> prepared_lat                  string,
> begun_lng                     string, 
> begun_lat                     string,
> finished_lng                  string,
> finished_lat                  string,
> dynamic_price                 string,
> travel_id                     string,
> passenger_count               string,
> combo_id                      string,
> starting_lng                  string,
> starting_lat                  string,
> dest_lng                      string,
> dest_lat                      string,
> cap_price                     string,
> spacious_car_alliance         string,
> json_extend_1                 string,
> p_access_key_id               string
> )
> WITH
> (
>     'connector.topic' = 'xxx',
>     'connector.startup-mode' = 'group-offsets',
>     'connector.type' = 'kafka',
>     'connector.properties.bootstrap.servers' = 'xxxx',
>     'connector.properties.group.id' = 'rxxx',
>     'connector.parallelism' = '1',
>     'format.type' = 'json'
> );-- Sink schema
> CREATE TABLE realtime_dwd_trip_trd_order_base (
> table_filter                    string,
> table_time                        string,
> table_event                        string,
> order_id                        bigint,
> driver_id                       bigint,
> passenger_id                    bigint,
> channel_id                        bigint,
> channel_name                     string,
> channel_level1                  string,
> channel_level2                  string,
> product_id                        bigint,
> sub_product_line                bigint,
> level_1_product                    bigint,
> level_2_product                    bigint,
> level_3_product                    bigint,
> call_city_id                    bigint,
> to_city_id                        bigint,
> call_city_name                    string,
> call_county_id                    bigint,
> call_county_name                string,
> region_name                     string,
> order_status                    bigint,
> call_time                        string,
> cancel_time                        string,
> answer_time                     string,
> arrive_time                     string,
> depart_time                        string,
> charge_time                        string,
> finish_time                        string,
> call_time_minute                string,
> cancel_time_minute              string,
> answer_time_minute              string,
> arrive_time_minute              string,
> depart_time_minute              string,
> charge_time_minute              string,
> finish_time_minute              string,
> est_dis                            bigint,
> est_arrive_dis                    bigint,
> est_price_amt                    double,
> combo_type                        bigint,
> complete_type                    bigint,
> airport_type                    bigint,
> carpool_type                    bigint,
> compound_type                    bigint,
> source_type                        bigint,
> driver_type                        bigint,
> capacity_level                    bigint,
> level_type                        bigint,
> require_level                    bigint,
> route_type                        bigint,
> strive_car_level                bigint,
> extra_type                        bigint,
> answer_dur                        bigint,
> est_dur                            bigint,
> arrive_dur                        bigint,
> cancel_dur                        bigint,
> driver_wait_dur                    bigint,
> charge_dur                        bigint,
> carpool_price_type              bigint,
> is_appt_flag                    bigint,
> is_agency_call                    bigint,
> is_anycar                        bigint,
> is_cross_city_flag                bigint,
> is_call_flag                    bigint,
> cancel_type                     bigint,
> is_cancel_flag                    bigint,
> is_grab_before_cannel_flag        bigint,
> is_grab_after_pas_cancel_flag    bigint,
> is_grab_after_dri_cancel_flag    bigint,
> is_grab_after_srvc_cancel_flag    bigint,
> is_reassigned_flag                bigint,
> is_reassign_flag                bigint,
> is_answer_flag                    bigint,
> is_arrive_flag                    bigint,
> is_begin_charge_flag            bigint,
> is_finish_flag                    bigint,
> is_openapi                        bigint,
> is_nirvana                        bigint,
> is_ontheway                        bigint,
> is_update_dest                    bigint,
> is_service_agency_call            bigint,
> is_serial_assign                bigint,
> is_prepay                        bigint,
> estimate_id                     string,
> estimate_id_anycar              string,
> rank_index                            bigint,
> is_anycar_sumtag                bigint, 
> call_lng                         string,
> call_lat                         string,
> answer_lng                    string,
> answer_lat                    string,
> arrive_lng                    string,
> arrive_lat                    string,
> begin_charge_lng                       string,
> begin_charge_lat                       string,
> finish_lng                    string,
> finish_lat                    string,
> event_lng                       string,
> event_lat                       string,
> actual_distance                 bigint,
> event_distance                  bigint,
> extend_feature_value             string,
> distance_category               int,
> is_special_price                int,
> is_guide_scene                  int,
> extend_feature                  string,
> dynamic_price                   int,
> travel_id                     bigint,
> passenger_num               bigint,
> combo_id                       bigint, 
> dest_lng                        string, 
> dest_lat                        string, 
> cap_price_amt                   string,
> spacious_car_alliance           int,
> pro_id                          int,
> pro_name                        string,
> json_extend_1                   string,
> access_key_id                   int              
> )
> WITH
> (
>     'connector' = 'print'
> );
> -- 非anycar部分
> create view source_non_anycar_table
> as
> select  binlog_table
>         ,binlog_time
>         ,binlog_event
>         ,order_id
>         ,driver_id
>         ,passenger_id
>         ,channel
>         ,source_table.product_id
>         ,area
>         ,to_area
>         ,county
>         ,order_status as order_status
>         ,_status
>         ,t_order_status
>         ,f_order_status
>         ,_birth_time
>         ,coalesce(cancelled_time,'1971-01-01 00:00:00') as cancelled_time
>         ,new_time
>         ,assigned_time
>         ,prepared_time
>         ,completed_time
>         ,departure_time
>         ,begun_time
>         ,consult_time
>         ,finished_time
>         ,estimate_time
>         ,_modify_time
>         ,cast(start_dest_distance as bigint) as start_dest_distance
>         ,cast(driver_start_distance as bigint) as driver_start_distance
>         ,cast(cast(distance as double)*1000 as bigint)  as distance
>         ,source_table.pre_total_fee
>         ,`type`
>         ,source_table.combo_type
>         ,complete_type
>         ,airport_type
>         ,carpool_type
>         ,compound_type
>         ,source_type
>         ,driver_type
>         ,capacity_level
>         ,level_type
>         ,source_table.require_level
>         ,route_type
>         ,strive_car_level
>         ,source_table.extra_type
>         ,RemoveMapKeySort(get_json_object(extend_feature, 
> '$.multi_require_product')) as extend_feature_value
>         ,is_special_price
>         ,carpool_price_type
>         ,estimate_id 
>         ,'0' as estimate_id_anycar  
>         ,9999 as rank_index
>         ,new_lng         
>         ,new_lat         
>         ,assigned_lng    
>         ,assigned_lat    
>         ,prepared_lng    
>         ,prepared_lat    
>         ,begun_lng       
>         ,begun_lat       
>         ,finished_lng    
>         ,finished_lat 
>         ,extend_feature   
>         ,dynamic_price
>         ,travel_id
>         ,passenger_count 
>         ,combo_id
>         ,starting_lng
>         ,starting_lat
>         ,dest_lng
>         ,dest_lat
>         ,cap_price
>         ,spacious_car_alliance
>         ,json_extend_1
>         ,p_access_key_id
>         ,case
>                 when ((coalesce(product_id,'-1') ='1' and 
> coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and 
> not (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in 
> ('1','4','2','3','5','10','17','7','6','37')) then '1'
>                 when coalesce(product_id,'-1')='1' and 
> coalesce(combo_type,'-1') = '308' then '308' 
>                 when coalesce(product_id,'-1') in ('3','4') and 
> coalesce(require_level,'-1')<>'900' and coalesce(combo_type,'-1')<>'314' or 
> coalesce(product_id,'-1') in ('287') or coalesce(cast(product_id as 
> bigint),-1) between 3000 and 3999 then '3' 
>                 when coalesce(product_id,'-1') in ('6') then '6' 
>                 when coalesce(product_id,'-1') in ('7','20') then product_id
>                 when coalesce(product_id,'-1') in ('3','4') and 
> coalesce(require_level,'-1')='900' then '99' 
>                 when (((coalesce(product_id,'-1') ='1' and 
> coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and 
> (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in 
> ('1','4','2','3','5','10','17','7','6','37'))) or coalesce(product_id,'-1') 
> in ('9','22') then '9' 
>                 when coalesce(product_id,'-1') in ('11','19','12','21') then 
> '11' 
>                 when coalesce(product_id,'-1') in ('26') then '999' 
>                 when (coalesce(product_id,'-1') in ('3','4') and 
> coalesce(combo_type,'-1')='314') or (coalesce(product_id,'-1') in 
> ('800','801')) then '314' 
>                 when product_id in ('38') then '38'
>                 when coalesce(cast(channel as bigint),-1) between 1000001 and 
> 2000000 and (coalesce(cast(product_id as bigint),-1) between 50 and 59 or 
> coalesce(cast(product_id as bigint),-1) between 70 and 79 or 
> coalesce(cast(product_id as bigint),-1) between 200 and 599) then '50' 
>                 when coalesce(cast(channel as bigint),-1) not between 1000001 
> and 2000000 and coalesce(product_id,'-1') in 
> ('51','52','53','54','59','71','73','74','75','76','79','77') then '50' 
>                 when coalesce(cast(product_id as bigint),-1) between 50 and 
> 59 or coalesce(cast(product_id as bigint),-1) between 70 and 79 or 
> coalesce(cast(product_id as bigint),-1) between 200 and 599 then '51' 
>                 when coalesce(product_id,'-1') in ('700') then '700' 
>                 when coalesce(product_id,'-1') in ('81') then '81' 
>                 when coalesce(product_id,'-1') in ('83') then '327' 
>                 when coalesce(product_id,'-1') in ('15') then '15' 
>         end as category_id
> from    source_table
> where   not (order_status in ('0','6')
>         and   extend_feature is not null
>         and   extend_feature like '%multi_require_product%')
>         and   not (binlog_table='d_order_status' and order_status='5' and 
> f_order_status is null and completed_time>'2020-01-01 00:00:00') 
> ;
> -- anycar部分
> create view source_anycar_table
> as
> select  binlog_table
>         ,binlog_time
>         ,binlog_event
>         ,order_id
>         ,driver_id
>         ,passenger_id
>         ,channel
>         ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'product_id', -1) as string) as product_id
>         ,area
>         ,to_area
>         ,county
>         ,order_status as order_status
>         ,_status
>         ,t_order_status
>         ,f_order_status
>         ,_birth_time
>         ,coalesce(cancelled_time,'1971-01-01 00:00:00') as cancelled_time
>         ,new_time
>         ,assigned_time
>         ,prepared_time
>         ,completed_time
>         ,departure_time
>         ,begun_time
>         ,consult_time
>         ,finished_time
>         ,estimate_time
>         ,_modify_time
>         ,cast(start_dest_distance as bigint) as start_dest_distance
>         ,cast(driver_start_distance as bigint) as driver_start_distance
>         ,cast(cast(distance as double)*1000 as bigint) as distance
>         ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'pre_total_fee', 0) as string) as pre_total_fee
>         ,`type`
>         ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'combo_type', -1) as string) as combo_type
>         ,complete_type
>         ,airport_type
>         ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'carpool_type', -1) as string) as carpool_type
>         ,compound_type
>         ,source_type
>         ,driver_type
>         ,capacity_level
>         ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'level_type', -1) as string) as level_type
>         ,GET_OBJ_STR_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'require_level', '-1') as require_level
>         ,GET_OBJ_STR_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'route_type', 
> '-1') as route_type
>         ,strive_car_level
>         ,cast(GET_LONG_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'extra_type', -1) as string) as extra_type
>         ,RemoveMapKeySort(get_json_object(extend_feature, 
> '$.multi_require_product')) as extend_feature_value
>         ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'is_special_price', -1) as string) as is_special_price 
>         ,carpool_price_type
>         ,estimate_id  
>         ,GET_OBJ_STR_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'estimate_id', '1') as estimate_id_anycar 
>         ,GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'rank', -1) as 
> rank_index
>         ,new_lng         
>         ,new_lat         
>         ,assigned_lng    
>         ,assigned_lat    
>         ,prepared_lng    
>         ,prepared_lat    
>         ,begun_lng       
>         ,begun_lat       
>         ,finished_lng    
>         ,finished_lat 
>         ,extend_feature   
>         ,dynamic_price
>         ,travel_id
>         ,passenger_count
>         ,combo_id
>         ,starting_lng
>         ,starting_lat
>         ,dest_lng
>         ,dest_lat
>         ,cap_price
>         ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 
> 'spacious_car_alliance', -1) as string) as spacious_car_alliance 
>         ,json_extend_1
>         ,p_access_key_id
>         ,case
>                 when ((coalesce(product_id,'-1') ='1' and 
> coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and 
> not (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in 
> ('1','4','2','3','5','10','17','7','6','37')) then '1' 
>                 when coalesce(product_id,'-1')='1' and 
> coalesce(combo_type,'-1') = '308' then '308' 
>                 when coalesce(product_id,'-1') in ('3','4') and 
> coalesce(require_level,'-1')<>'900' and coalesce(combo_type,'-1')<>'314' or 
> coalesce(product_id,'-1') in ('287') or coalesce(cast(product_id as 
> bigint),-1) between 3000 and 3999 then '3' 
>                 when coalesce(product_id,'-1') in ('6') then '6' 
>                 when coalesce(product_id,'-1') in ('7','20') then product_id 
>                 when coalesce(product_id,'-1') in ('3','4') and 
> coalesce(require_level,'-1')='900' then '99' 
>                 when (((coalesce(product_id,'-1') ='1' and 
> coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and 
> (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in 
> ('1','4','2','3','5','10','17','7','6','37'))) or coalesce(product_id,'-1') 
> in ('9','22') then '9' 
>                 when coalesce(product_id,'-1') in ('11','19','12','21') then 
> '11' 
>                 when coalesce(product_id,'-1') in ('26') then '999' 
>                 when (coalesce(product_id,'-1') in ('3','4') and 
> coalesce(combo_type,'-1')='314') or (coalesce(product_id,'-1') in 
> ('800','801')) then '314' 
>                 when product_id in ('38') then '38' 
>                 when coalesce(cast(channel as bigint),-1) between 1000001 and 
> 2000000 and (coalesce(cast(product_id as bigint),-1) between 50 and 59 or 
> coalesce(cast(product_id as bigint),-1) between 70 and 79 or 
> coalesce(cast(product_id as bigint),-1) between 200 and 599) then '50' 
>                 when coalesce(cast(channel as bigint),-1) not between 1000001 
> and 2000000 and coalesce(product_id,'-1') in 
> ('51','52','53','54','59','71','73','74','75','76','79','77') then '50' 
>                 when coalesce(cast(product_id as bigint),-1) between 50 and 
> 59 or coalesce(cast(product_id as bigint),-1) between 70 and 79 or 
> coalesce(cast(product_id as bigint),-1) between 200 and 599 then '51' 
>                 when coalesce(product_id,'-1') in ('700') then '700' 
>                 when coalesce(product_id,'-1') in ('81') then '81' 
>                 when coalesce(product_id,'-1') in ('83') then '327' 
>                 when coalesce(product_id,'-1') in ('15') then '15' 
>         end as category_id
> from    source_table
>         ,LATERAL 
> TABLE(JSON_ARRAY_TO_STR(RemoveMapKeySort(get_json_object(extend_feature, 
> '$.multi_require_product')))) as T(any_call_info)
> where   order_status in ('0','6')
>   and   extend_feature is not null
>   and   extend_feature like '%multi_require_product%'
>   and   not (binlog_table='d_order_status' and order_status='5' and 
> f_order_status is null and completed_time>'2020-01-01 00:00:00') 
> ;
> -- Union all
> create view view_merge
> as
> select  *
> from    source_non_anycar_table
> union all   
> select  *
> from    source_anycar_table
> ;
> -- View
> create view view_01
> as
> select  binlog_table
>         ,binlog_time
>         ,binlog_event
>         ,order_id
>         ,driver_id
>         ,passenger_id
>         ,channel
>         ,'未知' as channel_name
>         ,'未知' as channel_level1
>         ,'未知' as channel_level2
>         ,product_id
>         ,case   when level_type = '1' then '30'    
>                 when product_id = '3' and capacity_level = '11000' then '81'  
>   
>                 when spacious_car_alliance = '1' then '90' 
>                 else category_id
>          end as sub_product_line    
>         ,area
>         ,to_area
>         ,'未知' as city_name
>         ,county
>         ,'未知' as county_name
>         ,'未知' as region
>         ,order_status
>         ,_birth_time
>         ,case   when order_status = '6' and source_type <> '2' then 
> _modify_time    
>                 when (source_type = '2' and order_status = '6') then 
> _modify_time    
>                 when order_status = '7' and cancelled_time = 
> '1971-01-01 00:00:00' then _modify_time    
>                 when ((complete_type not in ('7', '13') and order_status = 
> '12') or (source_type = '2' and order_status = '0') or order_status = '9') 
> and (order_status = '12' and complete_type = '14') then _modify_time    
>                 when order_status = '11' then _modify_time    
>                 when complete_type = '13' and order_status = '12' then 
> _modify_time    
>                 else cancelled_time
>          end cancel_time
>         ,assigned_time
>         ,prepared_time
>         ,departure_time
>         ,begun_time
>         ,finished_time
>         ,start_dest_distance
>         ,driver_start_distance
>         ,pre_total_fee
>         ,combo_type
>         ,complete_type
>         ,airport_type
>         ,carpool_type
>         ,compound_type
>         ,source_type
>         ,driver_type
>         ,capacity_level
>         ,level_type
>         ,require_level
>         ,route_type
>         ,strive_car_level
>         ,extra_type
>         ,case when assigned_time is not null and _birth_time is not null and 
> assigned_time > _birth_time then cast(DATE_DIFF(_birth_time,assigned_time) as 
> string) else '0' end as answer_dur
>         ,cast(estimate_time as bigint)*60 as est_dur
>         ,case when assigned_time is not null and prepared_time is not null 
> and prepared_time > assigned_time then 
> cast(DATE_DIFF(assigned_time,prepared_time) as string) else '0' end as 
> arrive_dur
>         ,case   when cancelled_time is not null and assigned_time is not null 
> and cancelled_time > assigned_time then 
> cast(DATE_DIFF(assigned_time,cancelled_time) as string)
>                 when cancelled_time is not null and _birth_time is not null 
> and cancelled_time > _birth_time then cast(DATE_DIFF(_birth_time 
> ,cancelled_time) as string)
>                 when f_order_status is not null and assigned_time is not null 
> and order_status = '7' and _modify_time > assigned_time then 
> cast(DATE_DIFF(assigned_time,_modify_time) as string)
>                 when f_order_status is not null and _birth_time is not null 
> and order_status = '7' and _modify_time > _birth_time then 
> cast(DATE_DIFF(_birth_time,_modify_time) as string)
>                 else '0'
>          end as cancel_dur
>         ,case when begun_time is not null and prepared_time is not null and 
> begun_time > prepared_time then cast(DATE_DIFF(prepared_time,begun_time) as 
> string) else '0' end as driver_wait_dur
>         ,case when finished_time is not null and begun_time is not null and 
> finished_time > begun_time then cast(DATE_DIFF(begun_time,finished_time) as 
> string) else '0' end as charge_dur
>         ,cast(`type` as int) as is_appt_flag
>         ,case when extra_type is not null and BITWISE_AND(cast(extra_type as 
> bigint), 131072) > 0 then  '1' else '0' end as is_agency_call
>         ,case when compound_type is not null and 
> BITWISE_AND(cast(compound_type as bigint),8192) > 0 then '1' else '0' end as 
> is_anycar
>         ,case when area is not null and to_area is not null and area <> 
> to_area then '1' else '0' end as is_cross_city_flag
>         -- ,case when _birth_time is not null and substring(_birth_time,1,10) 
>         -- > '2020-01-01 00:00:00' and order_status='0' and binlog_table='d_order_base' 
>         -- and (driver_id = '0' or driver_id is null) and 
>         -- substring(_birth_time,1,16)=substring(_modify_time,1,16) and binlog_event='i' 
>         -- then '1' else '0' end as is_call_flag
>         ,case when _birth_time is not null and substring(_birth_time,1,10) > 
> '2020-01-01 00:00:00' and order_status='0' and (driver_id = '0' or driver_id 
> is null) and substring(_birth_time,1,10)=substring(_modify_time,1,10) then 
> '1' else '0' end as is_call_flag
>         ,case   when f_order_status is not null and source_type is not null 
> and cancelled_time is not null and _modify_time is not null and order_status 
> is not null and substring((case when order_status = '7' then cancelled_time 
> else _modify_time end),1,10) > '2020-01-01 00:00:00' and order_status = '6' 
> and source_type <> '2' and binlog_table='d_order_status' then '1'
>                 when f_order_status is not null and source_type is not null 
> and cancelled_time is not null and _modify_time is not null and order_status 
> is not null and substring((case when order_status = '7' then cancelled_time 
> else _modify_time end),1,10) > '2020-01-01 00:00:00' and ((source_type = '2' 
> and order_status in ('6', '7')) or (source_type <> '2' and order_status = 
> '7')) and binlog_table='d_order_status' then '2'
>                 when f_order_status is not null and source_type is not null 
> and cancelled_time is not null and complete_type is not null and _modify_time 
> is not null and order_status is not null and substring((case when 
> order_status = '7' then cancelled_time else _modify_time end),1,10) > 
> '2020-01-01 00:00:00' and ((complete_type not in ('7', '13') and order_status 
> = '12') or (source_type = '2' and order_status = '0') or order_status = '9') 
> and (order_status <> '12' or complete_type <> '14') and 
> binlog_table='d_order_status' then '3'
>                 when f_order_status is not null and source_type is not null 
> and cancelled_time is not null and _modify_time is not null and order_status 
> is not null and substring((case when order_status = '7' then cancelled_time 
> else _modify_time end),1,10) > '2020-01-01 00:00:00' and order_status = '11' 
> and binlog_table='d_order_status' then '4'
>                 when f_order_status is not null and source_type is not null 
> and cancelled_time is not null and complete_type is not null and _modify_time 
> is not null and order_status is not null and substring((case when 
> order_status = '7' then cancelled_time else _modify_time end),1,10) > 
> '2020-01-01 00:00:00' and complete_type = '13' and order_status = '12' and 
> binlog_table='d_order_status' then '5'
>                 else '0'
>          end as cancel_type
>         ,case when f_order_status is not null and complete_type is not null 
> and substring((case when order_status = '7' then cancelled_time else 
> _modify_time end),1,10) > '2020-01-01 00:00:00' and complete_type = '7' and 
> binlog_table='d_order_status' then '1' else '0' end as is_reassigned_flag
>         ,case when source_type is not null and source_type = '2' and 
> binlog_table='d_order_status' then '1' else '0' end as is_reassign_flag
>         ,case when assigned_time is not null and 
> substring(assigned_time,1,10) > '2020-01-01 00:00:00' and order_status='1' 
> and f_order_status is not null and binlog_table='d_order_status' and 
> driver_id is not null and cast(driver_id as bigint) > 0 then '1' else '0' end 
> as is_answer_flag
>         ,case when prepared_time is not null and 
> substring(prepared_time,1,10) > '2020-01-01 00:00:00' and order_status='2' 
> and f_order_status is not null and binlog_table='d_order_status' and 
> driver_id is not null and cast(driver_id as bigint) > 0 then '1' else '0' end 
> as is_arrive_flag
>         ,case when begun_time is not null and substring(begun_time,1,10) > 
> '2020-01-01 00:00:00' and order_status='4' and binlog_table='d_order_status' 
> and f_order_status is not null and driver_id is not null and cast(driver_id 
> as bigint) > 0 then '1' else '0' end as is_begin_charge_flag
>         ,case when finished_time is not null and 
> substring(finished_time,1,10) > '2020-01-01 00:00:00' and order_status = '5' 
> and f_order_status is not null and binlog_table='d_order_status' then '1' 
> else '0' end as is_finish_flag
>         ,case when passenger_id is not null and passenger_id = 
> '2624535533694' or (channel is not null and cast(channel as bigint) between 
> 27010 and 27019) then '1' else '0' end as is_openapi
>         ,case when product_id is not null and capacity_level is not null and 
> product_id in ('3', '4') and capacity_level = '2000' then '1' else '0' end as 
> is_nirvana
>         ,case when extra_type is not null and BITWISE_AND(cast(extra_type as 
> bigint), 33554432) > 0 then '1' else '0' end as is_ontheway
>         ,case when extra_type is not null and BITWISE_AND(cast(extra_type as 
> bigint), 8796093022208) > 0 then '1' else '0' end as is_update_dest
>         ,case when extra_type is not null and BITWISE_AND(cast(extra_type as 
> bigint), 137438953472) > 0 then '1' else '0' end as is_service_agency_call
>         ,case when extra_type is not null and BITWISE_AND(cast(extra_type as 
> bigint), 8388608) > 0 then '1' else '0' end as is_serial_assign
>         ,case when extra_type is not null and BITWISE_AND(cast(extra_type as 
> bigint), 8589934592) > 0 then '1' else '0' end as is_prepay
>         ,is_special_price
>         ,carpool_price_type
>         ,estimate_id
>         ,estimate_id_anycar
>         ,rank_index
>         ,starting_lng
>         ,starting_lat
>         ,assigned_lng 
>         ,assigned_lat 
>         ,prepared_lng 
>         ,prepared_lat 
>         ,begun_lng    
>         ,begun_lat    
>         ,finished_lng 
>         ,finished_lat 
>         ,distance
>         ,extend_feature_value
>         ,case 
>                 when distance<3*1000 then 0 
>                 when distance>=3*1000 and distance<6*1000 then 3
>                 when distance>=6*1000 and distance<10*1000 then 6
>                 when distance>=10*1000 and distance<15*1000 then 10
>                 when distance>=15*1000 and distance<20*1000 then 15
>                 when distance>=20*1000 then 20 
>         end as  real_distance_category
>         ,case 
>                 when start_dest_distance<3*1000 then 0 
>                 when start_dest_distance>=3*1000 and 
> start_dest_distance<6*1000 then 3
>                 when start_dest_distance>=6*1000 and 
> start_dest_distance<10*1000 then 6
>                 when start_dest_distance>=10*1000 and 
> start_dest_distance<15*1000 then 10
>                 when start_dest_distance>=15*1000 and 
> start_dest_distance<20*1000 then 15
>                 when start_dest_distance>=20*1000 then 20 
>         end as  estimate_distance_category
>         ,extend_feature
>         ,case when extra_type is not null and BITWISE_AND(cast(extra_type as 
> bigint), 268435456) > 0 then '1' else '0' end as is_guide_scene
>         ,dynamic_price
>         ,travel_id
>         ,passenger_count
>         ,combo_id
>         ,dest_lng
>         ,dest_lat
>         ,cap_price
>         ,spacious_car_alliance
>         ,'1' as pro_id
>         ,'未知' as pro_name
>         ,json_extend_1
>         ,p_access_key_id as access_key_id
> from    view_merge
> ;
> -- insert into order_base_test
> -- select 
> -- order_id,
> -- driver_id,
> -- passenger_id,
> -- product_id,
> -- sub_product_line
> -- from view_01;
> --Sink
> insert into realtime_dwd_trip_trd_order_base
> select  cast(binlog_table as string) as table_filter
>         ,cast(binlog_time as string) as table_time
>         ,cast(binlog_event as string) as table_event
>         ,cast(order_id as bigint) as order_id
>         ,cast(driver_id as bigint) as driver_id
>         ,cast(passenger_id as bigint) as passenger_id
>         ,cast(channel as bigint) as channel_id
>         ,cast(channel_name as string) as channel_name
>         ,cast(channel_level1 as string) as channel_level1
>         ,cast(channel_level2 as string) as channel_level2
>         ,cast(product_id as bigint) as product_id
>         ,cast(sub_product_line as bigint) as sub_product_line
>         ,case
>                 when sub_product_line in ('1', '3', '6', '7', '9', '20', 
> '99', '314', '30', '81', '700', '90','327') or (sub_product_line in 
> ('11','12') and require_level='2200') then 110000
>                 when sub_product_line in ('11','12') and 
> require_level<>'2200' then 120000
>                 when sub_product_line in ('308') then 130000
>                 when sub_product_line in ('38') then 140000
>                 when sub_product_line in ('50','51') then 190000
>                 when sub_product_line = '15' then 150000 
>                 else 990000
>         end as level_1_product
>         ,case
>                 when sub_product_line in ('3','7','30','81','90')  and 
> combo_type not in ('4','302')     then 110100
>                 when sub_product_line in ('3','7') and combo_type = '4'       
>              then 110200
>                 when sub_product_line in ('3','7') and combo_type = '302'     
>              then 110300
>                 when sub_product_line  in ('20','99')                         
>            then 110400
>                 when sub_product_line  in ('1','6')                           
>            then 110500
>                 when sub_product_line  in ('9')                               
>          then 110600    
>                 when sub_product_line in ('11','12') and 
> require_level<>'2200'             then 120100 
>                 when sub_product_line  in ('308')                             
>          then 130100
>                 when sub_product_line  in ('38')                              
>          then 140200
>                 when sub_product_line  in ('700')                             
>          then 110800
>                 when sub_product_line  in ('314')                             
>          then 110900 
>                 when sub_product_line in ('11','12') and require_level='2200' 
>              then 111000 
>                 when (cast(channel as bigint)>=1000001 and cast(channel as 
> bigint)<=2000000 and sub_product_line = '50') or ((cast(channel as 
> bigint)<1000001 or cast(channel as bigint) >2000000) and sub_product_line in 
> ('50') and coalesce(product_id,'-1') <> '77') or sub_product_line = '51'  
> then 190200 
>                 when sub_product_line in ('327') then 111200 
>                 when sub_product_line = '15' then 150100 
>                 when (cast(channel as bigint)<1000001 or cast(channel as 
> bigint) >2000000) and sub_product_line = '50' and coalesce(product_id,'-1') = 
> '77' then 190100 
>                 else 999900
>         end as  level_2_product
>         ,case   when sub_product_line = '11' and require_level = '2000' and 
> level_type = '5' then 120105 
>                 when sub_product_line = '11' and require_level = '1100' and 
> level_type = '5' then 120106 
>                 when sub_product_line = '11' and require_level = '2200' and 
> level_type = '7' then 111002 
>          else 
>                 case
>                         when sub_product_line in ('3','7') and combo_type not 
> in ('4','302') then 110101 
>                         when sub_product_line='30' then 110102 
>                         when sub_product_line='314' then 110103 
>                         when sub_product_line in ('3','7') and combo_type='4' 
> and carpool_type='4' then 110201 
>                         when sub_product_line in ('3','7') and combo_type='4' 
> and carpool_type='5' then 110202 
>                         when sub_product_line in ('3','7') and 
> require_level='600' and combo_type='4' and carpool_type in ('1','2') then 
> 110203 
>                         when sub_product_line in ('3','7') and 
> require_level='610' and combo_type='4' and carpool_type='2' then 110204 
>                         when sub_product_line in ('3','7') and 
> require_level='600' and combo_type='4' and carpool_type in ('10') then 110205 
>                         when sub_product_line in ('3','7') and require_level 
> = '4' then 110299 
>                         when sub_product_line in ('3','7') and combo_type = 
> '302' and carpool_type = '6' then 110303 
>                         when sub_product_line in ('3','7') and combo_type = 
> '302' and carpool_type = '3' and not (coalesce(product_id,'-1') in ('287') or 
> (coalesce(product_id,'-1') not in ('3013') and cast(coalesce(product_id,'-1') 
> as bigint) between 3000 and 3999)) then 110302 
>                         when sub_product_line in ('3','7') and combo_type = 
> '302' and carpool_type not in ('3','6','8') then 110304 
>                         when sub_product_line in ('3','7') and combo_type = 
> '302' and carpool_type = '3' then 110305 
>                         when sub_product_line in ('3','7') and combo_type = 
> '302' and carpool_type = '8' then 110306 
>                         when sub_product_line in ('20','99') then 110401 
>                         when sub_product_line in ('1','6') and 
> require_level='100' then 110501 
>                         when sub_product_line in ('1','6') and 
> require_level='400' then 110502 
>                         when sub_product_line in ('1','6') and 
> require_level='3300' then 110503 
>                         when sub_product_line in ('1','6') and 
> require_level='200' then 110504 
>                         when sub_product_line in ('1','6') then 110599 
>                         when sub_product_line in ('9') then 110601 
>                         when (cast(channel as bigint)<1000001 or cast(channel 
> as bigint) >2000000) and sub_product_line in ('50') and 
> coalesce(product_id,'-1') in ('52','53','54','59') then 110721  
>                         when sub_product_line= '51' or ((cast(channel as 
> bigint)<1000001 or cast(channel as bigint) >2000000) and sub_product_line in 
> ('50') and coalesce(product_id,'-1') not in ('52','53','54','59','77')) then 
> 110722 
>                         when sub_product_line = '11' and require_level='2000' 
> then 120101  
>                         when sub_product_line = '11' and require_level = 
> '1100' and combo_type = '4' and carpool_type = '2' then 120102 
>                         when sub_product_line = '11' and 
> coalesce(cast(is_special_price as bigint), -1) = 1 then 120103 
>                         when sub_product_line = '11' and require_level 
> ='1100' then 120104 
>                         when sub_product_line = '12' then 120201 
>                         when sub_product_line in ('308') then 130101 
>                         when sub_product_line in ('38') then 140201 
>                         when sub_product_line in ('81') then 110104 
>                         when sub_product_line in ('90') then 110105 
>                         when sub_product_line in ('700') then 110801 
>                         when (cast(channel as bigint)>=1000001 and 
> cast(channel as bigint)<=2000000 and sub_product_line = '50') then 190201 
>                         when sub_product_line in ('11','12') and 
> require_level='2200' and level_type='0' then 111001 
>                         when sub_product_line in ('11','12') and 
> require_level='2200' and level_type='7' then 111002 
>                         when sub_product_line in ('327') then 111201 
>                         when sub_product_line = '15' then 150101 
>                         when (cast(channel as bigint)<1000001 or cast(channel 
> as bigint) >2000000) and sub_product_line = '50' and 
> coalesce(product_id,'-1')='77' then 190101 
>                         else 999999 
>                 end
>          end as level_3_product
>         ,cast(area as bigint) as call_city_id
>         ,cast(to_area as bigint) as to_city_id
>         ,cast(city_name as string) as call_city_name
>         ,cast(county as bigint) as call_county_id
>         ,cast(county_name as string) as call_county_name
>         ,cast(region as string) as region_name
>         ,cast(order_status as bigint) as order_status
>         ,cast(_birth_time as string) as call_time
>         ,cast(cancel_time as string) as cancel_time
>         ,cast(assigned_time as string) as answer_time
>         ,cast(prepared_time as string) as arrive_time
>         ,cast(departure_time as string) as depart_time
>         ,cast(begun_time as string) as charge_time
>         ,cast(finished_time as string) as finish_time
>         ,cast(cast(substring(cast(TIMESTAMP_TO_MS(_birth_time, 'yyyy-MM-dd 
> HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as call_time_minute
>         ,cast(cast(substring(cast(TIMESTAMP_TO_MS(cancel_time, 'yyyy-MM-dd 
> HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as cancel_time_minute
>         ,cast(cast(substring(cast(TIMESTAMP_TO_MS(assigned_time, 'yyyy-MM-dd 
> HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as answer_time_minute
>         ,cast(cast(substring(cast(TIMESTAMP_TO_MS(prepared_time, 'yyyy-MM-dd 
> HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as arrive_time_minute
>         ,cast(cast(substring(cast(TIMESTAMP_TO_MS(departure_time, 'yyyy-MM-dd 
> HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as depart_time_minute
>         ,cast(cast(substring(cast(TIMESTAMP_TO_MS(begun_time, 'yyyy-MM-dd 
> HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as charge_time_minute
>         ,cast(cast(substring(cast(TIMESTAMP_TO_MS(finished_time, 'yyyy-MM-dd 
> HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as finish_time_minute
>         ,cast(start_dest_distance as bigint) as est_dis
>         ,cast(driver_start_distance as bigint) as est_arrive_dis
>         ,cast(pre_total_fee as double) as est_price_amt
>         ,cast(combo_type as bigint) as combo_type
>         ,cast(complete_type as bigint) as complete_type
>         ,cast(airport_type as bigint) as airport_type
>         ,cast(carpool_type as bigint) as carpool_type
>         ,cast(compound_type as bigint) as compound_type
>         ,cast(source_type as bigint) as source_type
>         ,cast(driver_type as bigint) as driver_type
>         ,cast(capacity_level as bigint) as capacity_level
>         ,cast(level_type as bigint) as level_type
>         ,cast(require_level as bigint) as require_level
>         ,cast(route_type as bigint) as route_type
>         ,cast(strive_car_level as bigint) as strive_car_level
>         ,cast(extra_type as bigint) as extra_type
>         ,cast(answer_dur as bigint) as answer_dur
>         ,cast(est_dur as bigint) as est_dur
>         ,cast(arrive_dur as bigint) as arrive_dur
>         ,cast(cancel_dur as bigint) as cancel_dur
>         ,cast(driver_wait_dur as bigint) as driver_wait_dur
>         ,cast(charge_dur as bigint) as charge_dur
>         ,cast(carpool_price_type as bigint) as carpool_price_type
>         ,cast(is_appt_flag as bigint) as is_appt_flag    
>         ,cast(is_agency_call as bigint) as is_agency_call
>         ,cast(is_anycar as bigint) as is_anycar
>         ,cast(is_cross_city_flag as bigint) as is_cross_city_flag
>         ,cast(is_call_flag as bigint) as is_call_flag
>         ,cast(cancel_type as bigint) as cancel_type
>         ,cast(case when cancel_type <> '0' then 1 else 0 end as bigint) as 
> is_cancel_flag    
>         ,cast(case when cancel_type = '1' then 1 else 0 end as bigint) as 
> is_grab_before_cannel_flag
>         ,cast(case when order_status<>'6' and cancel_type = '2' then 1 else 0 
> end as bigint) as is_grab_after_pas_cancel_flag    
>         ,cast(case when order_status<>'6' and cancel_type = '3' then 1 else 0 
> end as bigint) as is_grab_after_dri_cancel_flag    
>         ,cast(case when order_status<>'6' and cancel_type = '4' then 1 else 0 
> end as bigint) as is_grab_after_srvc_cancel_flag    
>         ,cast(is_reassigned_flag as bigint) as is_reassigned_flag
>         ,cast(is_reassign_flag as bigint) as is_reassign_flag
>         ,cast(is_answer_flag as bigint) as is_answer_flag
>         ,cast(is_arrive_flag as bigint) as is_arrive_flag
>         ,cast(is_begin_charge_flag as bigint) as is_begin_charge_flag
>         ,cast(is_finish_flag as bigint) as is_finish_flag
>         ,cast(is_openapi as bigint) as is_openapi
>         ,cast(is_nirvana as bigint) as is_nirvana
>         ,cast(is_ontheway as bigint) as is_ontheway
>         ,cast(is_update_dest as bigint) as is_update_dest
>         ,cast(is_service_agency_call as bigint) as is_service_agency_call
>         ,cast(is_serial_assign as bigint) as is_serial_assign
>         ,cast(is_prepay as bigint) as is_prepay
>         ,estimate_id
>         ,estimate_id_anycar
>         ,rank_index
>         ,case when (rank_index = 0 or rank_index = 9999) then 1 else 0 end 
> is_anycar_sumtag
>         ,starting_lng as call_lng
>         ,starting_lat as call_lat
>         ,assigned_lng as answer_lng
>         ,assigned_lat as answer_lat
>         ,prepared_lng as arrive_lng
>         ,prepared_lat as arrive_lat
>         ,begun_lng as begin_charge_lng
>         ,begun_lat as begin_charge_lat
>         ,finished_lng as finish_lng
>         ,finished_lat as finish_lat
>         ,case when is_call_flag='1' then starting_lng
>               when is_answer_flag='1' then assigned_lng
>               when is_arrive_flag='1' then prepared_lng
>               when is_begin_charge_flag='1' then begun_lng
>               when is_finish_flag='1' then finished_lng
>               end as event_lng
>         ,case when is_call_flag='1' then starting_lat
>               when is_answer_flag='1' then assigned_lat
>               when is_arrive_flag='1' then prepared_lat
>               when is_begin_charge_flag='1' then begun_lat
>               when is_finish_flag='1' then finished_lat
>               end as event_lat
>         ,distance as actual_distance
>         ,case when is_call_flag='1' then start_dest_distance
>               when is_answer_flag='1' then driver_start_distance
>               when is_finish_flag='1' then distance
>               end as event_distance 
>         ,'' as extend_feature_value   
>         ,case when is_finish_flag = '1' then real_distance_category
>               else  estimate_distance_category 
>         end distance_category
>         ,cast(is_special_price as int) as is_special_price
>         ,cast(is_guide_scene as int) as is_guide_scene
>         ,extend_feature
>         ,cast(dynamic_price as int) as dynamic_price
>         ,cast(travel_id as bigint) as travel_id
>         ,cast(passenger_count as bigint) as passenger_num
>         ,cast(combo_id as bigint) as combo_id
>         ,dest_lng
>         ,dest_lat
>         ,cap_price as cap_price_amt
>         ,cast(spacious_car_alliance as int) as spacious_car_alliance
>         ,cast(pro_id as int) as pro_id
>         ,pro_name
>         ,json_extend_1
>         ,cast(access_key_id as int) as access_key_id
> from    view_01
> where channel not in ('1', '20001','1500001','1510001', '1010000001')
> ;{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to