Yes, it works with all the formats supported by the Kafka connector.

On Thu, 19 Nov 2020 at 10:18, Slim Bouguerra <[email protected]> wrote:
> Hi Jark
> Thanks very much will this work with Avro
>
> On Tue, Nov 17, 2020 at 07:44 Jark Wu <[email protected]> wrote:
>
>> Hi Slim,
>>
>> In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and
>> set the class name to 'sink.partitioner' option.
>>
>> In 1.12, you can re-partition the data by specifying the key field (Kafka
>> producer will partition data by the message key by default). You can do
>> this by adding some additional options in 1.12.
>>
>> CREATE TABLE output_kafkaTable (
>>   user_id BIGINT,
>>   item_id BIGINT,
>>   category_id BIGINT,
>>   behavior STRING,
>>   ts TIMESTAMP(3)
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'user_behavior_partition_by_iid',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'key.fields' = 'item_id', -- specify which columns will be written to message key
>>   'key.format' = 'raw',
>>   'value.format' = 'json'
>> );
>>
>> Best,
>> Jark
>>
>> On Tue, 17 Nov 2020 at 13:53, Tzu-Li (Gordon) Tai <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm pulling in some Flink SQL experts (in CC) to help you with this one
>>> :)
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>> I am trying to author a SQL job that does repartitioning a Kafka SQL
>>>> table into another Kafka SQL table.
>>>> as example input/output tables have exactly the same SQL schema (see
>>>> below) and data the only difference is that the new kafka stream need to be
>>>> repartition using a simple project like item_id (input stream is
>>>> partitioned by user_id)
>>>> is there a way to do this via SQL only ? without using
>>>> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
>>>>
>>>> In other words how can we express the stream key (keyedBy) via the SQL
>>>> layer ?
>>>>
>>>> For instance in Hive they expose a system column called __key or
>>>> __partition that can be used to do this via SQL layer (see
>>>> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions
>>>> )
>>>>
>>>> CREATE TABLE input_kafkaTable (
>>>>   user_id BIGINT,
>>>>   item_id BIGINT,
>>>>   category_id BIGINT,
>>>>   behavior STRING,
>>>>   ts TIMESTAMP(3)
>>>> ) WITH (
>>>>   'connector' = 'kafka',
>>>>   'topic' = 'user_behavior_partition_by_uid',
>>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>> )
>>>>
>>>> CREATE TABLE output_kafkaTable (
>>>>   user_id BIGINT,
>>>>   item_id BIGINT,
>>>>   category_id BIGINT,
>>>>   behavior STRING,
>>>>   ts TIMESTAMP(3)
>>>> ) WITH (
>>>>   'connector' = 'kafka',
>>>>   'topic' = 'user_behavior_partition_by_iid',
>>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>> )
>>>>
>>>> --
>>>>
>>>> B-Slim
>>>> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
>>>>
>>> --
>
> B-Slim
> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
>
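
For reference, here is a minimal sketch of how the 1.12 approach from this thread might look with Avro as the value format. It assumes Flink 1.12 with the Avro format dependency (flink-avro) on the classpath. The table name output_kafkaTable_avro is hypothetical and only used here to avoid clashing with the tables above. The message key stays 'raw' so that item_id alone determines the Kafka partition. It also assumes input_kafkaTable has been declared with a format of its own, which the DDL in the original question omits.

```sql
-- Sketch only: same schema as in the thread, with the value encoded as Avro.
-- 'output_kafkaTable_avro' is a hypothetical name, not one from the thread.
CREATE TABLE output_kafkaTable_avro (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_partition_by_iid',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.fields' = 'item_id',   -- column written to the Kafka message key
  'key.format' = 'raw',       -- key is the raw bytes of item_id
  'value.format' = 'avro'     -- value is Avro instead of JSON
);

-- Copy the input stream; the Kafka producer then picks the partition
-- from the message key (item_id) by default.
INSERT INTO output_kafkaTable_avro
SELECT user_id, item_id, category_id, behavior, ts
FROM input_kafkaTable;
```

With no 'sink.partitioner' option set, partitioning is left to the Kafka producer, which is what makes the key-based repartitioning work without a custom FlinkKafkaPartitioner.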
