This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d64d419 [bugfix]Fix a mismatch between the type of jdbc query result 
and the datatype (#514)
8d64d419 is described below

commit 8d64d419a45e5fd438b8ec38adf2ccc1b130bfb7
Author: Zhumengze <112678590+zhumen...@users.noreply.github.com>
AuthorDate: Wed Nov 20 10:26:09 2024 +0800

    [bugfix]Fix a mismatch between the type of jdbc query result and the 
datatype (#514)
    
    The value type obtained by rs.getObject(index + 1) is inconsistent with the 
field type obtained by flink Context, resulting in a conversion failure in 
encapsulating flink RowData. For example, the java type corresponding to 
TINYINT is Java.lang. Byte, while the java type obtained by rs.getObject(index 
+ 1) is Java.lang. Integer, which has a type conversion problem. SMALLINT had 
the same problem.
---
 .../java/org/apache/doris/flink/lookup/Worker.java |   7 +-
 .../doris/flink/lookup/DorisLookupTableITCase.java | 141 +++++++++++++++++++++
 2 files changed, 146 insertions(+), 2 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
index e4e08f7e..59cfd85d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
@@ -18,6 +18,7 @@
 package org.apache.doris.flink.lookup;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.doris.flink.cfg.DorisLookupOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -200,8 +201,10 @@ public class Worker implements Runnable {
                     try (ResultSet rs = ps.executeQuery()) {
                         while (rs.next()) {
                             Record record = new Record(schema);
-                            for (int index = 0; index < 
schema.getFieldTypes().length; index++) {
-                                record.setObject(index, rs.getObject(index + 
1));
+                            DataType[] fieldTypes = schema.getFieldTypes();
+                            for (int index = 0; index < fieldTypes.length; 
index++) {
+                                Class<?> conversionClass = 
fieldTypes[index].getConversionClass();
+                                record.setObject(index, rs.getObject(index + 
1, conversionClass));
                             }
                             List<Record> records =
                                     resultRecordMap.computeIfAbsent(
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
new file mode 100644
index 00000000..6d569bcf
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.apache.doris.flink.table.DorisConfigOptions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class DorisLookupTableITCase extends AbstractITCaseService {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisLookupTableITCase.class);
+    private static final String DATABASE = "test_lookup";
+    private static final String TABLE_READ_TBL = "tbl_read_tbl";
+
+    @Test
+    public void testLookupTable() throws Exception {
+        initializeTable();
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        DataStreamSource<Integer> sourceStream = env.fromElements(1, 2, 3, 4);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .columnByExpression("proctime", "PROCTIME()")
+                        .build();
+        Table table = tEnv.fromDataStream(sourceStream, schema);
+        tEnv.createTemporaryView("source", table);
+
+        String lookupDDL =
+                String.format(
+                        "CREATE TABLE `doris_lookup`("
+                                + "  `id` INTEGER,"
+                                + "  `tinyintColumn` TINYINT,"
+                                + "  `smallintColumn` SMALLINT,"
+                                + "  `bigintColumn` BIGINT,"
+                                + "  PRIMARY KEY (`id`) NOT ENFORCED"
+                                + ")  WITH ("
+                                + "'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + "'fenodes' = '%s',"
+                                + "'jdbc-url' = '%s',"
+                                + "'table.identifier' = '%s',"
+                                + "'username' = '%s',"
+                                + "'password' = '%s',"
+                                + "'lookup.cache.max-rows' = '100'"
+                                + ")",
+                        getFenodes(),
+                        getDorisQueryUrl(),
+                        DATABASE + "." + TABLE_READ_TBL,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(lookupDDL);
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "select source.f0,"
+                                + "tinyintColumn,"
+                                + "smallintColumn,"
+                                + "bigintColumn"
+                                + " from `source`"
+                                + " inner join `doris_lookup` FOR SYSTEM_TIME 
AS OF source.proctime on source.f0 = doris_lookup.id");
+
+        List<String> actual = new ArrayList<>();
+        try (CloseableIterator<Row> iterator = tableResult.collect()) {
+            while (iterator.hasNext()) {
+                actual.add(iterator.next().toString());
+            }
+        }
+
+        String[] expected =
+                new String[] {
+                    "+I[1, 97, 27479, 8670353564751764000]",
+                    "+I[2, 79, 17119, -4381380624467725000]",
+                    "+I[3, -106, -14878, 1466614815449373200]"
+                };
+        assertEqualsInAnyOrder(Arrays.asList(expected), 
Arrays.asList(actual.toArray()));
+    }
+
+    private void initializeTable() {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format(
+                        "DROP TABLE IF EXISTS %s.%s",
+                        DATABASE, DorisLookupTableITCase.TABLE_READ_TBL),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`id` int(11),\n"
+                                + "`tinyintColumn` tinyint(4),\n"
+                                + "`smallintColumn` smallint(6),\n"
+                                + "`bigintColumn` bigint(20),\n"
+                                + ") DISTRIBUTED BY HASH(`id`) BUCKETS 10\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, DorisLookupTableITCase.TABLE_READ_TBL),
+                String.format(
+                        "insert into %s.%s  values 
(1,97,27479,8670353564751764000)",
+                        DATABASE, DorisLookupTableITCase.TABLE_READ_TBL),
+                String.format(
+                        "insert into %s.%s  values 
(2,79,17119,-4381380624467725000)",
+                        DATABASE, DorisLookupTableITCase.TABLE_READ_TBL),
+                String.format(
+                        "insert into %s.%s  values 
(3,-106,-14878,1466614815449373200)",
+                        DATABASE, DorisLookupTableITCase.TABLE_READ_TBL));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to