Hi, Lei ~ You may need to implement the abstract class FlinkKafkaPartitioner and then use the full class name as the param value of the option ‘sink.partitioner’. FlinkFixedPartitioner[1] is a good example there.
[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java Best, Danny Chan 在 2020年8月18日 +0800 PM8:18,wangl...@geekplus.com <wangl...@geekplus.com>,写道: > > > CREATE TABLE kafka_sink_table( > warehouse_id INT, > pack_task_order_id BIGINT, > out_order_code STRING, > pick_order_id BIGINT, > end_time BIGINT > WITH ( > 'connector'='kafka', > 'topic'='ods_wms_pack_task_order', > 'properties.bootstrap.servers'='172.19.78.32:9092', > 'format'='json' > ); > > > INSERT INTO kafka_sink_table SELECT ....... > > As describe here: > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html > I want to do partition according to warehouse_id. > > How should i write my customer partitioner? Is there any example? > > Thanks, > Lei > > wangl...@geekplus.com