IIRC, that was recently fixed.
Might come out with 1.6.2 / 1.7.0.

Cheers, Fabian


Flavio Pompermaier <pomperma...@okkam.it> wrote on Thu, Oct 25, 2018,
14:09:

> OK, thanks! I wasn't aware of this. That would undoubtedly be useful ;)
> On Thu, Oct 25, 2018 at 2:00 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Flavio,
>>
>> the external catalog support is not feature complete yet. I think you can
>> only specify the catalog when reading from a table but `insertInto` does
>> not consider the catalog name.
>>
>> Regards,
>> Timo
>>
>>
>> On 25.10.18 at 10:04, Flavio Pompermaier wrote:
>>
>> Any other help here? Is this a bug, or is something wrong in my code?
>>
>> On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> I've tried with t2, test.t2 and test.test.t2.
>>>
>>> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xuef...@alibaba-inc.com>
>>> wrote:
>>>
>>>> Have you tried "t2" instead of "test.t2"? It is possible that the
>>>> catalog name isn't part of the table name in the Table API.
>>>>
>>>> Thanks,
>>>> Xuefu
>>>>
>>>> ------------------------------------------------------------------
>>>> Sender: Flavio Pompermaier <pomperma...@okkam.it>
>>>> Sent at: 2018 Oct 22 (Mon) 23:06
>>>> Recipient: user <user@flink.apache.org>
>>>> Subject: Java Table API and external catalog bug?
>>>>
>>>> Hi to all,
>>>> I've tried to register an external catalog and use it with the Table
>>>> API in Flink 1.6.1.
>>>> The following (Java) test job cannot write to a sink using insertInto
>>>> because Flink cannot find the table by id (test.t2). Am I doing something
>>>> wrong or is this a bug?
>>>>
>>>> This is my Java test class:
>>>>
>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>> import org.apache.flink.core.fs.FileSystem.WriteMode;
>>>> import org.apache.flink.table.api.TableEnvironment;
>>>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>>>> import org.apache.flink.table.catalog.ExternalCatalogTable;
>>>> import org.apache.flink.table.catalog.InMemoryExternalCatalog;
>>>> import org.apache.flink.table.descriptors.Csv;
>>>> import org.apache.flink.table.descriptors.FileSystem;
>>>> import org.apache.flink.table.descriptors.FormatDescriptor;
>>>> import org.apache.flink.table.descriptors.Schema;
>>>> import org.apache.flink.table.sinks.CsvTableSink;
>>>>
>>>> public class CatalogExperiment {
>>>>   public static void main(String[] args) throws Exception {
>>>>     // create an external catalog
>>>>     final String outPath = "file:/tmp/file2.txt";
>>>>     InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
>>>>     FileSystem connDescIn = new FileSystem().path("file:/tmp/file-test.txt");
>>>>     FileSystem connDescOut = new FileSystem().path(outPath);
>>>>     FormatDescriptor csvDesc = new Csv()//
>>>>         .field("a", "string")//
>>>>         .field("b", "string")//
>>>>         .field("c", "string")//
>>>>         .fieldDelimiter("\t");
>>>>     Schema schemaDesc = new Schema()//
>>>>         .field("a", "string")//
>>>>         .field("b", "string")//
>>>>         .field("c", "string");
>>>>     ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
>>>>         .withFormat(csvDesc)//
>>>>         .withSchema(schemaDesc)//
>>>>         .asTableSource();
>>>>     ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
>>>>         .withFormat(csvDesc)//
>>>>         .withSchema(schemaDesc)//
>>>>         .asTableSink();
>>>>     catalog.createTable("t1", t1, true);
>>>>     catalog.createTable("t2", t2, true);
>>>>
>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>     final BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>>>     btEnv.registerExternalCatalog("test", catalog);
>>>>     // this does not work ---------------------------------------
>>>>     // ERROR: No table was registered under the name test.t2
>>>>     btEnv.scan("test", "t1").insertInto("test.t2");
>>>>     // this works ---------------------------------------
>>>>     btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
>>>>     env.execute();
>>>>   }
>>>> }
>>>>
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>>
>>
>>
>
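
To make the asymmetry described in this thread concrete (reads can name a
catalog, while insertInto looks the name up without considering the catalog),
here is a toy Java model. It does not use Flink's classes: ToyTableEnv and all
of its methods are hypothetical names invented for illustration, and the real
lookup in Flink 1.6.1 may differ in detail. It only sketches why "t2",
"test.t2" and similar names can all fail on the write path while
scan("test", "t1") succeeds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model only. This is NOT Flink code; every name here is hypothetical.
class ToyTableEnv {
    // Tables registered directly in the environment (flat namespace).
    private final Map<String, String> flatTables = new HashMap<>();
    // External catalogs: catalog name -> (table name -> table description).
    private final Map<String, Map<String, String>> catalogs = new HashMap<>();

    void registerTable(String name, String table) {
        flatTables.put(name, table);
    }

    void registerExternalCatalog(String catalogName, Map<String, String> tables) {
        catalogs.put(catalogName, tables);
    }

    // Read path: accepts a path, so ("test", "t1") is resolved via the catalog.
    Optional<String> scan(String... path) {
        if (path.length == 1) {
            return Optional.ofNullable(flatTables.get(path[0]));
        }
        if (path.length == 2) {
            Map<String, String> catalog = catalogs.get(path[0]);
            return catalog == null
                    ? Optional.empty()
                    : Optional.ofNullable(catalog.get(path[1]));
        }
        return Optional.empty();
    }

    // Write path as described in the thread (assumed behavior): the whole
    // string is treated as one flat table name and catalogs are never consulted.
    Optional<String> resolveSinkFlat(String name) {
        return Optional.ofNullable(flatTables.get(name));
    }

    // What a catalog-aware write path could look like: split on '.' and
    // reuse the same resolution as the read path.
    Optional<String> resolveSinkWithCatalog(String name) {
        return scan(name.split("\\."));
    }

    public static void main(String[] args) {
        ToyTableEnv env = new ToyTableEnv();
        Map<String, String> tables = new HashMap<>();
        tables.put("t1", "source-t1");
        tables.put("t2", "sink-t2");
        env.registerExternalCatalog("test", tables);

        System.out.println("scan(test, t1) found: "
                + env.scan("test", "t1").isPresent());
        System.out.println("flat lookup of test.t2 found: "
                + env.resolveSinkFlat("test.t2").isPresent());
        System.out.println("flat lookup of t2 found: "
                + env.resolveSinkFlat("t2").isPresent());
        System.out.println("catalog-aware lookup of test.t2 found: "
                + env.resolveSinkWithCatalog("test.t2").isPresent());
    }
}
```

In this model the only way to make the flat lookup succeed is to register the
sink directly in the environment, which loosely mirrors why passing a sink
straight to writeToSink works in the original test job.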
