[ https://issues.apache.org/jira/browse/FLINK-34112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17952797#comment-17952797 ]
Juliusz Nadberezny commented on FLINK-34112: -------------------------------------------- 1.20 seems to be affected as well > JavaCodeSplitter OOM > -------------------- > > Key: FLINK-34112 > URL: https://issues.apache.org/jira/browse/FLINK-34112 > Project: Flink > Issue Type: Bug > Reporter: Junning Liang > Priority: Critical > > I writed a sql that has many case when syntax in FLINK 1.17 release version. > But even if the client provides 8GB of memory, there is still an OOM > exception: > {code:java} > 16:38:51,975 ERROR org.apache.flink.client.didi.job.FlinkKubernetesJobClient > [] - Failed to execute flink > job.org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: JavaCodeSplitter failed. This is a bug. Please file > an issue. at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.client.didi.job.FlinkKubernetesJobClient.internalSubmitJob(FlinkKubernetesJobClient.java:474) > ~[flink-client-executor-core-1.17.0-011_pre.jar:?] at > org.apache.flink.client.didi.job.FlinkKubernetesJobClient.submitStreamSQL(FlinkKubernetesJobClient.java:184) > [flink-client-executor-core-1.17.0-011_pre.jar:?] at > com.didichuxing.bigdata.flink.client.executor.k8s.core.component.FlinkJobK8sClientComponent.startStreamSql(FlinkJobK8sClientComponent.java:107) > [flink-client-executor-core-1.17.0-011_pre.jar:?] at > com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartFlinkJob(FlinkK8sClientExecutor.java:230) > [flink-client-executor-core-1.17.0-011_pre.jar:?] at > com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartJob(FlinkK8sClientExecutor.java:130) > [flink-client-executor-core-1.17.0-011_pre.jar:?] at > com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.main(FlinkK8sClientExecutor.java:55) > [flink-client-executor-core-1.17.0-011_pre.jar:?]Caused by: > java.lang.RuntimeException: JavaCodeSplitter failed. This is a bug. Please > file an issue. at > org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:37) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:130) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:60) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:100) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion.translateToPlanInternal(CommonExecUnion.java:61) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:145) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.Iterator.foreach(Iterator.scala:937) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.Iterator.foreach$(Iterator.scala:937) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.IterableLike.foreach(IterableLike.scala:70) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.IterableLike.foreach$(IterableLike.scala:69) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.TraversableLike.map(TraversableLike.scala:233) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.TraversableLike.map$(TraversableLike.scala:226) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.AbstractTraversable.map(Traversable.scala:104) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1873) > ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:891) > ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:2053) > ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018] at > com.didichuxing.flink.executor.CommunitySqlExecutor.execute(CommunitySqlExecutor.java:34) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > com.didichuxing.flink.executor.FlinkSqlExecutor.execute(FlinkSqlExecutor.java:51) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > com.didichuxing.flink.FlinkSqlApp.main(FlinkSqlApp.java:35) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_252] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_252] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_252] at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] ... 8 moreCaused by: > java.lang.OutOfMemoryError: Java heap space at > org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addAnyChild(ParserRuleContext.java:133) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addChild(ParserRuleContext.java:139) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.addContextToParseTree(Parser.java:617) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.enterRule(Parser.java:629) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.primary(JavaParser.java:7760) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:6954) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:7020) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.parExpression(JavaParser.java:6653) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5521) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]16:38:51,997 ERROR > com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor > [] - start job > fail!org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: JavaCodeSplitter failed. This is a bug. Please file > an issue. at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.client.didi.job.FlinkKubernetesJobClient.internalSubmitJob(FlinkKubernetesJobClient.java:474) > ~[flink-client-executor-core-1.17.0-011_pre.jar:?] at > org.apache.flink.client.didi.job.FlinkKubernetesJobClient.submitStreamSQL(FlinkKubernetesJobClient.java:184) > ~[flink-client-executor-core-1.17.0-011_pre.jar:?] at > com.didichuxing.bigdata.flink.client.executor.k8s.core.component.FlinkJobK8sClientComponent.startStreamSql(FlinkJobK8sClientComponent.java:107) > ~[flink-client-executor-core-1.17.0-011_pre.jar:?] at > com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartFlinkJob(FlinkK8sClientExecutor.java:230) > ~[flink-client-executor-core-1.17.0-011_pre.jar:?] at > com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartJob(FlinkK8sClientExecutor.java:130) > [flink-client-executor-core-1.17.0-011_pre.jar:?] at > com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.main(FlinkK8sClientExecutor.java:55) > [flink-client-executor-core-1.17.0-011_pre.jar:?]Caused by: > java.lang.RuntimeException: JavaCodeSplitter failed. This is a bug. Please > file an issue. at > org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:37) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:130) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:60) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:100) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion.translateToPlanInternal(CommonExecUnion.java:61) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:145) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.Iterator.foreach(Iterator.scala:937) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.Iterator.foreach$(Iterator.scala:937) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.IterableLike.foreach(IterableLike.scala:70) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.IterableLike.foreach$(IterableLike.scala:69) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.TraversableLike.map(TraversableLike.scala:233) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.TraversableLike.map$(TraversableLike.scala:226) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > scala.collection.AbstractTraversable.map(Traversable.scala:104) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197) > ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1873) > ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:891) > ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:2053) > ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018] at > com.didichuxing.flink.executor.CommunitySqlExecutor.execute(CommunitySqlExecutor.java:34) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > com.didichuxing.flink.executor.FlinkSqlExecutor.execute(FlinkSqlExecutor.java:51) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > com.didichuxing.flink.FlinkSqlApp.main(FlinkSqlApp.java:35) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_252] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_252] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_252] at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018] ... 8 moreCaused by: > java.lang.OutOfMemoryError: Java heap space at > org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addAnyChild(ParserRuleContext.java:133) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addChild(ParserRuleContext.java:139) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.addContextToParseTree(Parser.java:617) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.enterRule(Parser.java:629) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.primary(JavaParser.java:7760) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:6954) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:7020) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.parExpression(JavaParser.java:6653) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5521) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] at > org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) > ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018] {code} > the sql is follow: > {code:java} > add jar 'hdfs://xxxx/realtime-netcar-udf-lastest-v1.1.3-SNAPSHOT.jar'; > CREATE FUNCTION RemoveMapKeySort as > 'org.didichuxing.flink.udf.RemoveMapKey2ArraySort';set > `table.exec.resource.default-parallelism` = `1`; > set `table.generated-code.max-length`=`7000`; > set `table.generated-code.allow` = `true`;-- Source schema > CREATE TABLE source_table ( > binlog_table string, > binlog_time string, > binlog_event string, > order_id string, > driver_id string, > passenger_id string, > channel string, > product_id string, > area string, > to_area string, > county string, > order_status string, > _status string, > t_order_status string, > f_order_status string, > _birth_time string, > cancelled_time string, > new_time string, > assigned_time string, > prepared_time string, > completed_time string, > departure_time string, > begun_time string, > consult_time string, > finished_time string, > estimate_time string, > _modify_time string, > start_dest_distance string, > driver_start_distance string, > distance string, > pre_total_fee string, > `type` string, > combo_type string, > complete_type string, > airport_type string, > carpool_type string, > compound_type string, > source_type string, > driver_type string, > capacity_level string, > level_type string, > require_level string, > route_type string, > strive_car_level string, > extra_type string, > extend_feature string, > is_special_price string, > carpool_price_type string, > estimate_id string, > new_lng string, > new_lat string, > assigned_lng string, > assigned_lat string, > prepared_lng string, > prepared_lat string, > begun_lng string, > begun_lat string, > finished_lng string, > finished_lat string, > dynamic_price string, > travel_id string, > passenger_count string, > combo_id string, > starting_lng string, > starting_lat string, > dest_lng string, > dest_lat string, > cap_price string, > spacious_car_alliance string, > json_extend_1 string, > p_access_key_id string > ) > WITH > ( > 'connector.topic' = 'xxx', > 'connector.startup-mode' = 'group-offsets', > 'connector.type' = 'kafka', > 'connector.properties.bootstrap.servers' = 'xxxx', > 'connector.properties.group.id' = 'rxxx', > 'connector.parallelism' = '1', > 'format.type' = 'json' > );-- Sink schema > CREATE TABLE realtime_dwd_trip_trd_order_base ( > table_filter string, > table_time string, > table_event string, > order_id bigint, > driver_id bigint, > passenger_id bigint, > channel_id bigint, > channel_name string, > channel_level1 string, > channel_level2 string, > product_id bigint, > sub_product_line bigint, > level_1_product bigint, > level_2_product bigint, > level_3_product bigint, > call_city_id bigint, > to_city_id bigint, > call_city_name string, > call_county_id bigint, > call_county_name string, > region_name string, > order_status bigint, > call_time string, > cancel_time string, > answer_time string, > arrive_time string, > depart_time string, > charge_time string, > finish_time string, > call_time_minute string, > cancel_time_minute string, > answer_time_minute string, > arrive_time_minute string, > depart_time_minute string, > charge_time_minute string, > finish_time_minute string, > est_dis bigint, > est_arrive_dis bigint, > est_price_amt double, > combo_type bigint, > complete_type bigint, > airport_type bigint, > carpool_type bigint, > compound_type bigint, > source_type bigint, > driver_type bigint, > capacity_level bigint, > level_type bigint, > require_level bigint, > route_type bigint, > strive_car_level bigint, > extra_type bigint, > answer_dur bigint, > est_dur bigint, > arrive_dur bigint, > cancel_dur bigint, > driver_wait_dur bigint, > charge_dur bigint, > carpool_price_type bigint, > is_appt_flag bigint, > is_agency_call bigint, > is_anycar bigint, > is_cross_city_flag bigint, > is_call_flag bigint, > cancel_type bigint, > is_cancel_flag bigint, > is_grab_before_cannel_flag bigint, > is_grab_after_pas_cancel_flag bigint, > is_grab_after_dri_cancel_flag bigint, > is_grab_after_srvc_cancel_flag bigint, > is_reassigned_flag bigint, > is_reassign_flag bigint, > is_answer_flag bigint, > is_arrive_flag bigint, > is_begin_charge_flag bigint, > is_finish_flag bigint, > is_openapi bigint, > is_nirvana bigint, > is_ontheway bigint, > is_update_dest bigint, > is_service_agency_call bigint, > is_serial_assign bigint, > is_prepay bigint, > estimate_id string, > estimate_id_anycar string, > rank_index bigint, > is_anycar_sumtag bigint, > call_lng string, > call_lat string, > answer_lng string, > answer_lat string, > arrive_lng string, > arrive_lat string, > begin_charge_lng string, > begin_charge_lat string, > finish_lng string, > finish_lat string, > event_lng string, > event_lat string, > actual_distance bigint, > event_distance bigint, > extend_feature_value string, > distance_category int, > is_special_price int, > is_guide_scene int, > extend_feature string, > dynamic_price int, > travel_id bigint, > passenger_num bigint, > combo_id bigint, > dest_lng string, > dest_lat string, > cap_price_amt string, > spacious_car_alliance int, > pro_id int, > pro_name string, > json_extend_1 string, > access_key_id int > ) > WITH > ( > 'connector' = 'print' > );-- 非anycar部分 > create view source_non_anycar_table > as > select binlog_table > ,binlog_time > ,binlog_event > ,order_id > ,driver_id > ,passenger_id > ,channel > ,source_table.product_id > ,area > ,to_area > ,county > ,order_status as order_status > ,_status > ,t_order_status > ,f_order_status > ,_birth_time > ,coalesce(cancelled_time,'1971-01-01 00:00:00') as cancelled_time > ,new_time > ,assigned_time > ,prepared_time > ,completed_time > ,departure_time > ,begun_time > ,consult_time > ,finished_time > ,estimate_time > ,_modify_time > ,cast(start_dest_distance as bigint) as start_dest_distance > ,cast(driver_start_distance as bigint) as driver_start_distance > ,cast(cast(distance as double)*1000 as bigint) as distance > ,source_table.pre_total_fee > ,`type` > ,source_table.combo_type > ,complete_type > ,airport_type > ,carpool_type > ,compound_type > ,source_type > ,driver_type > ,capacity_level > ,level_type > ,source_table.require_level > ,route_type > ,strive_car_level > ,source_table.extra_type > ,RemoveMapKeySort(get_json_object(extend_feature, > '$.multi_require_product')) as extend_feature_value > ,is_special_price > ,carpool_price_type > ,estimate_id > ,'0' as estimate_id_anycar > ,9999 as rank_index > ,new_lng > ,new_lat > ,assigned_lng > ,assigned_lat > ,prepared_lng > ,prepared_lat > ,begun_lng > ,begun_lat > ,finished_lng > ,finished_lat > ,extend_feature > ,dynamic_price > ,travel_id > ,passenger_count > ,combo_id > ,starting_lng > ,starting_lat > ,dest_lng > ,dest_lat > ,cap_price > ,spacious_car_alliance > ,json_extend_1 > ,p_access_key_id > ,case > when ((coalesce(product_id,'-1') ='1' and > coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and > not (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in > ('1','4','2','3','5','10','17','7','6','37')) then '1' > when coalesce(product_id,'-1')='1' and > coalesce(combo_type,'-1') = '308' then '308' > when coalesce(product_id,'-1') in ('3','4') and > coalesce(require_level,'-1')<>'900' and coalesce(combo_type,'-1')<>'314' or > coalesce(product_id,'-1') in ('287') or coalesce(cast(product_id as > bigint),-1) between 3000 and 3999 then '3' > when coalesce(product_id,'-1') in ('6') then '6' > when coalesce(product_id,'-1') in ('7','20') then product_id > when coalesce(product_id,'-1') in ('3','4') and > coalesce(require_level,'-1')='900' then '99' > when (((coalesce(product_id,'-1') ='1' and > coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and > (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in > ('1','4','2','3','5','10','17','7','6','37'))) or coalesce(product_id,'-1') > in ('9','22') then '9' > when coalesce(product_id,'-1') in ('11','19','12','21') then > '11' > when coalesce(product_id,'-1') in ('26') then '999' > when (coalesce(product_id,'-1') in ('3','4') and > coalesce(combo_type,'-1')='314') or (coalesce(product_id,'-1') in > ('800','801')) then '314' > when product_id in ('38') then '38' > when coalesce(cast(channel as bigint),-1) between 1000001 and > 2000000 and (coalesce(cast(product_id as bigint),-1) between 50 and 59 or > coalesce(cast(product_id as bigint),-1) between 70 and 79 or > coalesce(cast(product_id as bigint),-1) between 200 and 599) then '50' > when coalesce(cast(channel as bigint),-1) not between 1000001 > and 2000000 and coalesce(product_id,'-1') in > ('51','52','53','54','59','71','73','74','75','76','79','77') then '50' > when coalesce(cast(product_id as bigint),-1) between 50 and > 59 or coalesce(cast(product_id as bigint),-1) between 70 and 79 or > coalesce(cast(product_id as bigint),-1) between 200 and 599 then '51' > when coalesce(product_id,'-1') in ('700') then '700' > when coalesce(product_id,'-1') in ('81') then '81' > when coalesce(product_id,'-1') in ('83') then '327' > when coalesce(product_id,'-1') in ('15') then '15' > end as category_id > from source_table > where not (order_status in ('0','6') > and extend_feature is not null > and extend_feature like '%multi_require_product%') > and not (binlog_table='d_order_status' and order_status='5' and > f_order_status is null and completed_time>'2020-01-01 00:00:00') > ;-- anycar部分 > create view source_anycar_table > as > select binlog_table > ,binlog_time > ,binlog_event > ,order_id > ,driver_id > ,passenger_id > ,channel > ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'product_id', -1) as string) as product_id > ,area > ,to_area > ,county > ,order_status as order_status > ,_status > ,t_order_status > ,f_order_status > ,_birth_time > ,coalesce(cancelled_time,'1971-01-01 00:00:00') as cancelled_time > ,new_time > ,assigned_time > ,prepared_time > ,completed_time > ,departure_time > ,begun_time > ,consult_time > ,finished_time > ,estimate_time > ,_modify_time > ,cast(start_dest_distance as bigint) as start_dest_distance > ,cast(driver_start_distance as bigint) as driver_start_distance > ,cast(cast(distance as double)*1000 as bigint) as distance > ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'pre_total_fee', 0) as string) as pre_total_fee > ,`type` > ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'combo_type', -1) as string) as combo_type > ,complete_type > ,airport_type > ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'carpool_type', -1) as string) as carpool_type > ,compound_type > ,source_type > ,driver_type > ,capacity_level > ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'level_type', -1) as string) as level_type > ,GET_OBJ_STR_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'require_level', '-1') as require_level > ,GET_OBJ_STR_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'route_type', > '-1') as route_type > ,strive_car_level > ,cast(GET_LONG_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'extra_type', -1) as string) as extra_type > ,RemoveMapKeySort(get_json_object(extend_feature, > '$.multi_require_product')) as extend_feature_value > ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'is_special_price', -1) as string) as is_special_price > ,carpool_price_type > ,estimate_id > ,GET_OBJ_STR_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'estimate_id', '1') as estimate_id_anycar > ,GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'rank', -1) as > rank_index > ,new_lng > ,new_lat > ,assigned_lng > ,assigned_lat > ,prepared_lng > ,prepared_lat > ,begun_lng > ,begun_lat > ,finished_lng > ,finished_lat > ,extend_feature > ,dynamic_price > ,travel_id > ,passenger_count > ,combo_id > ,starting_lng > ,starting_lat > ,dest_lng > ,dest_lat > ,cap_price > ,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), > 'spacious_car_alliance', -1) as string) as spacious_car_alliance > ,json_extend_1 > ,p_access_key_id > ,case > when ((coalesce(product_id,'-1') ='1' and > coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and > not (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in > ('1','4','2','3','5','10','17','7','6','37')) then '1' > when coalesce(product_id,'-1')='1' and > coalesce(combo_type,'-1') = '308' then '308' > when coalesce(product_id,'-1') in ('3','4') and > coalesce(require_level,'-1')<>'900' and coalesce(combo_type,'-1')<>'314' or > coalesce(product_id,'-1') in ('287') or coalesce(cast(product_id as > bigint),-1) between 3000 and 3999 then '3' > when coalesce(product_id,'-1') in ('6') then '6' > when coalesce(product_id,'-1') in ('7','20') then product_id > when coalesce(product_id,'-1') in ('3','4') and > coalesce(require_level,'-1')='900' then '99' > when (((coalesce(product_id,'-1') ='1' and > coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and > (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in > ('1','4','2','3','5','10','17','7','6','37'))) or coalesce(product_id,'-1') > in ('9','22') then '9' > when coalesce(product_id,'-1') in ('11','19','12','21') then > '11' > when coalesce(product_id,'-1') in ('26') then '999' > when (coalesce(product_id,'-1') in ('3','4') and > coalesce(combo_type,'-1')='314') or (coalesce(product_id,'-1') in > ('800','801')) then '314' > when product_id in ('38') then '38' > when coalesce(cast(channel as bigint),-1) between 1000001 and > 2000000 and (coalesce(cast(product_id as bigint),-1) between 50 and 59 or > coalesce(cast(product_id as bigint),-1) between 70 and 79 or > coalesce(cast(product_id as bigint),-1) between 200 and 599) then '50' > when coalesce(cast(channel as bigint),-1) not between 1000001 > and 2000000 and coalesce(product_id,'-1') in > ('51','52','53','54','59','71','73','74','75','76','79','77') then '50' > when coalesce(cast(product_id as bigint),-1) between 50 and > 59 or coalesce(cast(product_id as bigint),-1) between 70 and 79 or > coalesce(cast(product_id as bigint),-1) between 200 and 599 then '51' > when coalesce(product_id,'-1') in ('700') then '700' > when coalesce(product_id,'-1') in ('81') then '81' > when coalesce(product_id,'-1') in ('83') then '327' > when coalesce(product_id,'-1') in ('15') then '15' > end as category_id > from source_table > ,LATERAL > TABLE(JSON_ARRAY_TO_STR(RemoveMapKeySort(get_json_object(extend_feature, > '$.multi_require_product')))) as T(any_call_info) > where order_status in ('0','6') > and extend_feature is not null > and extend_feature like '%multi_require_product%' > and not (binlog_table='d_order_status' and order_status='5' and > f_order_status is null and completed_time>'2020-01-01 00:00:00') > ; > -- Union all > create view view_merge > as > select * > from source_non_anycar_table > union all > select * > from source_anycar_table > ; > -- View > create view view_01 > as > select binlog_table > ,binlog_time > ,binlog_event > ,order_id > ,driver_id > ,passenger_id > ,channel ,'未知' as channel_name > ,'未知' as channel_level1 > ,'未知' as channel_level2 > ,product_id > ,case when level_type = '1' then '30' > when product_id = '3' and capacity_level = '11000' then '81' > > when spacious_car_alliance = '1' then '90' > else category_id > end as sub_product_line > ,area > ,to_area > ,'未知' as city_name > ,county > ,'未知' as county_name > ,'未知' as region > ,order_status > ,_birth_time > ,case when order_status = '6' and source_type <> '2' then > _modify_time > when (source_type = '2' and order_status = '6') then > _modify_time > when order_status = '7' and cancelled_time = '1971-01-01 > 00:00:00' then _modify_time > when ((complete_type not in ('7', '13') and order_status = > '12') or (source_type = '2' and order_status = '0') or order_status = '9') > and (order_status = '12' and complete_type = '14') then _modify_time > when order_status = '11' then _modify_time > when complete_type = '13' and order_status = '12' then > _modify_time > else cancelled_time > end cancel_time > ,assigned_time > ,prepared_time > ,departure_time > ,begun_time > ,finished_time > ,start_dest_distance > ,driver_start_distance > ,pre_total_fee > ,combo_type > ,complete_type > ,airport_type > ,carpool_type > ,compound_type > ,source_type > ,driver_type > ,capacity_level > ,level_type > ,require_level > ,route_type > ,strive_car_level > ,extra_type > ,case when assigned_time is not null and _birth_time is not null and > assigned_time > _birth_time then cast(DATE_DIFF(_birth_time,assigned_time) as > string) else '0' end as answer_dur > ,cast(estimate_time as bigint)*60 as est_dur > ,case when assigned_time is not null and prepared_time is not null > and prepared_time > assigned_time then > cast(DATE_DIFF(assigned_time,prepared_time) as string) else '0' end as > arrive_dur > ,case when cancelled_time is not null and assigned_time is not null > and cancelled_time > assigned_time then > cast(DATE_DIFF(assigned_time,cancelled_time) as string) > when cancelled_time is not null and _birth_time is not null > and cancelled_time > _birth_time then cast(DATE_DIFF(_birth_time > ,cancelled_time) as string) > when f_order_status is not null and assigned_time is not null > and order_status = '7' and _modify_time > assigned_time then > cast(DATE_DIFF(assigned_time,_modify_time) as string) > when f_order_status is not null and _birth_time is not null > and order_status = '7' and _modify_time > _birth_time then > cast(DATE_DIFF(_birth_time,_modify_time) as string) > else '0' > end as cancel_dur > ,case when begun_time is not null and prepared_time is not null and > begun_time > prepared_time then cast(DATE_DIFF(prepared_time,begun_time) as > string) else '0' end as driver_wait_dur > ,case when finished_time is not null and begun_time is not null and > finished_time > begun_time then cast(DATE_DIFF(begun_time,finished_time) as > string) else '0' end as charge_dur > ,cast(`type` as int) as is_appt_flag > ,case when extra_type is not null and BITWISE_AND(cast(extra_type as > bigint), 131072) > 0 then '1' else '0' end as is_agency_call > ,case when compound_type is not null and > BITWISE_AND(cast(compound_type as bigint),8192) > 0 then '1' else '0' end as > is_anycar > ,case when area is not null and to_area is not null and area <> > to_area then '1' else '0' end as is_cross_city_flag > -- ,case when _birth_time is not null and substring(_birth_time,1,10) > > '2020-01-01 00:00:00' and order_status='0' and binlog_table='d_order_base' > and (driver_id = '0' or driver_id is null) and > substring(_birth_time,1,16)=substring(_modify_time,1,16) and binlog_event='i' > then '1' else '0' end as is_call_flag > ,case when _birth_time is not null and substring(_birth_time,1,10) > > '2020-01-01 00:00:00' and order_status='0' and (driver_id = '0' or driver_id > is null) and substring(_birth_time,1,10)=substring(_modify_time,1,10) then > '1' else '0' end as is_call_flag > ,case when f_order_status is not null and source_type is not null > and cancelled_time is not null and _modify_time is not null and order_status > is not null and substring((case when order_status = '7' then cancelled_time > else _modify_time end),1,10) > '2020-01-01 00:00:00' and order_status = '6' > and source_type <> '2' and binlog_table='d_order_status' then '1' > when f_order_status is not null and source_type is not null > and cancelled_time is not null and _modify_time is not null and order_status > is not null and substring((case when order_status = '7' then cancelled_time > else _modify_time end),1,10) > '2020-01-01 00:00:00' and ((source_type = '2' > and order_status in ('6', '7')) or (source_type <> '2' and order_status = > '7')) and binlog_table='d_order_status' then '2' > when f_order_status is not null and source_type is not null > and cancelled_time is not null and complete_type is not null and _modify_time > is not null and order_status is not null and substring((case when > order_status = '7' then cancelled_time else _modify_time end),1,10) > > '2020-01-01 00:00:00' and ((complete_type not in ('7', '13') and order_status > = '12') or (source_type = '2' and order_status = '0') or order_status = '9') > and (order_status <> '12' or complete_type <> '14') and > binlog_table='d_order_status' then '3' > when f_order_status is not null and source_type is not null > and cancelled_time is not null and _modify_time is not null and order_status > is not null and substring((case when order_status = '7' then cancelled_time > else _modify_time end),1,10) > '2020-01-01 00:00:00' and order_status = '11' > and binlog_table='d_order_status' then '4' > when f_order_status is not null and source_type is not null > and cancelled_time is not null and complete_type is not null and _modify_time > is not null and order_status is not null and substring((case when > order_status = '7' then cancelled_time else _modify_time end),1,10) > > '2020-01-01 00:00:00' and complete_type = '13' and order_status = '12' and > binlog_table='d_order_status' then '5' > else '0' > end as cancel_type > ,case when f_order_status is not null and complete_type is not null > and substring((case when order_status = '7' then cancelled_time else > _modify_time end),1,10) > '2020-01-01 00:00:00' and complete_type = '7' and > binlog_table='d_order_status' then '1' else '0' end as is_reassigned_flag > ,case when source_type is not null and source_type = '2' and > binlog_table='d_order_status' then '1' else '0' end as is_reassign_flag > ,case when assigned_time is not null and > substring(assigned_time,1,10) > '2020-01-01 00:00:00' and order_status='1' > and f_order_status is not null and binlog_table='d_order_status' and > driver_id is not null and cast(driver_id as bigint) > 0 then '1' else '0' end > as is_answer_flag > ,case when prepared_time is not null and > substring(prepared_time,1,10) > '2020-01-01 00:00:00' and order_status='2' > and f_order_status is not null and binlog_table='d_order_status' and > driver_id is not null and cast(driver_id as bigint) > 0 then '1' else '0' end > as is_arrive_flag > ,case when begun_time is not null and substring(begun_time,1,10) > > '2020-01-01 00:00:00' and order_status='4' and binlog_table='d_order_status' > and f_order_status is not null and driver_id is not null and cast(driver_id > as bigint) > 0 then '1' else '0' end as is_begin_charge_flag > ,case when finished_time is not null and > substring(finished_time,1,10) > '2020-01-01 00:00:00' and order_status = '5' > and f_order_status is not null and binlog_table='d_order_status' then '1' > else '0' end as is_finish_flag > ,case when passenger_id is not null and passenger_id = > '2624535533694' or (channel is not null and cast(channel as bigint) between > 27010 and 27019) then '1' else '0' end as is_openapi > ,case when product_id is not null and capacity_level is not null and > product_id in ('3', '4') and capacity_level = '2000' then '1' else '0' end as > is_nirvana > ,case when extra_type is not null and BITWISE_AND(cast(extra_type as > bigint), 33554432) > 0 then '1' else '0' end as is_ontheway > ,case when extra_type is not null and BITWISE_AND(cast(extra_type as > bigint), 8796093022208) > 0 then '1' else '0' end as is_update_dest > ,case when extra_type is not null and BITWISE_AND(cast(extra_type as > bigint), 137438953472) > 0 then '1' else '0' end as is_service_agency_call > ,case when extra_type is not null and BITWISE_AND(cast(extra_type as > bigint), 8388608) > 0 then '1' else '0' end as is_serial_assign > ,case when extra_type is not null and BITWISE_AND(cast(extra_type as > bigint), 8589934592) > 0 then '1' else '0' end as is_prepay > ,is_special_price > ,carpool_price_type > ,estimate_id > ,estimate_id_anycar > ,rank_index > ,starting_lng > ,starting_lat > ,assigned_lng > ,assigned_lat > ,prepared_lng > ,prepared_lat > ,begun_lng > ,begun_lat > ,finished_lng > ,finished_lat > ,distance > ,extend_feature_value > ,case > when distance<3*1000 then 0 > when distance>=3*1000 and distance<6*1000 then 3 > when distance>=6*1000 and distance<10*1000 then 6 > when distance>=10*1000 and distance<15*1000 then 10 > when distance>=15*1000 and distance<20*1000 then 15 > when distance>=20*1000 then 20 > end as real_distance_category > ,case > when start_dest_distance<3*1000 then 0 > when start_dest_distance>=3*1000 and > start_dest_distance<6*1000 then 3 > when start_dest_distance>=6*1000 and > start_dest_distance<10*1000 then 6 > when start_dest_distance>=10*1000 and > start_dest_distance<15*1000 then 10 > when start_dest_distance>=15*1000 and > start_dest_distance<20*1000 then 15 > when start_dest_distance>=20*1000 then 20 > end as estimate_distance_category > ,extend_feature > ,case when extra_type is not null and BITWISE_AND(cast(extra_type as > bigint), 268435456) > 0 then '1' else '0' end as is_guide_scene > ,dynamic_price > ,travel_id > ,passenger_count > ,combo_id > ,dest_lng > ,dest_lat > ,cap_price > ,spacious_car_alliance > ,'1' as pro_id > ,'未知' as pro_name > ,json_extend_1 > ,p_access_key_id as access_key_id > from view_merge > ; > -- insert into order_base_test > -- select > -- order_id, > -- driver_id, > -- passenger_id, > -- product_id, > -- sub_product_line > -- from view_01;--Sink > insert into realtime_dwd_trip_trd_order_base > select cast(binlog_table as string) as table_filter > ,cast(binlog_time as string) as table_time > ,cast(binlog_event as string) as table_event > ,cast(order_id as bigint) as order_id > ,cast(driver_id as bigint) as driver_id > ,cast(passenger_id as bigint) as passenger_id > ,cast(channel as bigint) as channel_id > ,cast(channel_name as string) as channel_name > ,cast(channel_level1 as string) as channel_level1 > ,cast(channel_level2 as string) as channel_level2 > ,cast(product_id as bigint) as product_id > ,cast(sub_product_line as bigint) as sub_product_line > ,case > when sub_product_line in ('1', '3', '6', '7', '9', '20', > '99', '314', '30', '81', '700', '90','327') or (sub_product_line in > ('11','12') and require_level='2200') then 110000 > when sub_product_line in ('11','12') and > require_level<>'2200' then 120000 > when sub_product_line in ('308') then 130000 > when sub_product_line in ('38') then 140000 > when sub_product_line in ('50','51') then 190000 > when sub_product_line = '15' then 150000 > else 990000 > end as level_1_product > ,case > when sub_product_line in ('3','7','30','81','90') and > combo_type not in ('4','302') then 110100 > when sub_product_line in ('3','7') and combo_type = '4' > then 110200 > when sub_product_line in ('3','7') and combo_type = '302' > then 110300 > when sub_product_line in ('20','99') > then 110400 > when sub_product_line in ('1','6') > then 110500 > when sub_product_line in ('9') > then 110600 > when sub_product_line in ('11','12') and > require_level<>'2200' then 120100 > when sub_product_line in ('308') > then 130100 > when sub_product_line in ('38') > then 140200 > when sub_product_line in ('700') > then 110800 > when sub_product_line in ('314') > then 110900 > when sub_product_line in ('11','12') and require_level='2200' > then 111000 > when (cast(channel as bigint)>=1000001 and cast(channel as > bigint)<=2000000 and sub_product_line = '50') or ((cast(channel as > bigint)<1000001 or cast(channel as bigint) >2000000) and sub_product_line in > ('50') and coalesce(product_id,'-1') <> '77') or sub_product_line = '51' > then 190200 > when sub_product_line in ('327') then 111200 > when sub_product_line = '15' then 150100 > when (cast(channel as bigint)<1000001 or cast(channel as > bigint) >2000000) and sub_product_line = '50' and coalesce(product_id,'-1') = > '77' then 190100 > else 999900 > end as level_2_product > ,case when sub_product_line = '11' and require_level = '2000' and > level_type = '5' then 120105 > when sub_product_line = '11' and require_level = '1100' and > level_type = '5' then 120106 > when sub_product_line = '11' and require_level = '2200' and > level_type = '7' then 111002 > else > case > when sub_product_line in ('3','7') and combo_type not > in ('4','302') then 110101 > when sub_product_line='30' then 110102 > when sub_product_line='314' then 110103 > when sub_product_line in ('3','7') and combo_type='4' > and carpool_type='4' then 110201 > when sub_product_line in ('3','7') and combo_type='4' > and carpool_type='5' then 110202 > when sub_product_line in ('3','7') and > require_level='600' and combo_type='4' and carpool_type in ('1','2') then > 110203 > when sub_product_line in ('3','7') and > require_level='610' and combo_type='4' and carpool_type='2' then 110204 > when sub_product_line in ('3','7') and > require_level='600' and combo_type='4' and carpool_type in ('10') then 110205 > when sub_product_line in ('3','7') and require_level > = '4' then 110299 > when sub_product_line in ('3','7') and combo_type = > '302' and carpool_type = '6' then 110303 > when sub_product_line in ('3','7') and combo_type = > '302' and carpool_type = '3' and not (coalesce(product_id,'-1') in ('287') or > (coalesce(product_id,'-1') not in ('3013') and cast(coalesce(product_id,'-1') > as bigint) between 3000 and 3999)) then 110302 > when sub_product_line in ('3','7') and combo_type = > '302' and carpool_type not in ('3','6','8') then 110304 > when sub_product_line in ('3','7') and combo_type = > '302' and carpool_type = '3' then 110305 > when sub_product_line in ('3','7') and combo_type = > '302' and carpool_type = '8' then 110306 > when sub_product_line in ('20','99') then 110401 > when sub_product_line in ('1','6') and > require_level='100' then 110501 > when sub_product_line in ('1','6') and > require_level='400' then 110502 > when sub_product_line in ('1','6') and > require_level='3300' then 110503 > when sub_product_line in ('1','6') and > require_level='200' then 110504 > when sub_product_line in ('1','6') then 110599 > when sub_product_line in ('9') then 110601 > when (cast(channel as bigint)<1000001 or cast(channel > as bigint) >2000000) and sub_product_line in ('50') and > coalesce(product_id,'-1') in ('52','53','54','59') then 110721 > when sub_product_line= '51' or ((cast(channel as > bigint)<1000001 or cast(channel as bigint) >2000000) and sub_product_line in > ('50') and coalesce(product_id,'-1') not in ('52','53','54','59','77')) then > 110722 > when sub_product_line = '11' and require_level='2000' > then 120101 > when sub_product_line = '11' and require_level = > '1100' and combo_type = '4' and carpool_type = '2' then 120102 > when sub_product_line = '11' and > coalesce(cast(is_special_price as bigint), -1) = 1 then 120103 > when sub_product_line = '11' and require_level > ='1100' then 120104 > when sub_product_line = '12' then 120201 > when sub_product_line in ('308') then 130101 > when sub_product_line in ('38') then 140201 > when sub_product_line in ('81') then 110104 > when sub_product_line in ('90') then 110105 > when sub_product_line in ('700') then 110801 > when (cast(channel as bigint)>=1000001 and > cast(channel as bigint)<=2000000 and sub_product_line = '50') then 190201 > when sub_product_line in ('11','12') and > require_level='2200' and level_type='0' then 111001 > when sub_product_line in ('11','12') and > require_level='2200' and level_type='7' then 111002 > when sub_product_line in ('327') then 111201 > when sub_product_line = '15' then 150101 > when (cast(channel as bigint)<1000001 or cast(channel > as bigint) >2000000) and sub_product_line = '50' and > coalesce(product_id,'-1')='77' then 190101 > else 999999 > end > end as level_3_product > ,cast(area as bigint) as call_city_id > ,cast(to_area as bigint) as to_city_id > ,cast(city_name as string) as call_city_name > ,cast(county as bigint) as call_county_id > ,cast(county_name as string) as call_county_name > ,cast(region as string) as region_name > ,cast(order_status as bigint) as order_status > ,cast(_birth_time as string) as call_time > ,cast(cancel_time as string) as cancel_time > ,cast(assigned_time as string) as answer_time > ,cast(prepared_time as string) as arrive_time > ,cast(departure_time as string) as depart_time > ,cast(begun_time as string) as charge_time > ,cast(finished_time as string) as finish_time > ,cast(cast(substring(cast(TIMESTAMP_TO_MS(_birth_time, 'yyyy-MM-dd > HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as call_time_minute > ,cast(cast(substring(cast(TIMESTAMP_TO_MS(cancel_time, 'yyyy-MM-dd > HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as cancel_time_minute > ,cast(cast(substring(cast(TIMESTAMP_TO_MS(assigned_time, 'yyyy-MM-dd > HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as answer_time_minute > ,cast(cast(substring(cast(TIMESTAMP_TO_MS(prepared_time, 'yyyy-MM-dd > HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as arrive_time_minute > ,cast(cast(substring(cast(TIMESTAMP_TO_MS(departure_time, 'yyyy-MM-dd > HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as depart_time_minute > ,cast(cast(substring(cast(TIMESTAMP_TO_MS(begun_time, 'yyyy-MM-dd > HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as charge_time_minute > ,cast(cast(substring(cast(TIMESTAMP_TO_MS(finished_time, 'yyyy-MM-dd > HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as finish_time_minute > ,cast(start_dest_distance as bigint) as est_dis > ,cast(driver_start_distance as bigint) as est_arrive_dis > ,cast(pre_total_fee as double) as est_price_amt > ,cast(combo_type as bigint) as combo_type > ,cast(complete_type as bigint) as complete_type > ,cast(airport_type as bigint) as airport_type > ,cast(carpool_type as bigint) as carpool_type > ,cast(compound_type as bigint) as compound_type > ,cast(source_type as bigint) as source_type > ,cast(driver_type as bigint) as driver_type > ,cast(capacity_level as bigint) as capacity_level > ,cast(level_type as bigint) as level_type > ,cast(require_level as bigint) as require_level > ,cast(route_type as bigint) as route_type > ,cast(strive_car_level as bigint) as strive_car_level > ,cast(extra_type as bigint) as extra_type > ,cast(answer_dur as bigint) as answer_dur > ,cast(est_dur as bigint) as est_dur > ,cast(arrive_dur as bigint) as arrive_dur > ,cast(cancel_dur as bigint) as cancel_dur > ,cast(driver_wait_dur as bigint) as driver_wait_dur > ,cast(charge_dur as bigint) as charge_dur > ,cast(carpool_price_type as bigint) as carpool_price_type > ,cast(is_appt_flag as bigint) as is_appt_flag > ,cast(is_agency_call as bigint) as is_agency_call > ,cast(is_anycar as bigint) as is_anycar > ,cast(is_cross_city_flag as bigint) as is_cross_city_flag > ,cast(is_call_flag as bigint) as is_call_flag > ,cast(cancel_type as bigint) as cancel_type > ,cast(case when cancel_type <> '0' then 1 else 0 end as bigint) as > is_cancel_flag > ,cast(case when cancel_type = '1' then 1 else 0 end as bigint) as > is_grab_before_cannel_flag > ,cast(case when order_status<>'6' and cancel_type = '2' then 1 else 0 > end as bigint) as is_grab_after_pas_cancel_flag > ,cast(case when order_status<>'6' and cancel_type = '3' then 1 else 0 > end as bigint) as is_grab_after_dri_cancel_flag > ,cast(case when order_status<>'6' and cancel_type = '4' then 1 else 0 > end as bigint) as is_grab_after_srvc_cancel_flag > ,cast(is_reassigned_flag as bigint) as is_reassigned_flag > ,cast(is_reassign_flag as bigint) as is_reassign_flag > ,cast(is_answer_flag as bigint) as is_answer_flag > ,cast(is_arrive_flag as bigint) as is_arrive_flag > ,cast(is_begin_charge_flag as bigint) as is_begin_charge_flag > ,cast(is_finish_flag as bigint) as is_finish_flag > ,cast(is_openapi as bigint) as is_openapi > ,cast(is_nirvana as bigint) as is_nirvana > ,cast(is_ontheway as bigint) as is_ontheway > ,cast(is_update_dest as bigint) as is_update_dest > ,cast(is_service_agency_call as bigint) as is_service_agency_call > ,cast(is_serial_assign as bigint) as is_serial_assign > ,cast(is_prepay as bigint) as is_prepay > ,estimate_id > ,estimate_id_anycar > ,rank_index > ,case when (rank_index = 0 or rank_index = 9999) then 1 else 0 end > is_anycar_sumtag > ,starting_lng as call_lng > ,starting_lat as call_lat > ,assigned_lng as answer_lng > ,assigned_lat as answer_lat > ,prepared_lng as arrive_lng > ,prepared_lat as arrive_lat > ,begun_lng as begin_charge_lng > ,begun_lat as begin_charge_lat > ,finished_lng as finish_lng > ,finished_lat as finish_lat > ,case when is_call_flag='1' then starting_lng > when is_answer_flag='1' then assigned_lng > when is_arrive_flag='1' then prepared_lng > when is_begin_charge_flag='1' then begun_lng > when is_finish_flag='1' then finished_lng > end as event_lng > ,case when is_call_flag='1' then starting_lat > when is_answer_flag='1' then assigned_lat > when is_arrive_flag='1' then prepared_lat > when is_begin_charge_flag='1' then begun_lat > when is_finish_flag='1' then finished_lat > end as event_lat > ,distance as actual_distance > ,case when is_call_flag='1' then start_dest_distance > when is_answer_flag='1' then driver_start_distance > when is_finish_flag='1' then distance > end as event_distance > ,'' as extend_feature_value > ,case when is_finish_flag = '1' then real_distance_category > else estimate_distance_category > end distance_category > ,cast(is_special_price as int) as is_special_price > ,cast(is_guide_scene as int) as is_guide_scene > ,extend_feature > ,cast(dynamic_price as int) as dynamic_price > ,cast(travel_id as bigint) as travel_id > ,cast(passenger_count as bigint) as passenger_num > ,cast(combo_id as bigint) as combo_id > ,dest_lng > ,dest_lat > ,cap_price as cap_price_amt > ,cast(spacious_car_alliance as int) as spacious_car_alliance > ,cast(pro_id as int) as pro_id > ,pro_name > ,json_extend_1 > ,cast(access_key_id as int) as access_key_id > from view_01 > where channel not in ('1', '20001','1500001','1510001', '1010000001') > ;{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)