IIRC, that was recently fixed. It might come out with 1.6.2 / 1.7.0.

Cheers, Fabian

Flavio Pompermaier <pomperma...@okkam.it> wrote on Thu, Oct 25, 2018, 14:09:

> Ok, thanks! I wasn't aware of this. That would undoubtedly be useful ;)
>
> On Thu, Oct 25, 2018 at 2:00 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Flavio,
>>
>> the external catalog support is not feature complete yet. I think you can
>> only specify the catalog when reading from a table but `insertInto` does
>> not consider the catalog name.
>>
>> Regards,
>> Timo
>>
>>
>> On 25.10.18 at 10:04, Flavio Pompermaier wrote:
>>
>> Any other help here? Is this a bug or something wrong in my code?
>>
>> On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> I've tried with t2, test.t2 and test.test.t2.
>>>
>>> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xuef...@alibaba-inc.com>
>>> wrote:
>>>
>>>> Have you tried "t2" instead of "test.t2"? There is a possibility that
>>>> catalog name isn't part of the table name in the table API.
>>>>
>>>> Thanks,
>>>> Xuefu
>>>>
>>>> ------------------------------------------------------------------
>>>> Sender: Flavio Pompermaier <pomperma...@okkam.it>
>>>> Sent at: 2018 Oct 22 (Mon) 23:06
>>>> Recipient: user <user@flink.apache.org>
>>>> Subject: Java Table API and external catalog bug?
>>>>
>>>> Hi to all,
>>>> I've tried to register an external catalog and use it with the Table
>>>> API in Flink 1.6.1.
>>>> The following (Java) test job cannot write to a sink using insertInto
>>>> because Flink cannot find the table by id (test.t2). Am I doing something
>>>> wrong or is this a bug?
>>>>
>>>> This is my Java test class:
>>>>
>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>> import org.apache.flink.core.fs.FileSystem.WriteMode;
>>>> import org.apache.flink.table.api.TableEnvironment;
>>>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>>>> import org.apache.flink.table.catalog.ExternalCatalogTable;
>>>> import org.apache.flink.table.catalog.InMemoryExternalCatalog;
>>>> import org.apache.flink.table.descriptors.Csv;
>>>> import org.apache.flink.table.descriptors.FileSystem;
>>>> import org.apache.flink.table.descriptors.FormatDescriptor;
>>>> import org.apache.flink.table.descriptors.Schema;
>>>> import org.apache.flink.table.sinks.CsvTableSink;
>>>>
>>>> public class CatalogExperiment {
>>>>   public static void main(String[] args) throws Exception {
>>>>     // create an external catalog
>>>>     final String outPath = "file:/tmp/file2.txt";
>>>>     InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
>>>>     FileSystem connDescIn = new FileSystem().path("file:/tmp/file-test.txt");
>>>>     FileSystem connDescOut = new FileSystem().path(outPath);
>>>>     FormatDescriptor csvDesc = new Csv()//
>>>>         .field("a", "string")//
>>>>         .field("b", "string")//
>>>>         .field("c", "string")//
>>>>         .fieldDelimiter("\t");
>>>>     Schema schemaDesc = new Schema()//
>>>>         .field("a", "string")//
>>>>         .field("b", "string")//
>>>>         .field("c", "string");
>>>>     ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
>>>>         .withFormat(csvDesc)//
>>>>         .withSchema(schemaDesc)//
>>>>         .asTableSource();
>>>>     ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
>>>>         .withFormat(csvDesc)//
>>>>         .withSchema(schemaDesc)//
>>>>         .asTableSink();
>>>>     catalog.createTable("t1", t1, true);
>>>>     catalog.createTable("t2", t2, true);
>>>>
>>>>     final ExecutionEnvironment env =
>>>>         ExecutionEnvironment.getExecutionEnvironment();
>>>>     final BatchTableEnvironment btEnv =
>>>>         TableEnvironment.getTableEnvironment(env);
>>>>     btEnv.registerExternalCatalog("test", catalog);
>>>>     // this does not work ---------------------------------------
>>>>     // ERROR: No table was registered under the name test.t2
>>>>     btEnv.scan("test", "t1").insertInto("test.t2");
>>>>     // this works ---------------------------------------
>>>>     btEnv.scan("test", "t1").writeToSink(
>>>>         new CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
>>>>     env.execute();
>>>>   }
>>>> }
>>>>
>>>>
>>>> Best,
>>>> Flavio
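
For readers skimming the thread, the behaviour Timo describes (catalog-aware reads, but `insertInto` ignoring the catalog name) can be pictured with a small toy model. This is only a sketch and not Flink code: `ToyTableEnv` and its methods are hypothetical names made up for illustration, and the assumption that the write path looks the whole string up as one flat name is inferred from the error message in the report, not from Flink's sources.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: NOT Flink code. Every name here is hypothetical.
class ToyTableEnv {
    // Tables registered directly in the environment, keyed by flat name.
    private final Set<String> registered = new HashSet<>();
    // External catalogs: catalog name -> names of the tables it contains.
    private final Map<String, Set<String>> catalogs = new HashMap<>();

    void registerTableSink(String name) {
        registered.add(name);
    }

    void registerExternalCatalog(String name, String... tables) {
        catalogs.put(name, new HashSet<>(Arrays.asList(tables)));
    }

    // Read path: a two-part path is resolved through the external catalog.
    boolean scan(String... path) {
        if (path.length == 1) {
            return registered.contains(path[0]);
        }
        if (path.length != 2) {
            return false;
        }
        Set<String> tables = catalogs.get(path[0]);
        return tables != null && tables.contains(path[1]);
    }

    // Write path (assumed): the argument is treated as one flat name,
    // so "test.t2" is never split into catalog "test" and table "t2".
    void insertInto(String name) {
        if (!registered.contains(name)) {
            throw new IllegalStateException(
                "No table was registered under the name " + name);
        }
    }
}

class ToyTableEnvDemo {
    public static void main(String[] args) {
        ToyTableEnv env = new ToyTableEnv();
        env.registerExternalCatalog("test", "t1", "t2");

        System.out.println("scan(test, t1) resolves: " + env.scan("test", "t1"));
        try {
            env.insertInto("test.t2");
        } catch (IllegalStateException e) {
            System.out.println("insertInto failed: " + e.getMessage());
        }

        // Registering the sink under a flat name makes the write path find it.
        env.registerTableSink("t2");
        env.insertInto("t2");
        System.out.println("insertInto(t2) succeeded after direct registration");
    }
}
```

If the model is roughly right, then until the fix Fabian mentions is released the options on 1.6.1 would be to keep using `writeToSink` as in the test job, or possibly to register the sink directly on the table environment (for example with `registerTableSink`) and call `insertInto("t2")`. The second option has not been tried in this thread.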