Great, thanks!

On Wed, Nov 18, 2020 at 18:21 Jark Wu <imj...@gmail.com> wrote:
> Yes, it works with all the formats supported by the kafka connector.
>
> On Thu, 19 Nov 2020 at 10:18, Slim Bouguerra <slim.bougue...@gmail.com> wrote:
>
>> Hi Jark,
>> Thanks very much. Will this work with Avro?
>>
>> On Tue, Nov 17, 2020 at 07:44 Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Slim,
>>>
>>> In 1.11, I think you have to implement a custom FlinkKafkaPartitioner
>>> and set its class name in the 'sink.partitioner' option.
>>>
>>> In 1.12, you can repartition the data by specifying the key fields
>>> (the Kafka producer partitions data by the message key by default).
>>> You can do this by adding a few options to the table definition:
>>>
>>> CREATE TABLE output_kafkaTable (
>>>   user_id BIGINT,
>>>   item_id BIGINT,
>>>   category_id BIGINT,
>>>   behavior STRING,
>>>   ts TIMESTAMP(3)
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'user_behavior_partition_by_iid',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   -- specify which columns will be written to the message key
>>>   'key.fields' = 'item_id',
>>>   'key.format' = 'raw',
>>>   'value.format' = 'json'
>>> );
>>>
>>> Best,
>>> Jark
>>>
>>> On Tue, 17 Nov 2020 at 13:53, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm pulling in some Flink SQL experts (in CC) to help you with this one :)
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra <slim.bougue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to author a SQL job that repartitions one Kafka SQL table
>>>>> into another Kafka SQL table. As an example, the input and output tables
>>>>> below have exactly the same SQL schema and data; the only difference is
>>>>> that the new Kafka stream needs to be repartitioned by a simple
>>>>> projection such as item_id (the input stream is partitioned by user_id).
>>>>> Is there a way to do this via SQL only, without using
>>>>> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner?
>>>>>
>>>>> In other words, how can we express the stream key (keyBy) via the SQL
>>>>> layer?
>>>>>
>>>>> For instance, Hive exposes system columns called __key and __partition
>>>>> that can be used to do this via the SQL layer (see
>>>>> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions).
>>>>>
>>>>> CREATE TABLE input_kafkaTable (
>>>>>   user_id BIGINT,
>>>>>   item_id BIGINT,
>>>>>   category_id BIGINT,
>>>>>   behavior STRING,
>>>>>   ts TIMESTAMP(3)
>>>>> ) WITH (
>>>>>   'connector' = 'kafka',
>>>>>   'topic' = 'user_behavior_partition_by_uid',
>>>>>   'properties.bootstrap.servers' = 'localhost:9092'
>>>>> );
>>>>>
>>>>> CREATE TABLE output_kafkaTable (
>>>>>   user_id BIGINT,
>>>>>   item_id BIGINT,
>>>>>   category_id BIGINT,
>>>>>   behavior STRING,
>>>>>   ts TIMESTAMP(3)
>>>>> ) WITH (
>>>>>   'connector' = 'kafka',
>>>>>   'topic' = 'user_behavior_partition_by_iid',
>>>>>   'properties.bootstrap.servers' = 'localhost:9092'
>>>>> );
>>>>>
>>>>> --
>>>>> B-Slim
>>>>> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______

--
B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
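[Footnote to the thread above] With 'key.fields' declared, the repartitioning job itself is just a plain INSERT INTO output_kafkaTable SELECT ... FROM input_kafkaTable; the Kafka producer then routes each record by its message key. The sketch below illustrates that default key-based routing only in outline: the real Kafka producer hashes the serialized key with murmur2, and zlib.crc32 stands in here as a stable hash for illustration.

```python
import zlib

def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Map a message key onto a partition, roughly as the Kafka
    producer's default partitioner does when a key is present.
    Records that share a key always land on the same partition."""
    # Illustrative stand-in: real Kafka uses murmur2, not crc32.
    return zlib.crc32(key) % num_partitions

# With 'key.fields' = 'item_id', every row with the same item_id is
# serialized to the same message key and hence the same partition:
assert partition_for_key(b"item-42", 8) == partition_for_key(b"item-42", 8)
assert 0 <= partition_for_key(b"item-42", 8) < 8
```

This is why no custom FlinkKafkaPartitioner is needed in 1.12: declaring the key columns is enough for the producer's own routing to repartition the stream.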