Ok, thanks! I wasn't aware of this... that would undoubtedly be useful ;)
On Thu, Oct 25, 2018 at 2:00 PM Timo Walther <twal...@apache.org> wrote:

> Hi Flavio,
>
> the external catalog support is not feature-complete yet. I think you can
> only specify the catalog when reading from a table, but `insertInto` does
> not consider the catalog name.
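>
> A possible workaround in the meantime (a minimal, untested sketch against
> the 1.6 API; the sink name "t2_sink" is made up for illustration) is to
> register the sink directly on the TableEnvironment under a plain name and
> insert into that, bypassing the catalog on the write path:
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
>
>     // register the CsvTableSink under a plain (catalog-less) name
>     btEnv.registerTableSink("t2_sink",
>         new String[] {"a", "b", "c"},
>         new TypeInformation<?>[] {Types.STRING, Types.STRING, Types.STRING},
>         new CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
>     // insertInto can resolve names registered on the TableEnvironment itself
>     btEnv.scan("test", "t1").insertInto("t2_sink");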
>
> Regards,
> Timo
>
>
> Am 25.10.18 um 10:04 schrieb Flavio Pompermaier:
>
> Any other help here? Is this a bug, or is something wrong in my code?
>
> On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> I've tried with t2, test.t2 and test.test.t2.
>>
>> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xuef...@alibaba-inc.com> wrote:
>>
>>> Have you tried "t2" instead of "test.t2"? There is a possibility that the
>>> catalog name isn't part of the table name in the Table API.
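>>>
>>> For example (using the names from your snippet, just to make the
>>> suggestion concrete):
>>>
>>>     btEnv.scan("test", "t1").insertInto("t2");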
>>>
>>> Thanks,
>>> Xuefu
>>>
>>> ------------------------------------------------------------------
>>> Sender: Flavio Pompermaier <pomperma...@okkam.it>
>>> Sent at: 2018 Oct 22 (Mon) 23:06
>>> Recipient: user <user@flink.apache.org>
>>> Subject: Java Table API and external catalog bug?
>>>
>>> Hi to all,
>>> I've tried to register an external catalog and use it with the Table API
>>> in Flink 1.6.1.
>>> The following (Java) test job cannot write to a sink using insertInto
>>> because Flink cannot find the table by its id (test.t2). Am I doing
>>> something wrong, or is this a bug?
>>>
>>> This is my Java test class:
>>>
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.core.fs.FileSystem.WriteMode;
>>> import org.apache.flink.table.api.TableEnvironment;
>>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>>> import org.apache.flink.table.catalog.ExternalCatalogTable;
>>> import org.apache.flink.table.catalog.InMemoryExternalCatalog;
>>> import org.apache.flink.table.descriptors.Csv;
>>> import org.apache.flink.table.descriptors.FileSystem;
>>> import org.apache.flink.table.descriptors.FormatDescriptor;
>>> import org.apache.flink.table.descriptors.Schema;
>>> import org.apache.flink.table.sinks.CsvTableSink;
>>>
>>> public class CatalogExperiment {
>>>   public static void main(String[] args) throws Exception {
>>>     // create an external catalog
>>>     final String outPath = "file:/tmp/file2.txt";
>>>     InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
>>>     FileSystem connDescIn = new FileSystem().path("file:/tmp/file-test.txt");
>>>     FileSystem connDescOut = new FileSystem().path(outPath);
>>>     FormatDescriptor csvDesc = new Csv()//
>>>         .field("a", "string")//
>>>         .field("b", "string")//
>>>         .field("c", "string")//
>>>         .fieldDelimiter("\t");
>>>     Schema schemaDesc = new Schema()//
>>>         .field("a", "string")//
>>>         .field("b", "string")//
>>>         .field("c", "string");
>>>     ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
>>>         .withFormat(csvDesc)//
>>>         .withSchema(schemaDesc)//
>>>         .asTableSource();
>>>     ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
>>>         .withFormat(csvDesc)//
>>>         .withSchema(schemaDesc)//
>>>         .asTableSink();
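>>>     // register source and sink in the catalog (third argument is ignoreIfExists)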
>>>     catalog.createTable("t1", t1, true);
>>>     catalog.createTable("t2", t2, true);
>>>
>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     final BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>>     btEnv.registerExternalCatalog("test", catalog);
>>>     // this does not work ---------------------------------------
>>>     btEnv.scan("test", "t1").insertInto("test.t2"); // ERROR: No table was registered under the name test.t2
>>>     // this works ---------------------------------------
>>>     btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
>>>     env.execute();
>>>   }
>>> }
>>>
>>>
>>> Best,
>>> Flavio
>>>
>>>
>
>
