I've tried with t2, test.t2 and test.test.t2.

On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xuef...@alibaba-inc.com> wrote:

> Have you tried "t2" instead of "test.t2"? It is possible that the
> catalog name isn't part of the table name in the Table API.
>
> Thanks,
> Xuefu
>
> ------------------------------------------------------------------
> Sender:Flavio Pompermaier <pomperma...@okkam.it>
> Sent at:2018 Oct 22 (Mon) 23:06
> Recipient:user <user@flink.apache.org>
> Subject:Java Table API and external catalog bug?
>
> Hi to all,
> I've tried to register an external catalog and use it with the Table API
> in Flink 1.6.1.
> The following (Java) test job cannot write to a sink using insertInto
> because Flink cannot find the table by id (test.t2). Am I doing something
> wrong or is this a bug?
>
> This is my Java test class:
>
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.java.BatchTableEnvironment;
> import org.apache.flink.table.catalog.ExternalCatalogTable;
> import org.apache.flink.table.catalog.InMemoryExternalCatalog;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.FileSystem;
> import org.apache.flink.table.descriptors.FormatDescriptor;
> import org.apache.flink.table.descriptors.Schema;
> import org.apache.flink.table.sinks.CsvTableSink;
>
> public class CatalogExperiment {
>   public static void main(String[] args) throws Exception {
>     // create an external catalog
>     final String outPath = "file:/tmp/file2.txt";
>     InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
>     FileSystem connDescIn = new FileSystem().path("file:/tmp/file-test.txt");
>     FileSystem connDescOut = new FileSystem().path(outPath);
>     FormatDescriptor csvDesc = new Csv()//
>         .field("a", "string")//
>         .field("b", "string")//
>         .field("c", "string")//
>         .fieldDelimiter("\t");
>     Schema schemaDesc = new Schema()//
>         .field("a", "string")//
>         .field("b", "string")//
>         .field("c", "string");
>     ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
>         .withFormat(csvDesc)//
>         .withSchema(schemaDesc)//
>         .asTableSource();
>     ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
>         .withFormat(csvDesc)//
>         .withSchema(schemaDesc)//
>         .asTableSink();
>     catalog.createTable("t1", t1, true);
>     catalog.createTable("t2", t2, true);
>
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     final BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>     btEnv.registerExternalCatalog("test", catalog);
>     // this does not work ---------------------------------------
>     // ERROR: No table was registered under the name test.t2
>     btEnv.scan("test", "t1").insertInto("test.t2");
>     // this works ---------------------------------------
>     btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
>     env.execute();
>   }
> }
>
>
> Best,
> Flavio
>
>
