Have you tried "t2" instead of "test.t2"? There is a possibility that catalog 
name isn't part of the table name in the table API.

Thanks,
Xuefu


------------------------------------------------------------------
Sender:Flavio Pompermaier <pomperma...@okkam.it>
Sent at:2018 Oct 22 (Mon) 23:06
Recipient:user <user@flink.apache.org>
Subject:Java Table API and external catalog bug?

Hi to all,
I've tried to register an external catalog and use it with the Table API in 
Flink 1.6.1.
The following (Java) test job cannot write to a sink using insertInto because 
Flink cannot find the table by id (test.t2). Am I doing something wrong or is 
this a bug?

This is my Java test class:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.ExternalCatalogTable;
import org.apache.flink.table.catalog.InMemoryExternalCatalog;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;

public class CatalogExperiment {
  public static void main(String[] args) throws Exception {
    // create an external catalog
    final String outPath = "file:/tmp/file2.txt";
    InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
    FileSystem connDescIn = new FileSystem().path("file:/tmp/file-test.txt");
    FileSystem connDescOut = new FileSystem().path(outPath);
    FormatDescriptor csvDesc = new Csv()//
        .field("a", "string")//
        .field("b", "string")//
        .field("c", "string")//
        .fieldDelimiter("\t");
    Schema schemaDesc = new Schema()//
        .field("a", "string")//
        .field("b", "string")//
        .field("c", "string");
    ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
        .withFormat(csvDesc)//
        .withSchema(schemaDesc)//
        .asTableSource();
    ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
        .withFormat(csvDesc)//
        .withSchema(schemaDesc)//
        .asTableSink();
    catalog.createTable("t1", t1, true);
    catalog.createTable("t2", t2, true);

    final  ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    final BatchTableEnvironment btEnv = 
TableEnvironment.getTableEnvironment(env);
    btEnv.registerExternalCatalog("test", catalog);
    // this does not work ---------------------------------------
    btEnv.scan("test", "t1").insertInto("test.t2"); //ERROR: No table was 
registered under the name test.t2
    // this works ---------------------------------------
    btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", 1, 
WriteMode.OVERWRITE));
    env.execute();
  }
}


Best,
Flavio

Reply via email to