This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 8d64d419 [bugfix]Fix a mismatch between the type of jdbc query result and the datatype (#514) 8d64d419 is described below commit 8d64d419a45e5fd438b8ec38adf2ccc1b130bfb7 Author: Zhumengze <112678590+zhumen...@users.noreply.github.com> AuthorDate: Wed Nov 20 10:26:09 2024 +0800 [bugfix]Fix a mismatch between the type of jdbc query result and the datatype (#514) The value type obtained by rs.getObject(index + 1) is inconsistent with the field type obtained by flink Context, resulting in a conversion failure in encapsulating flink RowData. For example, the java type corresponding to TINYINT is java.lang.Byte, while the java type obtained by rs.getObject(index + 1) is java.lang.Integer, which has a type conversion problem. SMALLINT had the same problem. --- .../java/org/apache/doris/flink/lookup/Worker.java | 7 +- .../doris/flink/lookup/DorisLookupTableITCase.java | 141 +++++++++++++++++++++ 2 files changed, 146 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java index e4e08f7e..59cfd85d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.lookup; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.types.DataType; import org.apache.doris.flink.cfg.DorisLookupOptions; import org.apache.doris.flink.cfg.DorisOptions; @@ -200,8 +201,10 @@ public class Worker implements Runnable { try (ResultSet rs = ps.executeQuery()) { while (rs.next()) { Record record = new Record(schema); - for (int index = 0; index < schema.getFieldTypes().length; index++) { - record.setObject(index, rs.getObject(index + 1)); + DataType[] fieldTypes = schema.getFieldTypes(); + for 
(int index = 0; index < fieldTypes.length; index++) { + Class<?> conversionClass = fieldTypes[index].getConversionClass(); + record.setObject(index, rs.getObject(index + 1, conversionClass)); } List<Record> records = resultRecordMap.computeIfAbsent( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java new file mode 100644 index 00000000..6d569bcf --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.lookup; + +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.apache.doris.flink.container.AbstractITCaseService; +import org.apache.doris.flink.container.ContainerUtils; +import org.apache.doris.flink.table.DorisConfigOptions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class DorisLookupTableITCase extends AbstractITCaseService { + + private static final Logger LOG = LoggerFactory.getLogger(DorisLookupTableITCase.class); + private static final String DATABASE = "test_lookup"; + private static final String TABLE_READ_TBL = "tbl_read_tbl"; + + @Test + public void testLookupTable() throws Exception { + initializeTable(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataStreamSource<Integer> sourceStream = env.fromElements(1, 2, 3, 4); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + Schema schema = + Schema.newBuilder() + .column("f0", DataTypes.INT()) + .columnByExpression("proctime", "PROCTIME()") + .build(); + Table table = tEnv.fromDataStream(sourceStream, schema); + tEnv.createTemporaryView("source", table); + + String lookupDDL = + String.format( + "CREATE TABLE `doris_lookup`(" + + " `id` INTEGER," + + " `tinyintColumn` TINYINT," + + " `smallintColumn` SMALLINT," + + " `bigintColumn` BIGINT," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") WITH 
(" + + "'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + "'fenodes' = '%s'," + + "'jdbc-url' = '%s'," + + "'table.identifier' = '%s'," + + "'username' = '%s'," + + "'password' = '%s'," + + "'lookup.cache.max-rows' = '100'" + + ")", + getFenodes(), + getDorisQueryUrl(), + DATABASE + "." + TABLE_READ_TBL, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(lookupDDL); + TableResult tableResult = + tEnv.executeSql( + "select source.f0," + + "tinyintColumn," + + "smallintColumn," + + "bigintColumn" + + " from `source`" + + " inner join `doris_lookup` FOR SYSTEM_TIME AS OF source.proctime on source.f0 = doris_lookup.id"); + + List<String> actual = new ArrayList<>(); + try (CloseableIterator<Row> iterator = tableResult.collect()) { + while (iterator.hasNext()) { + actual.add(iterator.next().toString()); + } + } + + String[] expected = + new String[] { + "+I[1, 97, 27479, 8670353564751764000]", + "+I[2, 79, 17119, -4381380624467725000]", + "+I[3, -106, -14878, 1466614815449373200]" + }; + assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray())); + } + + private void initializeTable() { + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format( + "DROP TABLE IF EXISTS %s.%s", + DATABASE, DorisLookupTableITCase.TABLE_READ_TBL), + String.format( + "CREATE TABLE %s.%s ( \n" + + "`id` int(11),\n" + + "`tinyintColumn` tinyint(4),\n" + + "`smallintColumn` smallint(6),\n" + + "`bigintColumn` bigint(20),\n" + + ") DISTRIBUTED BY HASH(`id`) BUCKETS 10\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE, DorisLookupTableITCase.TABLE_READ_TBL), + String.format( + "insert into %s.%s values (1,97,27479,8670353564751764000)", + DATABASE, DorisLookupTableITCase.TABLE_READ_TBL), + String.format( + "insert into %s.%s values (2,79,17119,-4381380624467725000)", + DATABASE, DorisLookupTableITCase.TABLE_READ_TBL), + 
String.format( + "insert into %s.%s values (3,-106,-14878,1466614815449373200)", + DATABASE, DorisLookupTableITCase.TABLE_READ_TBL)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org