Any other help here? is this a bug or something wrong in my code? On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <[email protected]> wrote:
> I've tried with t2, test.t2 and test.test.t2. > > On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <[email protected]> wrote: > >> Have you tried "t2" instead of "test.t2"? There is a possibility that >> catalog name isn't part of the table name in the table API. >> >> Thanks, >> Xuefu >> >> ------------------------------------------------------------------ >> Sender:Flavio Pompermaier <[email protected]> >> Sent at:2018 Oct 22 (Mon) 23:06 >> Recipient:user <[email protected]> >> Subject:Java Table API and external catalog bug? >> >> Hi to all, >> I've tried to register an external catalog and use it with the Table API >> in Flink 1.6.1. >> The following (Java) test job cannot write to a sink using insertInto >> because Flink cannot find the table by id (test.t2). Am I doing something >> wrong or is this a bug? >> >> This is my Java test class: >> >> import org.apache.flink.api.java.ExecutionEnvironment; >> import org.apache.flink.core.fs.FileSystem.WriteMode; >> import org.apache.flink.table.api.TableEnvironment; >> import org.apache.flink.table.api.java.BatchTableEnvironment; >> import org.apache.flink.table.catalog.ExternalCatalogTable; >> import org.apache.flink.table.catalog.InMemoryExternalCatalog; >> import org.apache.flink.table.descriptors.Csv; >> import org.apache.flink.table.descriptors.FileSystem; >> import org.apache.flink.table.descriptors.FormatDescriptor; >> import org.apache.flink.table.descriptors.Schema; >> import org.apache.flink.table.sinks.CsvTableSink; >> >> public class CatalogExperiment { >> public static void main(String[] args) throws Exception { >> // create an external catalog >> final String outPath = "file:/tmp/file2.txt"; >> InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test"); >> FileSystem connDescIn = new >> FileSystem().path("file:/tmp/file-test.txt"); >> FileSystem connDescOut = new FileSystem().path(outPath); >> FormatDescriptor csvDesc = new Csv()// >> .field("a", "string")// >> .field("b", "string")// >> .field("c", "string")// >> .fieldDelimiter("\t"); >> Schema schemaDesc = new Schema()// >> .field("a", "string")// >> .field("b", "string")// >> .field("c", "string"); >> ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)// >> .withFormat(csvDesc)// >> .withSchema(schemaDesc)// >> .asTableSource(); >> ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)// >> .withFormat(csvDesc)// >> .withSchema(schemaDesc)// >> .asTableSink(); >> catalog.createTable("t1", t1, true); >> catalog.createTable("t2", t2, true); >> >> final ExecutionEnvironment env = >> ExecutionEnvironment.getExecutionEnvironment(); >> final BatchTableEnvironment btEnv = >> TableEnvironment.getTableEnvironment(env); >> btEnv.registerExternalCatalog("test", catalog); >> // this does not work --------------------------------------- >> btEnv.scan("test", "t1").insertInto("test.t2"); //ERROR: No table was >> registered under the name test.t2 >> // this works --------------------------------------- >> btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", >> 1, WriteMode.OVERWRITE)); >> env.execute(); >> } >> } >> >> >> Best, >> Flavio >> >>
