Hi Jark,

I have tried the CREATE TABLE DDL. First I ran ./bin/sql-client.sh embedded, then created a table from a Kafka topic, and it told me the table had been created. But when I query it with select * from tableName, there is an error:
[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Perhaps I need to add some jar to the lib directory. But if I write the table configuration in the sql-client-defaults.yaml file, I can select the result correctly.

Thanks,
Lei

From: Jark Wu
Date: 2020-03-11 11:13
To: wangl...@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink?

Hi Lei,

CREATE TABLE DDL [1][2] has been the recommended way to register a table since 1.9, and the YAML way might be deprecated in the future. With DDL, a registered table can be used as both a source and a sink.

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> wrote:

Thanks, it works now. It seems it is because I added the schema "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, status INT)" under the format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Dose flink-1.10 sql-client support kafka sink?

Hi Lei,

yes, Kafka as a sink is supported, albeit only for appends (no deletions/updates yet) [1].
An example is a bit hidden in the documentation [2]:

    tables:
      - name: MyTableSink
        type: sink-table
        update-mode: append
        connector:
          property-version: 1
          type: kafka
          version: "0.11"
          topic: OutputTopic
          properties:
            zookeeper.connect: localhost:2181
            bootstrap.servers: localhost:9092
            group.id: testGroup
        format:
          property-version: 1
          type: json
          derive-schema: true
        schema:
          - name: rideId
            data-type: BIGINT
          - name: lon
            data-type: FLOAT
          - name: lat
            data-type: FLOAT
          - name: rideTime
            data-type: TIMESTAMP(3)

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> wrote:

I have configured a source table successfully using the following configuration:

    - name: out_order
      type: source
      update-mode: append
      schema:
        - name: out_order_code
          type: STRING
        - name: input_date
          type: BIGINT
        - name: owner_code
          type: STRING
      connector:
        property-version: 1
        type: kafka
        version: universal
        topic: out_order
        startup-mode: latest-offset
        properties:
          - key: zookeeper.connect
            value: 172.19.78.32:2181
          - key: bootstrap.servers
            value: 172.19.78.32:9092
          - key: group.id
      format:
        property-version: 1
        type: json
        schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei
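
For reference, the DDL route Jark recommends might look roughly like the sketch below for the out_order topic from the original question. It is only a sketch under a few assumptions: it uses the 'connector.*' and 'format.*' property keys as I understand them from the Flink 1.10 connector docs linked above, it assumes the Kafka SQL connector and JSON format jars are already in the lib directory, and the group.id value 'testGroup' is a placeholder borrowed from Arvid's example, since the original value is not shown in the thread. A NoMatchingTableFactoryException often means either a jar is missing or a WITH property does not match what any factory expects, so the property names are worth comparing against the docs first.

```sql
-- Sketch only: property keys assumed from the Flink 1.10 Kafka connector docs.
CREATE TABLE out_order (
  out_order_code STRING,
  input_date BIGINT,
  owner_code STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'out_order',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = '172.19.78.32:2181',
  'connector.properties.bootstrap.servers' = '172.19.78.32:9092',
  -- placeholder: the real group.id is not given in the thread
  'connector.properties.group.id' = 'testGroup',
  'update-mode' = 'append',
  'format.type' = 'json'
  -- some versions may additionally need 'format.derive-schema' = 'true'
);

-- Reading from the table, as in the failing query above:
SELECT * FROM out_order;
```

Since a table registered this way can act as both source and sink, a second table declared with the same kind of WITH clause could be the target of an INSERT INTO ... SELECT, subject to the append-only restriction Arvid mentions.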