Hi Lei,

Yes. If you are creating a Kafka table, then the Kafka connector jar and
the matching format jars are required on the classpath.
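
For example, assuming Flink 1.10 with Scala 2.11 and the JSON format (the
exact artifact names depend on your setup), the lib/ directory would contain
something like:

  flink-sql-connector-kafka_2.11-1.10.0.jar
  flink-json-1.10.0.jar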

That's weird. If the DDL fails, the YAML way should fail with the same
exception, unless some of the connector property values are different.

Could you share the detailed exception stack?

Best,
Jark

On Wed, 11 Mar 2020 at 14:51, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Jark,
>
> I have tried to use the CREATE TABLE DDL.
> First I ran ./bin/sql-client.sh embedded, then created a table from a Kafka
> topic, and it told me the table had been created.
> But when I query with select * from tableName, there's an error:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Perhaps I need to add some jar to the lib directory.
> But if I write the table configuration in the sql-client-defaults.yaml
> file, I can select the result correctly.
>
> Thanks,
> Lei
>
> ------------------------------
>
>
> *From:* Jark Wu <imj...@gmail.com>
> *Date:* 2020-03-11 11:13
> *To:* wangl...@geekplus.com.cn
> *CC:* Arvid Heise <ar...@ververica.com>; user <user@flink.apache.org>
> *Subject:* Re: Re: Does flink-1.10 sql-client support kafka sink?
> Hi Lei,
>
> CREATE TABLE DDL [1][2] has been the recommended way to register a table
> since 1.9, and the YAML way may be deprecated in the future.
> By using DDL, a registered table can be used as both a source and a sink.
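>
> For example, here is a minimal sketch of such a DDL for the out_order topic
> from this thread, assuming the universal Kafka connector and the JSON format
> (adjust the addresses and fields to your setup):
>
> CREATE TABLE out_order (
>   out_order_code STRING,
>   input_date BIGINT,
>   owner_code STRING
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'out_order',
>   'connector.startup-mode' = 'latest-offset',
>   'connector.properties.zookeeper.connect' = '172.19.78.32:2181',
>   'connector.properties.bootstrap.servers' = '172.19.78.32:9092',
>   'update-mode' = 'append',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> );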
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>
> On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> Thanks, it works now.
>>
>> It seems it was because I had added
>>    schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code
>> STRING, status INT)"
>>
>> under the format label.
>>
>> *From:* Arvid Heise <ar...@ververica.com>
>> *Date:* 2020-03-10 20:51
>> *To:* wangl...@geekplus.com.cn
>> *CC:* user <user@flink.apache.org>
>> *Subject:* Re: Does flink-1.10 sql-client support kafka sink?
>> Hi Lei,
>>
>> yes, Kafka as a sink is supported, albeit only for appends (no
>> deletions/updates yet) [1].
>>
>> An example is a bit hidden in the documentation [2]:
>>
>> tables:
>>   - name: MyTableSink
>>     type: sink-table
>>     update-mode: append
>>     connector:
>>       property-version: 1
>>       type: kafka
>>       version: "0.11"
>>       topic: OutputTopic
>>       properties:
>>         zookeeper.connect: localhost:2181
>>         bootstrap.servers: localhost:9092
>>         group.id: testGroup
>>     format:
>>       property-version: 1
>>       type: json
>>       derive-schema: true
>>     schema:
>>       - name: rideId
>>         data-type: BIGINT
>>       - name: lon
>>         data-type: FLOAT
>>       - name: lat
>>         data-type: FLOAT
>>       - name: rideTime
>>         data-type: TIMESTAMP(3)
>>
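>> Once the sink is registered, you can write to it with an INSERT statement,
>> which the SQL client submits as a detached query [2]. A minimal sketch,
>> assuming a source table MySourceTable with matching columns:
>>
>> INSERT INTO MyTableSink
>> SELECT rideId, lon, lat, rideTime FROM MySourceTable;
>>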
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries
>>
>> On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>>>
>>> I have configured a source table successfully using the following
>>> configuration:
>>>
>>>   - name: out_order
>>>     type: source
>>>     update-mode: append
>>>     schema:
>>>     - name: out_order_code
>>>       type: STRING
>>>     - name: input_date
>>>       type: BIGINT
>>>     - name: owner_code
>>>       type: STRING
>>>     connector:
>>>       property-version: 1
>>>       type: kafka
>>>       version: universal
>>>       topic: out_order
>>>       startup-mode: latest-offset
>>>       properties:
>>>       - key: zookeeper.connect
>>>         value: 172.19.78.32:2181
>>>       - key: bootstrap.servers
>>>         value: 172.19.78.32:9092
>>>       - key: group.id
>>>     format:
>>>       property-version: 1
>>>       type: json
>>>       schema: "ROW(out_order_code STRING,owner_code STRING,input_date
>>> BIGINT)"
>>>
>>> How can I configure a sink table? I haven't found any useful docs for
>>> this.
>>>
>>> Thanks,
>>> Lei
>>>
>>
