Ok thanks! I wasn't aware of this..that would be undoubtedly useful ;) On Thu, Oct 25, 2018 at 2:00 PM Timo Walther <twal...@apache.org> wrote:
> Hi Flavio, > > the external catalog support is not feature complete yet. I think you can > only specify the catalog when reading from a table but `insertInto` does > not consider the catalog name. > > Regards, > TImo > > > Am 25.10.18 um 10:04 schrieb Flavio Pompermaier: > > Any other help here? is this a bug or something wrong in my code? > > On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <pomperma...@okkam.it> > wrote: > >> I've tried with t2, test.t2 and test.test.t2. >> >> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xuef...@alibaba-inc.com> wrote: >> >>> Have you tried "t2" instead of "test.t2"? There is a possibility that >>> catalog name isn't part of the table name in the table API. >>> >>> Thanks, >>> Xuefu >>> >>> ------------------------------------------------------------------ >>> Sender:Flavio Pompermaier <pomperma...@okkam.it> >>> Sent at:2018 Oct 22 (Mon) 23:06 >>> Recipient:user <user@flink.apache.org> >>> Subject:Java Table API and external catalog bug? >>> >>> Hi to all, >>> I've tried to register an external catalog and use it with the Table API >>> in Flink 1.6.1. >>> The following (Java) test job cannot write to a sink using insertInto >>> because Flink cannot find the table by id (test.t2). Am I doing something >>> wrong or is this a bug? >>> >>> This is my Java test class: >>> >>> import org.apache.flink.api.java.ExecutionEnvironment; >>> import org.apache.flink.core.fs.FileSystem.WriteMode; >>> import org.apache.flink.table.api.TableEnvironment; >>> import org.apache.flink.table.api.java.BatchTableEnvironment; >>> import org.apache.flink.table.catalog.ExternalCatalogTable; >>> import org.apache.flink.table.catalog.InMemoryExternalCatalog; >>> import org.apache.flink.table.descriptors.Csv; >>> import org.apache.flink.table.descriptors.FileSystem; >>> import org.apache.flink.table.descriptors.FormatDescriptor; >>> import org.apache.flink.table.descriptors.Schema; >>> import org.apache.flink.table.sinks.CsvTableSink; >>> >>> public class CatalogExperiment { >>> public static void main(String[] args) throws Exception { >>> // create an external catalog >>> final String outPath = "file:/tmp/file2.txt"; >>> InMemoryExternalCatalog catalog = new >>> InMemoryExternalCatalog("test"); >>> FileSystem connDescIn = new FileSystem().path( >>> "file:/tmp/file-test.txt"); >>> FileSystem connDescOut = new FileSystem().path(outPath); >>> FormatDescriptor csvDesc = new Csv()// >>> .field("a", "string")// >>> .field("b", "string")// >>> .field("c", "string")// >>> .fieldDelimiter("\t"); >>> Schema schemaDesc = new Schema()// >>> .field("a", "string")// >>> .field("b", "string")// >>> .field("c", "string"); >>> ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)// >>> .withFormat(csvDesc)// >>> .withSchema(schemaDesc)// >>> .asTableSource(); >>> ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)// >>> .withFormat(csvDesc)// >>> .withSchema(schemaDesc)// >>> .asTableSink(); >>> catalog.createTable("t1", t1, true); >>> catalog.createTable("t2", t2, true); >>> >>> final ExecutionEnvironment env = >>> ExecutionEnvironment.getExecutionEnvironment(); >>> final BatchTableEnvironment btEnv = >>> TableEnvironment.getTableEnvironment(env); >>> btEnv.registerExternalCatalog("test", catalog); >>> // this does not work --------------------------------------- >>> btEnv.scan("test", "t1").insertInto("test.t2"); //ERROR: No table >>> was registered under the name test.t2 >>> // this works --------------------------------------- >>> btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", >>> 1, WriteMode.OVERWRITE)); >>> env.execute(); >>> } >>> } >>> >>> >>> Best, >>> Flavio >>> >>> > >