Hi, Lei ~
You may need to implement the abstract class FlinkKafkaPartitioner and then use 
the full class name as the param value of the option ‘sink.partitioner’. 
FlinkFixedPartitioner[1] is a good example there.

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java

Best,
Danny Chan
在 2020年8月18日 +0800 PM8:18,wangl...@geekplus.com <wangl...@geekplus.com>,写道:
>
>
> CREATE TABLE kafka_sink_table(
>  warehouse_id INT,
>  pack_task_order_id BIGINT,
>  out_order_code STRING,
>  pick_order_id BIGINT,
>  end_time BIGINT
> WITH (
>  'connector'='kafka',
>  'topic'='ods_wms_pack_task_order',
>  'properties.bootstrap.servers'='172.19.78.32:9092',
>  'format'='json'
> );
>
>
> INSERT INTO  kafka_sink_table SELECT  .......
>
> As describe here: 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html
> I want to do partition according to warehouse_id.
>
> How should i write my customer partitioner? Is there any example?
>
> Thanks,
> Lei
>
> wangl...@geekplus.com

Reply via email to