Hi Simon, First of all for more thorough discussion you might want to have a look at this thread: https://lists.apache.org/thread.html/b450df1a7bf10187301820e529cbc223ce63f233c1af0f0c7415e62b@%3Cdev.flink.apache.org%3E
TL;DR; All objects registered with registerTable/registerTableSource are temporary objects that do not have serializable form and therefore can only be stored in an in-memory catalog. The useCatalog/useDatabase are experimental APIs in the upcoming 1.9 release. If you want to be sure that tables are stored in a given catalog you can either register it directly via tEnv.getCatalog().createTable() or you can try using SQL DDL. Best, Dawid On 12/08/2019 09:37, Simon Su wrote: > Hi All > I want to use a custom catalog by setting the name “ca1” and > create a database under this catalog. When I submit the > SQL, and it raises the error like : > > Exception in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. > From line 1, column 98 to line 1, column 116: Object 'orderstream' not > found within 'ca1.db1' > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89) > at > org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335) > at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126) > at sqlrunner.RowTimeTest.main(RowTimeTest.java:137) > Caused by: org.apache.calcite.runtime.CalciteContextException: From > line 1, column 98 to line 1, column 116: Object 'orderstream' not > found within 'ca1.db1' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805) > at > org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166) > at > org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363) > at > org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955) > at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122) > ... 7 more > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: > Object 'orderstream' not found within 'ca1.db1' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) > ... 26 more > > It seems that Calcite cannot find the source object as expected, After > I debug the code I found that when using tableEnv.registerTableSource > or registerTableSink, It will use a build-in catalog with a hard-code > catalog name ( default-catalog ) and database name ( default_database > ) while tableEnv.registerCatalog here cannot change this behaviros, So > is this a reasonable behaviors ? If I don’t want to use default > build-in catalog and database, is there any other ways to do this ? > > GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1"); > tableEnv.registerCatalog(catalog.getName(), catalog); // Not work to change > build-in catalog !! tableEnv.useCatalog(catalog.getName()); > catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), > "comment"), true); tableEnv.useDatabase("db1"); tableEnv.connect(sourceKafka) > .withFormat(csv) .withSchema(schema2) .inAppendMode() > .registerTableSource("orderstream"); tableEnv.connect(sinkKafka) > .withFormat(csv) .withSchema(schema2) .inAppendMode() > .registerTableSink("sinkstream");; String sql = "insert into > ca1.db1.sinkstream " + "select tumble_start(ts, > INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " + > "group by tumble(ts, INTERVAL '5' SECOND), data"; tableEnv.sqlUpdate(sql); > tableEnv.execute("test"); > > Thanks, > SImon >
signature.asc
Description: OpenPGP digital signature