I've tried with t2, test.t2 and test.test.t2. On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xuef...@alibaba-inc.com> wrote:
> Have you tried "t2" instead of "test.t2"? There is a possibility that > catalog name isn't part of the table name in the table API. > > Thanks, > Xuefu > > ------------------------------------------------------------------ > Sender:Flavio Pompermaier <pomperma...@okkam.it> > Sent at:2018 Oct 22 (Mon) 23:06 > Recipient:user <user@flink.apache.org> > Subject:Java Table API and external catalog bug? > > Hi to all, > I've tried to register an external catalog and use it with the Table API > in Flink 1.6.1. > The following (Java) test job cannot write to a sink using insertInto > because Flink cannot find the table by id (test.t2). Am I doing something > wrong or is this a bug? > > This is my Java test class: > > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.core.fs.FileSystem.WriteMode; > import org.apache.flink.table.api.TableEnvironment; > import org.apache.flink.table.api.java.BatchTableEnvironment; > import org.apache.flink.table.catalog.ExternalCatalogTable; > import org.apache.flink.table.catalog.InMemoryExternalCatalog; > import org.apache.flink.table.descriptors.Csv; > import org.apache.flink.table.descriptors.FileSystem; > import org.apache.flink.table.descriptors.FormatDescriptor; > import org.apache.flink.table.descriptors.Schema; > import org.apache.flink.table.sinks.CsvTableSink; > > public class CatalogExperiment { > public static void main(String[] args) throws Exception { > // create an external catalog > final String outPath = "file:/tmp/file2.txt"; > InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test"); > FileSystem connDescIn = new > FileSystem().path("file:/tmp/file-test.txt"); > FileSystem connDescOut = new FileSystem().path(outPath); > FormatDescriptor csvDesc = new Csv()// > .field("a", "string")// > .field("b", "string")// > .field("c", "string")// > .fieldDelimiter("\t"); > Schema schemaDesc = new Schema()// > .field("a", "string")// > .field("b", "string")// > .field("c", "string"); > ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)// > .withFormat(csvDesc)// > .withSchema(schemaDesc)// > .asTableSource(); > ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)// > .withFormat(csvDesc)// > .withSchema(schemaDesc)// > .asTableSink(); > catalog.createTable("t1", t1, true); > catalog.createTable("t2", t2, true); > > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > final BatchTableEnvironment btEnv = > TableEnvironment.getTableEnvironment(env); > btEnv.registerExternalCatalog("test", catalog); > // this does not work --------------------------------------- > btEnv.scan("test", "t1").insertInto("test.t2"); //ERROR: No table was > registered under the name test.t2 > // this works --------------------------------------- > btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", > 1, WriteMode.OVERWRITE)); > env.execute(); > } > } > > > Best, > Flavio > >