This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 0acfbf2d [fix] Fix garbled table or column comments containing Chinese
characters (#401) (#403)
0acfbf2d is described below
commit 0acfbf2d1fb76208178bef391f65ede02a1380a5
Author: North Lin <[email protected]>
AuthorDate: Tue Jun 18 10:28:00 2024 +0800
[fix] Fix garbled table or column comments containing Chinese
characters (#401) (#403)
---
.../flink/sink/schema/SchemaChangeManager.java | 16 ++++--
.../flink/sink/schema/SchemaManagerITCase.java | 57 ++++++++++++++++++++++
.../doris/flink/sink/schema/SchemaManagerTest.java | 6 +++
3 files changed, 76 insertions(+), 3 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index 2aca3c7b..d2bacf26 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -56,11 +56,17 @@ public class SchemaChangeManager implements Serializable {
private static final String SCHEMA_CHANGE_API =
"http://%s/api/query/default_cluster/%s";
private ObjectMapper objectMapper = new ObjectMapper();
private DorisOptions dorisOptions;
+ private String charsetEncoding = "UTF-8";
public SchemaChangeManager(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
}
+ public SchemaChangeManager(DorisOptions dorisOptions, String
charsetEncoding) {
+ this.dorisOptions = dorisOptions;
+ this.charsetEncoding = charsetEncoding;
+ }
+
public boolean createTable(TableSchema table) throws IOException,
IllegalArgumentException {
String createTableDDL = DorisSystem.buildCreateTableDDL(table);
return execute(createTableDDL, table.getDatabase());
@@ -133,7 +139,8 @@ public class SchemaChangeManager implements Serializable {
table);
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
- httpGet.setEntity(new
StringEntity(objectMapper.writeValueAsString(params)));
+ httpGet.setEntity(
+ new StringEntity(objectMapper.writeValueAsString(params),
charsetEncoding));
String responseEntity = "";
Map<String, Object> responseMap = handleResponse(httpGet,
responseEntity);
return handleSchemaChange(responseMap, responseEntity);
@@ -173,8 +180,11 @@ public class SchemaChangeManager implements Serializable {
database);
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
- httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
- httpPost.setEntity(new
StringEntity(objectMapper.writeValueAsString(param)));
+ httpPost.setHeader(
+ HttpHeaders.CONTENT_TYPE,
+ String.format("application/json;charset=%s", charsetEncoding));
+ httpPost.setEntity(
+ new StringEntity(objectMapper.writeValueAsString(param),
charsetEncoding));
return httpPost;
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
index 053cf65c..8d2a9b0d 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
@@ -28,8 +28,11 @@ import org.junit.Test;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
public class SchemaManagerITCase extends DorisTestBase {
@@ -82,6 +85,60 @@ public class SchemaManagerITCase extends DorisTestBase {
Assert.assertTrue(exists);
}
+ @Test
+ public void testAddColumnWithChineseComment()
+ throws SQLException, IOException, IllegalArgumentException {
+ String addColumnTbls = "add_column";
+ initDorisSchemaChangeTable(addColumnTbls);
+
+ // add a column using UTF-8 encoding
+ String addColumnName = "col_with_comment1";
+ String chineseComment = "中文注释1";
+ addColumnWithChineseCommentAndAssert(addColumnTbls, addColumnName,
chineseComment, true);
+
+ // changing the charset encoding to US-ASCII would garble the Chinese comment.
+ schemaChangeManager = new SchemaChangeManager(options, "US-ASCII");
+ addColumnName = "col_with_comment2";
+ chineseComment = "中文注释2";
+ addColumnWithChineseCommentAndAssert(addColumnTbls, addColumnName,
chineseComment, false);
+ }
+
+ private void addColumnWithChineseCommentAndAssert(
+ String tableName, String addColumnName, String chineseComment,
boolean assertFlag)
+ throws SQLException, IOException, IllegalArgumentException {
+ FieldSchema field = new FieldSchema(addColumnName, "string",
chineseComment);
+ schemaChangeManager.addColumn(DATABASE, tableName, field);
+ boolean exists = schemaChangeManager.addColumn(DATABASE, tableName,
field);
+ Assert.assertTrue(exists);
+
+ exists = schemaChangeManager.checkColumnExists(DATABASE, tableName,
addColumnName);
+ Assert.assertTrue(exists);
+
+ // check Chinese comment
+ Map<String, String> columnComments = getColumnComments(tableName);
+ if (assertFlag) {
+ Assert.assertEquals(columnComments.get(addColumnName),
chineseComment);
+ } else {
+ Assert.assertNotEquals(columnComments.get(addColumnName),
chineseComment);
+ }
+ }
+
+ private Map<String, String> getColumnComments(String table) throws
SQLException {
+ Map<String, String> columnCommentsMap = new HashMap<>();
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD)) {
+ ResultSet columns = connection.getMetaData().getColumns(null,
DATABASE, table, null);
+
+ while (columns.next()) {
+ String columnName = columns.getString("COLUMN_NAME");
+ String comment = columns.getString("REMARKS");
+ columnCommentsMap.put(columnName, comment);
+ }
+ }
+ return columnCommentsMap;
+ }
+
@Test
public void testDropColumn() throws SQLException, IOException,
IllegalArgumentException {
String dropColumnTbls = "drop_column";
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 529cc860..16c901e5 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -132,6 +132,12 @@ public class SchemaManagerTest {
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT
current_timestamp COMMENT 'comment \"\\'sdf\\''",
addColumnDDL);
+
+ field = new FieldSchema("col", "int", "current_timestamp", "中文注释");
+ addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink",
field);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT
current_timestamp COMMENT '中文注释'",
+ addColumnDDL);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]