Hi Lei, CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. And the yaml way might be deprecated in the future. By using DDL, a registered table can both be used as source and sink.
Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > Thanks, works now. > > Seems it is because i added the > schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code > STRING, status INT)" > > under format label. > > *From:* Arvid Heise <ar...@ververica.com> > *Date:* 2020-03-10 20:51 > *To:* wangl...@geekplus.com.cn > *CC:* user <user@flink.apache.org> > *Subject:* Re: Dose flink-1.10 sql-client support kafka sink? > Hi Lei, > > yes Kafka as a sink is supported albeit only for appends (no > deletions/updates yet) [1]. > > An example is a bit hidden in the documentation [2]: > > tables: > - name: MyTableSink > type: sink-table > update-mode: append > connector: > property-version: 1 > type: kafka > version: "0.11" > topic: OutputTopic > properties: > zookeeper.connect: localhost:2181 > bootstrap.servers: localhost:9092 > group.id: testGroup > format: > property-version: 1 > type: json > derive-schema: true > schema: > - name: rideId > data-type: BIGINT > - name: lon > data-type: FLOAT > - name: lat > data-type: FLOAT > - name: rideTime > data-type: TIMESTAMP(3) > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries > > On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn> wrote: > >> >> I have configured source table successfully using the following >> configuration: >> >> - name: out_order >> type: source >> update-mode: append >> schema: >> - name: out_order_code >> type: STRING >> - name: input_date >> type: BIGINT >> - name: owner_code >> type: STRING >> connector: >> property-version: 1 >> type: kafka >> version: universal >> topic: out_order >> startup-mode: latest-offset >> properties: >> - key: zookeeper.connect >> value: 172.19.78.32:2181 >> - key: bootstrap.servers >> value: 172.19.78.32:9092 >> - key: group.id >> property-version: 1 >> type: json >> schema: "ROW(out_order_code STRING,owner_code STRING,input_date >> BIGINT)" >> >> How can i configure a sink table? I haven't found any useful docs for >> this. >> >> Thanks, >> Lei >> >