lvyanquan commented on code in PR #3723:
URL: https://github.com/apache/flink-cdc/pull/3723#discussion_r1926489573

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java:
##########
@@ -151,4 +179,23 @@ private void validateRequiredOptions(Configuration configuration) {
                             .collect(Collectors.joining("\n"))));
         }
     }
+
+    private void validateShardingSeparator(String separator) {
+        if (!separator.equals(separator.toLowerCase())) {
+            throw new ValidationException(
+                    String.format(
+                            "%s is malformed, elasticsearch index only support lowercase.",
+                            SHARDING_SUFFIX_SEPARATOR.key()));
+        }
+
+        String illegalChars = "\\/*?\"<>| ,#";

Review Comment:
   It would be better to make this a static constant.


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one或多个
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional信息 regarding copyright ownership.

Review Comment:
   Were the Chinese characters in the license header (`或多个`, `信息`) added by accident?


##########
docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md:
##########
@@ -164,6 +164,13 @@ Pipeline Connector Options
       <td>Long</td>
       <td>单个记录的最大大小(以byte为单位)。</td>
     </tr>
+    <tr>
+      <td>sharding.suffix.key</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Long</td>
+      <td>每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认sink表名为test_table${suffix_key}。默认分片字段为第一个分区列。表之间用';'分隔。表和字段之间用‘:’分割。例如,我们设置sharding.suffix.key为'table1:col1;table2:col2'。</td>
+    </tr>

Review Comment:
   The newly added option is not included here.

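
To illustrate the first comment above (the `illegalChars` local in `validateShardingSeparator`), here is a minimal, self-contained sketch of hoisting the string into a static constant. It is not the PR's code: the class name `ShardingSeparatorValidator`, the constant names, and the placeholder option key are hypothetical, `IllegalArgumentException` stands in for Flink's `ValidationException` so the snippet compiles on its own, and the loop over illegal characters is an assumed continuation because the quoted diff stops at the declaration.

```java
/** Standalone sketch of the suggested refactoring; not the connector's actual class. */
final class ShardingSeparatorValidator {

    // Hoisted from the method body into a constant, as the review suggests.
    // Same characters as the local variable in the quoted diff.
    private static final String ILLEGAL_CHARS = "\\/*?\"<>| ,#";

    // Hypothetical placeholder for SHARDING_SUFFIX_SEPARATOR.key().
    private static final String OPTION_KEY = "<separator option key>";

    private ShardingSeparatorValidator() {}

    static void validate(String separator) {
        // Same lowercase check as in the diff.
        if (!separator.equals(separator.toLowerCase())) {
            throw new IllegalArgumentException(
                    String.format(
                            "%s is malformed, elasticsearch index only supports lowercase.",
                            OPTION_KEY));
        }

        // Assumed continuation: reject any character listed in the constant.
        for (int i = 0; i < separator.length(); i++) {
            char c = separator.charAt(i);
            if (ILLEGAL_CHARS.indexOf(c) >= 0) {
                throw new IllegalArgumentException(
                        String.format(
                                "%s is malformed, it must not contain '%s'.",
                                OPTION_KEY, c));
            }
        }
    }
}
```

With this shape, `ShardingSeparatorValidator.validate("_")` returns normally, while an uppercase separator or one containing a listed character such as `#` is rejected.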

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java:
##########
@@ -70,8 +71,18 @@ public class ElasticsearchEventSerializer implements ElementConverter<Event, Bul
     /** ZoneId from pipeline config to support timestamp with local time zone. */
     private final ZoneId pipelineZoneId;
+    private final Map<TableId, String> shardingKey;
+    private final String shardingSeparator;
+
     public ElasticsearchEventSerializer(ZoneId zoneId) {
+        this(zoneId, Collections.emptyMap(), "_");

Review Comment:
   Use `SHARDING_SUFFIX_SEPARATOR.defaultValue()` here instead of the hard-coded "_".


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org