This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c576b76 [Fix] Fixed the problem that the query could not be performed when pushing utf8 encoding (#311)
c576b76 is described below
commit c576b761a9f0e8f25384e5affeffc65a799125de
Author: wudi <[email protected]>
AuthorDate: Mon Apr 21 10:13:22 2025 +0800
[Fix] Fixed the problem that the query could not be performed when pushing utf8 encoding (#311)
---
.../doris/spark/client/DorisFrontendClient.java | 23 +++++++++------
.../apache/doris/spark/sql/DorisReaderITCase.scala | 33 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 9 deletions(-)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index 3298be0..cbc1352 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -17,14 +17,6 @@
package org.apache.doris.spark.client;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.doris.spark.client.entity.Backend;
import org.apache.doris.spark.client.entity.Frontend;
import org.apache.doris.spark.config.DorisConfig;
@@ -36,6 +28,15 @@ import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.util.HttpUtils;
import org.apache.doris.spark.util.URLs;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -312,7 +314,10 @@ public class DorisFrontendClient implements Serializable {
HttpPost httpPost = new
HttpPost(URLs.queryPlan(frontend.getHost(), frontend.getHttpPort(), database,
table, isHttpsEnabled));
HttpUtils.setAuth(httpPost, username, password);
String body = MAPPER.writeValueAsString(ImmutableMap.of("sql",
sql));
- httpPost.setEntity(new StringEntity(body));
+ StringEntity stringEntity = new StringEntity(body,
StandardCharsets.UTF_8);
+ stringEntity.setContentEncoding("UTF-8");
+ stringEntity.setContentType("application/json");
+ httpPost.setEntity(stringEntity);
HttpResponse response = httpClient.execute(httpPost);
if (response.getStatusLine().getStatusCode() !=
HttpStatus.SC_OK) {
throw new DorisException("query plan request failed, code:
" + response.getStatusLine().getStatusCode()
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index b5d311f..893cdf9 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -53,6 +53,7 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
val DATABASE = "test_doris_read"
val TABLE_READ = "tbl_read"
val TABLE_READ_TBL = "tbl_read_tbl"
+ val TABLE_READ_UTF8_TBL = "tbl_read_utf8_tbl"
val TABLE_READ_TBL_ALL_TYPES = "tbl_read_tbl_all_types"
val TABLE_READ_TBL_BIT_MAP = "tbl_read_tbl_bitmap"
@@ -428,6 +429,38 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
session.stop()
}
+ @Test
+ def testReadPushDownUTF8(): Unit = {
+ initializeTable(TABLE_READ_UTF8_TBL, DataModel.UNIQUE)
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("insert into %s.%s values ('中文',60)", DATABASE,
TABLE_READ_UTF8_TBL))
+
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_UTF8_TBL}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+ |)
+ |""".stripMargin)
+
+ val utf8Filter = session.sql(
+ """
+ |select name,age from test_source where name = '中文'
+ |""".stripMargin).collect()
+
+ assert("List([中文,60])".equals(utf8Filter.toList.toString()))
+ session.stop()
+ }
+
@Test
def buildCaseWhenTest(): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]