Hi experts,
I am trying to experiment how to use Hive to store metadata along using
Flink SQL. I am running Hive inside a docker container locally, and running
Flink SQL program through IDE.

Flink version 1.12.0

the sample code looks like:

StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings settings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);

String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive/conf/";
String version = "2.3.6";


HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("default");

but then I encountered:

Exception in thread "main" java.lang.IncompatibleClassChangeError:
class org.apache.flink.sql.parser.validate.FlinkSqlConformance can not
implement org.apache.calcite.sql.validate.SqlConformance, because it
is not an interface (org.apache.calcite.sql.validate.SqlConformance is
in unnamed module of loader 'app')
        at java.base/java.lang.ClassLoader.defineClass1(Native Method)
        at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
        at 
java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
        at 
java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:802)
        at 
java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:700)
        at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:623)
        at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:113)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:47)
        at 
org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
        at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:139)
        at 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
        at datatype.test.StreamMain.main(StreamMain.java:25).

The Exception is thrown when executing: StreamTableEnvironment tEnv =
StreamTableEnvironment.create(bsEnv, settings);

my pom dependencies include following, flink.version == 1.12.0,
scala.binary.version == 2.1.1

<dependencies>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.3.6</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>

</dependencies>

Thanks a lot!

Eleanore

Reply via email to