lvyanquan commented on code in PR #3723:
URL: https://github.com/apache/flink-cdc/pull/3723#discussion_r1926489573


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java:
##########
@@ -151,4 +179,23 @@ private void validateRequiredOptions(Configuration configuration) {
                                     .collect(Collectors.joining("\n"))));
         }
     }
+
+    private void validateShardingSeparator(String separator) {
+        if (!separator.equals(separator.toLowerCase())) {
+            throw new ValidationException(
+                    String.format(
+                            "%s is malformed, elasticsearch index only support lowercase.",
+                            SHARDING_SUFFIX_SEPARATOR.key()));
+        }
+
+        String illegalChars = "\\/*?\"<>| ,#";

Review Comment:
   It would be better to define this as a static constant.
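
   A minimal sketch of what that could look like, not the PR's actual code. The rest of the method is not shown in the hunk above, so the character loop is an assumption. The class name `ShardingSeparatorValidator` is hypothetical, and `IllegalArgumentException` stands in for Flink's `ValidationException` so the snippet compiles on its own.

```java
import java.util.Locale;

// Hypothetical stand-alone class; in the PR this logic lives in ElasticsearchDataSinkFactory.
final class ShardingSeparatorValidator {

    // Hoisted out of the method so the string is defined once per class.
    // The character set is copied from the diff above.
    private static final String ILLEGAL_CHARS = "\\/*?\"<>| ,#";

    private ShardingSeparatorValidator() {}

    static void validateShardingSeparator(String separator) {
        // Elasticsearch index names must be lowercase.
        if (!separator.equals(separator.toLowerCase(Locale.ROOT))) {
            // Assumption: IllegalArgumentException replaces ValidationException here.
            throw new IllegalArgumentException(
                    "separator is malformed, elasticsearch index only supports lowercase.");
        }
        // Assumed continuation: reject any character from the illegal set.
        for (char c : ILLEGAL_CHARS.toCharArray()) {
            if (separator.indexOf(c) >= 0) {
                throw new IllegalArgumentException(
                        "separator contains illegal character '" + c + "'.");
            }
        }
    }
}
```

   With this shape, `validateShardingSeparator("_")` returns normally, while `"A"` or `"#"` is rejected.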



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one或多个
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional信息 regarding copyright ownership.

Review Comment:
   Added by accident?



##########
docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md:
##########
@@ -164,6 +164,13 @@ Pipeline Connector Options
       <td>Long</td>
       <td>单个记录的最大大小(以byte为单位)。</td>
     </tr>
+    <tr>
+      <td>sharding.suffix.key</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Long</td>
+      <td>每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认sink表名为test_table${suffix_key}。默认分片字段为第一个分区列。表之间用';'分隔。表和字段之间用‘:’分割。例如,我们设置sharding.suffix.key为'table1:col1;table2:col2'。</td>
+    </tr>

Review Comment:
   The newly added option is not included here.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java:
##########
@@ -70,8 +71,18 @@ public class ElasticsearchEventSerializer implements ElementConverter<Event, Bul
     /** ZoneId from pipeline config to support timestamp with local time zone. */
     private final ZoneId pipelineZoneId;
 
+    private final Map<TableId, String> shardingKey;
+    private final String shardingSeparator;
+
     public ElasticsearchEventSerializer(ZoneId zoneId) {
+        this(zoneId, Collections.emptyMap(), "_");

Review Comment:
   Use SHARDING_SUFFIX_SEPARATOR.defaultValue() here instead of the hard-coded "_".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
