Hi All I want to use a custom catalog by setting the name “ca1” and create a database under this catalog. When I submit the SQL, and it raises the error like :
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1' at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89) at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335) at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126) at sqlrunner.RowTimeTest.main(RowTimeTest.java:137) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805) at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166) at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122) ... 7 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 26 more It seems that Calcite cannot find the source object as expected, After I debug the code I found that when using tableEnv.registerTableSource or registerTableSink, It will use a build-in catalog with a hard-code catalog name ( default-catalog ) and database name ( default_database ) while tableEnv.registerCatalog here cannot change this behaviros, So is this a reasonable behaviors ? If I don’t want to use default build-in catalog and database, is there any other ways to do this ? GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1"); tableEnv.registerCatalog(catalog.getName(), catalog); // Not work to change build-in catalog !! tableEnv.useCatalog(catalog.getName()); catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true); tableEnv.useDatabase("db1"); tableEnv.connect(sourceKafka) .withFormat(csv) .withSchema(schema2) .inAppendMode() .registerTableSource("orderstream"); tableEnv.connect(sinkKafka) .withFormat(csv) .withSchema(schema2) .inAppendMode() .registerTableSink("sinkstream");; String sql = "insert into ca1.db1.sinkstream " + "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " + "group by tumble(ts, INTERVAL '5' SECOND), data"; tableEnv.sqlUpdate(sql); tableEnv.execute("test"); Thanks, SImon