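Hi Kant,

You should declare the dependency as compileOnly and then add the same dependency again as testImplementation. That keeps it out of the packaged jar while still making it available to your tests.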
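
To make the suggestion concrete, here is a minimal sketch of how the dependencies block could look. It assumes the flinkVersion property and the flink-table-planner-blink_2.11 artifact from the build.gradle quoted below, and a Gradle version that has the testImplementation configuration (3.4 or later); on an older Gradle, testCompile would be the equivalent. Treat it as an illustration rather than a tested build file.

```groovy
dependencies {
    // Provided by the Flink cluster at runtime, so it is only needed
    // for compilation and must stay out of the shadow jar.
    compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"

    // The same dependency again, so the planner is on the classpath
    // when tests are compiled and run locally.
    testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
}
```

Dependencies that really have to ship with the job (connectors, for example) would stay in the flinkShadowJar configuration as before.
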
On Fri, Feb 28, 2020 at 10:54 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Kant,
>
> "We cannot use "compileOnly" or "shadow" configurations since then we
> could not run code in the IDE or with "gradle run"."
>
> You can take a look at the documentation [1].
> There are project templates for Java.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/java_api_quickstart.html
>
> Best,
> Jingsong Lee
>
> On Fri, Feb 28, 2020 at 5:17 PM kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> You mean I shouldn't package them into the jar, so I need to specify them
>> as compileOnly, as Lake Shen pointed out? I ask because I still need them
>> in my IDE and to compile my application. I just tried it and yes, it works.
>> Below is the updated build.gradle:
>>
>> buildscript {
>>     repositories {
>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>     }
>>     dependencies {
>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>     }
>> }
>>
>> plugins {
>>     id 'java'
>>     id 'application'
>> }
>>
>> mainClassName = 'Test'
>> apply plugin: 'com.github.johnrengelman.shadow'
>>
>> // artifact properties
>> /*group = 'org.myorg.quickstart'
>> version = '0.1'*/
>> description = """Flink Quickstart Job"""
>>
>> ext {
>>     javaVersion = '1.8'
>>     flinkVersion = '1.10.0'
>>     scalaBinaryVersion = '2.11'
>>     slf4jVersion = '1.7.7'
>>     log4jVersion = '1.2.17'
>> }
>>
>>
>> sourceCompatibility = javaVersion
>> targetCompatibility = javaVersion
>> tasks.withType(JavaCompile) {
>>     options.encoding = 'UTF-8'
>> }
>>
>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>
>> // declare where to find the dependencies of your project
>> repositories {
>>     mavenCentral()
>>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>> }
>>
>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
>> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>> // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
>>
>> configurations {
>>     flinkShadowJar // dependencies which go into the shadowJar
>>
>>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>>     flinkShadowJar.exclude group: 'org.slf4j'
>>     flinkShadowJar.exclude group: 'log4j'
>>     flinkShadowJar.exclude group: 'org.codehaus.janino'
>> }
>>
>> // declare the dependencies for your production and test code
>> dependencies {
>>     // --------------------------------------------------------------
>>     // Compile-time dependencies that should NOT be part of the
>>     // shadow jar and are provided in the lib folder of Flink
>>     // --------------------------------------------------------------
>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>
>>     flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>>     compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>
>>     // --------------------------------------------------------------
>>     // Dependencies that should be part of the shadow jar, e.g.
>>     // connectors. These must be in the flinkShadowJar configuration!
>>     // --------------------------------------------------------------
>>     //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
>>
>>     compile "log4j:log4j:${log4jVersion}"
>>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>
>>     // Add test dependencies here.
>>     // testCompile "junit:junit:4.12"
>> }
>>
>> // make compileOnly dependencies available for tests:
>> sourceSets {
>>     main.compileClasspath += configurations.flinkShadowJar
>>     main.runtimeClasspath += configurations.flinkShadowJar
>>
>>     test.compileClasspath += configurations.flinkShadowJar
>>     test.runtimeClasspath += configurations.flinkShadowJar
>>
>>     javadoc.classpath += configurations.flinkShadowJar
>> }
>>
>> run.classpath = sourceSets.main.runtimeClasspath
>>
>> jar {
>>     manifest {
>>         attributes 'Built-By': System.getProperty('user.name'),
>>                 'Build-Jdk': System.getProperty('java.version')
>>     }
>> }
>>
>> shadowJar {
>>     configurations = [project.configurations.flinkShadowJar]
>> }
>>
>>
>> On Fri, Feb 28, 2020 at 1:09 AM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Kant,
>>>
>>> You shouldn't compile `flink-table-planner` or
>>> `flink-table-planner-blink` into your user jar. They are already
>>> provided by the Flink cluster.
>>>
>>> Best,
>>> Jark
>>>
>>> On Fri, 28 Feb 2020 at 15:28, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Here is my build.gradle and I am not sure which jar uses
>>>> org.codehaus.commons.compiler.ICompilerFactory
>>>>
>>>> buildscript {
>>>>     repositories {
>>>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>>>     }
>>>>     dependencies {
>>>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>>>     }
>>>> }
>>>>
>>>> plugins {
>>>>     id 'java'
>>>>     id 'application'
>>>> }
>>>>
>>>> mainClassName = 'Test'
>>>> apply plugin: 'com.github.johnrengelman.shadow'
>>>>
>>>> // artifact properties
>>>> /*group = 'org.myorg.quickstart'
>>>> version = '0.1'*/
>>>> description = """Flink Quickstart Job"""
>>>>
>>>> ext {
>>>>     javaVersion = '1.8'
>>>>     flinkVersion = '1.10.0'
>>>>     scalaBinaryVersion = '2.11'
>>>>     slf4jVersion = '1.7.7'
>>>>     log4jVersion = '1.2.17'
>>>> }
>>>>
>>>>
>>>> sourceCompatibility = javaVersion
>>>> targetCompatibility = javaVersion
>>>> tasks.withType(JavaCompile) {
>>>>     options.encoding = 'UTF-8'
>>>> }
>>>>
>>>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>>>
>>>> // declare where to find the dependencies of your project
>>>> repositories {
>>>>     mavenCentral()
>>>>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>>>> }
>>>>
>>>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
>>>> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
>>>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>>>> // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
>>>>
>>>> configurations {
>>>>     flinkShadowJar // dependencies which go into the shadowJar
>>>>
>>>>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>>>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>>>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>>>>     flinkShadowJar.exclude group: 'org.slf4j'
>>>>     flinkShadowJar.exclude group: 'log4j'
>>>> }
>>>>
>>>> // declare the dependencies for your production and test code
>>>> dependencies {
>>>>     // --------------------------------------------------------------
>>>>     // Compile-time dependencies that should NOT be part of the
>>>>     // shadow jar and are provided in the lib folder of Flink
>>>>     // --------------------------------------------------------------
>>>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>>
>>>>     flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>>>>     flinkShadowJar "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>>>     flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>>>>     flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>>>
>>>>     // --------------------------------------------------------------
>>>>     // Dependencies that should be part of the shadow jar, e.g.
>>>>     // connectors. These must be in the flinkShadowJar configuration!
>>>>     // --------------------------------------------------------------
>>>>     //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
>>>>
>>>>     compile "log4j:log4j:${log4jVersion}"
>>>>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>>>
>>>>     // Add test dependencies here.
>>>>     // testCompile "junit:junit:4.12"
>>>> }
>>>>
>>>> // make compileOnly dependencies available for tests:
>>>> sourceSets {
>>>>     main.compileClasspath += configurations.flinkShadowJar
>>>>     main.runtimeClasspath += configurations.flinkShadowJar
>>>>
>>>>     test.compileClasspath += configurations.flinkShadowJar
>>>>     test.runtimeClasspath += configurations.flinkShadowJar
>>>>
>>>>     javadoc.classpath += configurations.flinkShadowJar
>>>> }
>>>>
>>>> run.classpath = sourceSets.main.runtimeClasspath
>>>>
>>>> jar {
>>>>     manifest {
>>>>         attributes 'Built-By': System.getProperty('user.name'),
>>>>                 'Build-Jdk': System.getProperty('java.version')
>>>>     }
>>>> }
>>>>
>>>> shadowJar {
>>>>     configurations = [project.configurations.flinkShadowJar]
>>>> }
>>>>
>>>>
>>>> On Thu, Feb 27, 2020 at 10:31 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>
>>>>> Hi kant,
>>>>>
>>>>> As Jark said, your user jar should not contain
>>>>> "org.codehaus.commons.compiler.ICompilerFactory" dependencies.
>>>>> Including them prevents Calcite from working.
>>>>>
>>>>> In 1.10, the Flink client was changed to respect the classloading
>>>>> policy, and the default policy is child-first [1]. More details can be
>>>>> found in [2].
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-13749
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Fri, Feb 28, 2020 at 11:19 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Kant,
>>>>>>
>>>>>> Are you depending on a custom janino or something like hive-exec in
>>>>>> your pom.xml?
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Fri, 28 Feb 2020 at 10:13, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> It works within the IDE but not when I submit it with
>>>>>>> flink run myApp.jar
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 27, 2020 at 3:32 PM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Below is the sample code using Flink 1.10
>>>>>>>>
>>>>>>>> public class Test {
>>>>>>>>
>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>
>>>>>>>>         EnvironmentSettings bsSettings =
>>>>>>>>                 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>>>>>         final StreamExecutionEnvironment env =
>>>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>         env.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp"));
>>>>>>>>
>>>>>>>>
>>>>>>>>         StreamTableEnvironment bsTableEnv =
>>>>>>>>                 StreamTableEnvironment.create(env, bsSettings);
>>>>>>>>
>>>>>>>>         Properties properties = new Properties();
>>>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>
>>>>>>>>         FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
>>>>>>>>                 java.util.regex.Pattern.compile("test-topic1"),
>>>>>>>>                 new SimpleStringSchema(),
>>>>>>>>                 properties);
>>>>>>>>
>>>>>>>>         FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
>>>>>>>>                 java.util.regex.Pattern.compile("test-topic2"),
>>>>>>>>                 new SimpleStringSchema(),
>>>>>>>>                 properties);
>>>>>>>>
>>>>>>>>         DataStream<String> stream1 = env.addSource(consumer1);
>>>>>>>>         DataStream<String> stream2 = env.addSource(consumer2);
>>>>>>>>
>>>>>>>>         bsTableEnv.createTemporaryView("sample1", stream1);
>>>>>>>>         bsTableEnv.createTemporaryView("sample2", stream2);
>>>>>>>>
>>>>>>>>         Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 INNER JOIN sample2 on sample1.f0=sample2.f0");
>>>>>>>>         result.printSchema();
>>>>>>>>
>>>>>>>>
>>>>>>>>         bsTableEnv.toRetractStream(result, Row.class).print();
>>>>>>>>         bsTableEnv.execute("sample job");
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Feb 27, 2020 at 3:22 PM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Fixed the typo.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> My sample program works in Flink 1.9, but in 1.10 I get the
>>>>>>>>> following error when I submit the job. In other words, it fails to
>>>>>>>>> submit the job. Any idea?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Thu, Feb 27, 2020 at 2:19 PM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> My sample program works in Flink 1.9, but in 1.0 I get the
>>>>>>>>>> following error when I submit the job. In other words, it fails to
>>>>>>>>>> submit the job. Any idea?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler
>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>>>>>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>>>>>>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>>>>>>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>>>>>>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>>>>>>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>>>>>>>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>>>>>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>>>>>>>> Caused by: java.lang.IllegalStateException: Unable to instantiate java compiler
>>>>>>>>>>     at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
>>>>>>>>>>     at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
>>>>>>>>>>     at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
>>>>>>>>>>     at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
>>>>>>>>>>     at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
>>>>>>>>>>     at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
>>>>>>>>>>     at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
>>>>>>>>>>     at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
>>>>>>>>>>     at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
>>>>>>>>>>     at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
>>>>>>>>>>     at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
>>>>>>>>>>     at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
>>>>>>>>>>     at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
>>>>>>>>>>     at org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
>>>>>>>>>>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
>>>>>>>>>>     at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
>>>>>>>>>>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>>>>>>>>>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>>>>>>>>>>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>>>>>>>>>>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
>>>>>>>>>>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>>>>>>>>>>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>>>>>>>>>>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>>>>>>>>>>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>>>>>>>>>>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>>>>>>>>>>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>>>>>>>>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>>>>>>>>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>>>>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>>>>>>>>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>>>>>>>>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>>>>>>>>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>>>>>>>>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>>>>>>>>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>>>>>>>>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>>>>>>>>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>>>>>>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:296)
>>>>>>>>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:287)
>>>>>>>>>>     at Reconciliation.main(Reconciliation.java:52)
>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>>>>>>>     ... 8 more
>>>>>>>>>> Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
>>>>>>>>>>     at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
>>>>>>>>>>     at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
>>>>>>>>>>     at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:432)
>>>>>>>>>>     ... 57 more
>>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>
> --
> Best, Jingsong Lee
>