Yes, it works with all the formats supported by the kafka connector.

On Thu, 19 Nov 2020 at 10:18, Slim Bouguerra <[email protected]>
wrote:

> Hi Jark
> Thanks very much will this work with Avro
>
> On Tue, Nov 17, 2020 at 07:44 Jark Wu <[email protected]> wrote:
>
>> Hi Slim,
>>
>> In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and
>> set the class name to 'sink.partitioner' option.
>>
>> In 1.12, you can re-partition the data by specifying the key field (Kafka
>> producer will partition data by the message key by default). You can do
>> this by adding some additional options in 1.12.
>>
>> CREATE TABLE output_kafkaTable (
>>  user_id BIGINT,
>>  item_id BIGINT,
>>  category_id BIGINT,
>>  behavior STRING,
>>  ts TIMESTAMP(3)
>> ) WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'user_behavior_partition_by_iid',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>>  'key.fields' = 'item_id',  -- specify which columns will be written to
>> message key
>>  'key.format' = 'raw',
>>  'value.format' = 'json'
>> );
>>
>>
>> Best,
>> Jark
>>
>>
>>
>> On Tue, 17 Nov 2020 at 13:53, Tzu-Li (Gordon) Tai <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm pulling in some Flink SQL experts (in CC) to help you with this one
>>> :)
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>> I am trying to author a SQL job that does repartitioning a Kafka SQL
>>>> table into another Kafka SQL table.
>>>> as example input/output tables have exactly the same SQL schema (see
>>>> below) and data the only difference is that the new kafka stream need to be
>>>> repartition using a simple project like item_id (input stream is
>>>> partitioned by user_id)
>>>> is there a way to do this via SQL only ? without using
>>>> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
>>>>
>>>> In other words how can we express the stream key (keyedBy) via the SQL
>>>> layer ?
>>>>
>>>> For instance in Hive they expose a system column called  __key or
>>>> __partition that can be used to do this via SQL layer  (see
>>>> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions
>>>> )
>>>>
>>>> CREATE TABLE input_kafkaTable (
>>>>  user_id BIGINT,
>>>>  item_id BIGINT,
>>>>  category_id BIGINT,
>>>>  behavior STRING,
>>>>  ts TIMESTAMP(3)
>>>> ) WITH (
>>>>  'connector' = 'kafka',
>>>>  'topic' = 'user_behavior_partition_by_uid',
>>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>> )
>>>>
>>>> CREATE TABLE output_kafkaTable (
>>>>  user_id BIGINT,
>>>>  item_id BIGINT,
>>>>  category_id BIGINT,
>>>>  behavior STRING,
>>>>  ts TIMESTAMP(3)
>>>> ) WITH (
>>>>  'connector' = 'kafka',
>>>>  'topic' = 'user_behavior_partition_by_iid',
>>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>> )
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> B-Slim
>>>> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
>>>>
>>> --
>
> B-Slim
> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
>

Reply via email to