Hi experts, I am trying to experiment how to use Hive to store metadata along using Flink SQL. I am running Hive inside a docker container locally, and running Flink SQL program through IDE.
Flink version 1.12.0 the sample code looks like: StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings); String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/opt/hive/conf/"; String version = "2.3.6"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tEnv.registerCatalog("myhive", hive); tEnv.useCatalog("myhive"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tEnv.useDatabase("default"); but then I encountered: Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.flink.sql.parser.validate.FlinkSqlConformance can not implement org.apache.calcite.sql.validate.SqlConformance, because it is not an interface (org.apache.calcite.sql.validate.SqlConformance is in unnamed module of loader 'app') at java.base/java.lang.ClassLoader.defineClass1(Native Method) at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016) at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174) at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:802) at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:700) at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:623) at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:113) at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:47) at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:139) at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111) at datatype.test.StreamMain.main(StreamMain.java:25). The Exception is thrown when executing: StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings); my pom dependencies include following, flink.version == 1.12.0, scala.binary.version == 2.1.1 <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.3.6</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies> Thanks a lot! Eleanore