Hi Simon,

Thanks for reporting the problem. There is some rough edges around catalog
API and table environments, and we are improving post 1.9 release.

Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in
Flink's CatalogManager, It doens't change the default catalog/database as
you expected. To switch to your newly registered catalog, you could call
tableEnv.useCatalog() and .useDatabase().

As an alternative, you could fully qualify your table name with a
"catalog.db.table" syntax without switching current catalog/database.

Please try those and let me know if you find new problems.

Thanks,
Xuefu



On Mon, Aug 12, 2019 at 12:38 AM Simon Su <barley...@163.com> wrote:

> Hi All
>     I want to use a custom catalog by setting the name “ca1” and create a
> database under this catalog. When I submit the
> SQL, and it raises the error like :
>
>
>     Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SQL validation failed. From
> line 1, column 98 to line 1, column 116: Object 'orderstream' not found
> within 'ca1.db1'
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 1, column 98 to line 1, column 116: Object 'orderstream' not found within
> 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
> ... 7 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object
> 'orderstream' not found within 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> ... 26 more
>
> It seems that Calcite cannot find the source object as expected, After I
> debug the code I found that when using tableEnv.registerTableSource or
> registerTableSink, It will use a build-in catalog with a hard-code catalog
> name ( default-catalog ) and database name ( default_database ) while
> tableEnv.registerCatalog here cannot change this behaviros, So is this a
> reasonable behaviors ? If I don’t want to use default build-in catalog and
> database, is there any other ways to do this ?
>
>
>    GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
> tableEnv.registerCatalog(catalog.getName(), catalog); // Not work to
> change build-in catalog !!
> tableEnv.useCatalog(catalog.getName());
> catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(),
> "comment"), true);
> tableEnv.useDatabase("db1");
>
> tableEnv.connect(sourceKafka)
> .withFormat(csv)
> .withSchema(schema2)
> .inAppendMode()
> .registerTableSource("orderstream");
>
> tableEnv.connect(sinkKafka)
> .withFormat(csv)
> .withSchema(schema2)
> .inAppendMode()
> .registerTableSink("sinkstream");;
>
> String sql = "insert into ca1.db1.sinkstream " +
> "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from
> ca1.db1.orderstream " +
> "group by tumble(ts, INTERVAL '5' SECOND), data";
>
> tableEnv.sqlUpdate(sql);
> tableEnv.execute("test");
>
>
> Thanks,
> SImon
>
>

-- 
Xuefu Zhang

"In Honey We Trust!"

Reply via email to