Great, thanks!

On Wed, Nov 18, 2020 at 18:21 Jark Wu <imj...@gmail.com> wrote:

> Yes, it works with all the formats supported by the kafka connector.
>
> On Thu, 19 Nov 2020 at 10:18, Slim Bouguerra <slim.bougue...@gmail.com>
> wrote:
>
>> Hi Jark
>> Thanks very much will this work with Avro
>>
>> On Tue, Nov 17, 2020 at 07:44 Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Slim,
>>>
>>> In 1.11, I think you have to implement a custom FlinkKafkaPartitioner
>>> and set the class name to 'sink.partitioner' option.
>>>
>>> In 1.12, you can re-partition the data by specifying the key field
>>> (Kafka producer will partition data by the message key by default). You can
>>> do this by adding some additional options in 1.12.
>>>
>>> CREATE TABLE output_kafkaTable (
>>>  user_id BIGINT,
>>>  item_id BIGINT,
>>>  category_id BIGINT,
>>>  behavior STRING,
>>>  ts TIMESTAMP(3)
>>> ) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'user_behavior_partition_by_iid',
>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>  'key.fields' = 'item_id',  -- specify which columns will be written to
>>> message key
>>>  'key.format' = 'raw',
>>>  'value.format' = 'json'
>>> );
>>>
>>>
>>> Best,
>>> Jark
>>>
>>>
>>>
>>> On Tue, 17 Nov 2020 at 13:53, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm pulling in some Flink SQL experts (in CC) to help you with this one
>>>> :)
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra <
>>>> slim.bougue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to author a SQL job that does repartitioning a Kafka SQL
>>>>> table into another Kafka SQL table.
>>>>> as example input/output tables have exactly the same SQL schema (see
>>>>> below) and data the only difference is that the new kafka stream need to 
>>>>> be
>>>>> repartition using a simple project like item_id (input stream is
>>>>> partitioned by user_id)
>>>>> is there a way to do this via SQL only ? without using
>>>>> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
>>>>>
>>>>> In other words how can we express the stream key (keyedBy) via the SQL
>>>>> layer ?
>>>>>
>>>>> For instance in Hive they expose a system column called  __key or
>>>>> __partition that can be used to do this via SQL layer  (see
>>>>> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions
>>>>> )
>>>>>
>>>>> CREATE TABLE input_kafkaTable (
>>>>>  user_id BIGINT,
>>>>>  item_id BIGINT,
>>>>>  category_id BIGINT,
>>>>>  behavior STRING,
>>>>>  ts TIMESTAMP(3)
>>>>> ) WITH (
>>>>>  'connector' = 'kafka',
>>>>>  'topic' = 'user_behavior_partition_by_uid',
>>>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>>> )
>>>>>
>>>>> CREATE TABLE output_kafkaTable (
>>>>>  user_id BIGINT,
>>>>>  item_id BIGINT,
>>>>>  category_id BIGINT,
>>>>>  behavior STRING,
>>>>>  ts TIMESTAMP(3)
>>>>> ) WITH (
>>>>>  'connector' = 'kafka',
>>>>>  'topic' = 'user_behavior_partition_by_iid',
>>>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>>> )
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> B-Slim
>>>>>
>>>>> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
>>>>>
>>>> --
>>
>> B-Slim
>> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
>>
> --

B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______

Reply via email to