wei created FLINK-28337:
---------------------------

             Summary: java.lang.IllegalArgumentException: Table identifier not set
                 Key: FLINK-28337
                 URL: https://issues.apache.org/jira/browse/FLINK-28337
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.14.2
         Environment: Flink 1.14.2
                      Hive 3.1.2
                      Iceberg 0.12.1
                      Hadoop 3.2.1
            Reporter: wei


I use the Flink Table API to select from an Iceberg table. I set the HiveCatalog as the current catalog, but it looks like `default_catalog` is still being used. The error message is as follows:

{code:java}
10:42:41,886 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl            [] - s3a-file-system metrics system started
10:42:44,392 INFO  org.apache.iceberg.BaseMetastoreCatalog                      [] - Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest
10:42:44,397 INFO  org.apache.iceberg.mr.hive.HiveIcebergSerDe                  [] - Using schema from existing table {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"vin","required":true,"type":"string"},{"id":2,"name":"name","required":true,"type":"string"},{"id":3,"name":"uuid","required":false,"type":"string"},{"id":4,"name":"channel","required":true,"type":"string"},{"id":5,"name":"run_scene","required":true,"type":"string"},{"id":6,"name":"timestamp","required":true,"type":"timestamp"},{"id":7,"name":"rcv_timestamp","required":true,"type":"timestamp"},{"id":8,"name":"raw","required":true,"type":"string"}]}
10:42:44,832 INFO  org.apache.iceberg.BaseMetastoreTableOperations              [] - Refreshing table metadata from new version: s3a://warehouse/s3a_flink.db/icebergTBCloudTrackingTest/metadata/00011-8d1ef9f1-8172-49fd-b0de-58196642b662.metadata.json
10:42:44,866 INFO  org.apache.iceberg.BaseMetastoreCatalog                      [] - Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest
10:42:44,867 INFO  org.apache.iceberg.mr.hive.HiveIcebergSerDe                  [] - Using schema from existing table
{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"vin","required":true,"type":"string"},{"id":2,"name":"name","required":true,"type":"string"},{"id":3,"name":"uuid","required":false,"type":"string"},{"id":4,"name":"channel","required":true,"type":"string"},{"id":5,"name":"run_scene","required":true,"type":"string"},{"id":6,"name":"timestamp","required":true,"type":"timestamp"},{"id":7,"name":"rcv_timestamp","required":true,"type":"timestamp"},{"id":8,"name":"raw","required":true,"type":"string"}]}
10:42:48,079 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Trying to connect to metastore with URI thrift://hiveserver:9083
10:42:48,079 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Opened a connection to metastore, current connections: 3
10:42:48,081 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Connected to metastore.
10:42:48,081 INFO  org.apache.hadoop.hive.metastore.RetryingMetaStoreClient     [] - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=root (auth:SIMPLE) retries=1 delay=1 lifetime=0
10:42:48,132 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Closed a connection to metastore, current connections: 2
10:42:48,308 INFO  org.apache.flink.connectors.hive.HiveParallelismInference    [] - Hive source(s3a_flink.icebergTBCloudTrackingTest}) getNumFiles use time: 171 ms, result: 2
Exception in thread "main" java.lang.IllegalArgumentException: Table identifier not set
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:142)
	at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:114)
	at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:89)
	at org.apache.iceberg.mr.mapreduce.IcebergInputFormat.lambda$getSplits$0(IcebergInputFormat.java:102)
	at java.util.Optional.orElseGet(Optional.java:267)
	at org.apache.iceberg.mr.mapreduce.IcebergInputFormat.getSplits(IcebergInputFormat.java:102)
	at org.apache.iceberg.mr.mapred.MapredIcebergInputFormat.getSplits(MapredIcebergInputFormat.java:69)
	at org.apache.iceberg.mr.hive.HiveIcebergInputFormat.getSplits(HiveIcebergInputFormat.java:98)
	at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:107)
	at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
	at org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:149)
	at org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107)
	at org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:95)
	at org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:144)
	at org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:114)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106)
	at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.java:49)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
	at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
{code}

The code is:

{code:java}
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String catalogName = "s3IcebergCatalog";
String defaultDatabase = "s3a_flink";
String hiveConfDir = "flink-cloud/src/main/resources";

HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);
tableEnv.useDatabase(defaultDatabase);
System.out.println(tableEnv.getCurrentCatalog());

String tableName = "icebergTBCloudTrackingTest";
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
String sql = "select uuid from " + tableName;
System.out.println(sql);
tableEnv.executeSql(sql).print();
{code}

The output of `tableEnv.getCurrentCatalog()` is `s3IcebergCatalog`. Yet the log shows `10:42:44,866 INFO org.apache.iceberg.BaseMetastoreCatalog [] - Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest`, and the job fails with `java.lang.IllegalArgumentException: Table identifier not set`. Does anyone know the reason?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
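As a possible workaround (untested in this environment, and assuming the matching `iceberg-flink-runtime` jar is on the classpath), the table could be read through Iceberg's own Flink catalog instead of going through the Hive connector, which is the code path raising this exception. The `uri` and `warehouse` values below are taken from the logs above; adjust them to the actual deployment:

{code:sql}
-- Register an Iceberg catalog backed by the same Hive Metastore
CREATE CATALOG s3IcebergCatalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://hiveserver:9083',
  'warehouse' = 's3a://warehouse'
);

USE CATALOG s3IcebergCatalog;
USE s3a_flink;

SELECT uuid FROM icebergTBCloudTrackingTest;
{code}

With this setup the scan is planned by Iceberg's `FlinkSource` rather than `HiveTableSource`, so `HiveIcebergInputFormat.getSplits` (where the identifier check fails) is never invoked.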