This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new e0bbbb8510 [INLONG-10568][Sort] Change the starrocks connector UNKNOWN datatype handle method (#10570)
e0bbbb8510 is described below

commit e0bbbb85107f2c4fb48918df7f5004f76be807e0
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Sun Jul 7 12:17:18 2024 +0800

    [INLONG-10568][Sort] Change the starrocks connector UNKNOWN datatype handle method (#10570)
---
 .../sink/table/StarRocksDynamicTableSink.java      |   1 -
 .../table/sink/table/StarRocksSinkOP.java          |  42 +++
 .../sink/table/StarRocksTableRowTransformer.java   | 320 +++++++++++++++++++++
 3 files changed, 362 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
index c5df06a3ab..418e99c4df 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.starrocks.table.sink.table;
 
-import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
 import com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionBase;
 import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
 import org.apache.flink.api.common.typeinfo.TypeInformation;

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksSinkOP.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksSinkOP.java
new file mode 100644
index 0000000000..d37d7b0209
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksSinkOP.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import org.apache.flink.types.RowKind;
+
+/**
+ * StarRocks sink operator.
+ * Copied from {@link com.starrocks.connector.flink.row.sink.StarRocksSinkOP};
+ * not modified.
+ */
+public enum StarRocksSinkOP {
+
+    UPSERT, DELETE;
+
+    public static final String COLUMN_KEY = "__op";
+
+    static StarRocksSinkOP parse(RowKind kind) {
+        if (RowKind.INSERT.equals(kind) || RowKind.UPDATE_AFTER.equals(kind)) {
+            return UPSERT;
+        }
+        if (RowKind.DELETE.equals(kind) || RowKind.UPDATE_BEFORE.equals(kind)) {
+            return DELETE;
+        }
+        throw new RuntimeException("Unsupported row kind.");
+    }
+}

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java
new file mode 100644
index 0000000000..6b4586a630
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.JSONSerializer;
+import com.alibaba.fastjson.serializer.ObjectSerializer;
+import com.alibaba.fastjson.serializer.SerializeConfig;
+import com.alibaba.fastjson.serializer.SerializeWriter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.table.StarRocksDataType;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryArrayData;
+import org.apache.flink.table.data.binary.BinaryMapData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.sql.Date;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Copied from {@link com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer};
+ * modified to change how columns whose StarRocks type is UNKNOWN are handled.
+ */
+public class StarRocksTableRowTransformer implements StarRocksIRowTransformer<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private TypeInformation<RowData> rowDataTypeInfo;
+    private Function<RowData, RowData> valueTransform;
+    private String[] columnNames;
+    private DataType[] columnDataTypes;
+    private Map<String, StarRocksDataType> columns;
+    private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");
+
+    public StarRocksTableRowTransformer(TypeInformation<RowData> rowDataTypeInfo) {
+        this.rowDataTypeInfo = rowDataTypeInfo;
+    }
+
+    @Override
+    public void setStarRocksColumns(Map<String, StarRocksDataType> columns) {
+        this.columns = columns;
+    }
+
+    @Override
+    public void setTableSchema(TableSchema ts) {
+        this.columnNames = ts.getFieldNames();
+        this.columnDataTypes = ts.getFieldDataTypes();
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext runtimeCtx) {
+        final TypeSerializer<RowData> typeSerializer =
+                rowDataTypeInfo.createSerializer(runtimeCtx.getExecutionConfig());
+        valueTransform =
+                runtimeCtx.getExecutionConfig().isObjectReuseEnabled() ?
+                        typeSerializer::copy : Function.identity();
+        SerializeConfig.getGlobalInstance().put(BinaryStringData.class, new BinaryStringDataSerializer());
+        SerializeConfig.getGlobalInstance().put(DecimalData.class, new DecimalDataSerializer());
+        SerializeConfig.getGlobalInstance().put(TimestampData.class, new TimestampDataSerializer());
+    }
+
+    @Override
+    public Object[] transform(RowData record, boolean supportUpsertDelete) {
+        RowData transformRecord = valueTransform.apply(record);
+        Object[] values = new Object[columnDataTypes.length + (supportUpsertDelete ? 1 : 0)];
+        int idx = 0;
+        for (DataType dataType : columnDataTypes) {
+            values[idx] = typeConvertion(dataType.getLogicalType(), transformRecord, idx);
+            idx++;
+        }
+        if (supportUpsertDelete) {
+            // set `__op` column
+            values[idx] = StarRocksSinkOP.parse(record.getRowKind()).ordinal();
+        }
+        return values;
+    }
+
+    private Object typeConvertion(LogicalType type, RowData record, int pos) {
+        if (record.isNullAt(pos)) {
+            return null;
+        }
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+                return record.getBoolean(pos) ? 1L : 0L;
+            case TINYINT:
+                return record.getByte(pos);
+            case SMALLINT:
+                return record.getShort(pos);
+            case INTEGER:
+                return record.getInt(pos);
+            case BIGINT:
+                return record.getLong(pos);
+            case FLOAT:
+                return record.getFloat(pos);
+            case DOUBLE:
+                return record.getDouble(pos);
+            case CHAR:
+            case VARCHAR:
+                String sValue = record.getString(pos).toString();
+                if (columns == null) {
+                    return sValue;
+                }
+                StarRocksDataType starRocksDataType =
+                        columns.getOrDefault(columnNames[pos], StarRocksDataType.UNKNOWN);
+                if (starRocksDataType == StarRocksDataType.UNKNOWN) {
+                    return sValue;
+                }
+                if ((starRocksDataType == StarRocksDataType.JSON)
+                        && (sValue.charAt(0) == '{' || sValue.charAt(0) == '[')) {
+                    // The JSON string needs to be converted to a JSON object here, and back to
+                    // a JSON string again via JSON.toJSONString in StarRocksJsonSerializer#serialize.
+                    // Otherwise, the final JSON string in the stream load will not be correct.
+                    // For example, if the received string is "{"a": 1, "b": 2}" and it is passed
+                    // to JSON.toJSONString directly, the result will be "{\"a\": 1, \"b\": 2}",
+                    // which will not be recognized as JSON by StarRocks.
+                    return JSON.parse(sValue);
+                }
+                return sValue;
+            case DATE:
+                return dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay(record.getInt(pos))));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final int timestampPrecision = ((TimestampType) type).getPrecision();
+                return record.getTimestamp(pos, timestampPrecision).toLocalDateTime().toString();
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                int localZonedTimestampPrecision = ((LocalZonedTimestampType) type).getPrecision();
+                return record.getTimestamp(pos, localZonedTimestampPrecision).toLocalDateTime().toString();
+            case DECIMAL: // for both largeint and decimal
+                final int decimalPrecision = ((DecimalType) type).getPrecision();
+                final int decimalScale = ((DecimalType) type).getScale();
+                return record.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal();
+            case BINARY:
+                final byte[] bts = record.getBinary(pos);
+                long value = 0;
+                for (int i = 0; i < bts.length; i++) {
+                    value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
+                }
+                return value;
+            case ARRAY:
+                return convertNestedArray(record.getArray(pos), type);
+            case MAP:
+                return convertNestedMap(record.getMap(pos), type);
+            case ROW:
+                RowType rType = (RowType) type;
+                Map<String, Object> m = new HashMap<>();
+                RowData row = record.getRow(pos, rType.getFieldCount());
+                rType.getFields().parallelStream().forEach(
+                        f -> m.put(f.getName(), typeConvertion(f.getType(), row, rType.getFieldIndex(f.getName()))));
+                if (columns == null) {
+                    return m;
+                }
+                StarRocksDataType rStarRocksDataType =
+                        columns.getOrDefault(columnNames[pos], StarRocksDataType.UNKNOWN);
+                if (rStarRocksDataType == StarRocksDataType.STRING) {
+                    return JSON.toJSONString(m);
+                }
+                return m;
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private List<Object> convertNestedArray(ArrayData arrData, LogicalType type) {
+        if (arrData instanceof GenericArrayData) {
+            return Lists.newArrayList(((GenericArrayData) arrData).toObjectArray());
+        }
+        if (arrData instanceof BinaryArrayData) {
+            LogicalType lt = ((ArrayType) type).getElementType();
+            List<Object> data = Lists.newArrayList(((BinaryArrayData) arrData).toObjectArray(lt));
+            if (LogicalTypeRoot.ROW.equals(lt.getTypeRoot())) {
+                RowType rType = (RowType) lt;
+                // parse nested row data
+                return data.parallelStream().map(row -> {
+                    Map<String, Object> m = Maps.newHashMap();
+                    rType.getFields().parallelStream().forEach(f -> m.put(f.getName(),
+                            typeConvertion(f.getType(), (RowData) row, rType.getFieldIndex(f.getName()))));
+                    return JSON.toJSONString(m);
+                }).collect(Collectors.toList());
+            }
+            if (LogicalTypeRoot.MAP.equals(lt.getTypeRoot())) {
+                // traversal of the nested map
+                return data.parallelStream().map(m -> convertNestedMap((MapData) m, lt)).collect(Collectors.toList());
+            }
+            if (LogicalTypeRoot.DATE.equals(lt.getTypeRoot())) {
+                return data.parallelStream()
+                        .map(date -> dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay((Integer) date))))
+                        .collect(Collectors.toList());
+            }
+            if (LogicalTypeRoot.ARRAY.equals(lt.getTypeRoot())) {
+                // traversal of the nested array
+                return data.parallelStream().map(arr -> convertNestedArray((ArrayData) arr, lt))
+                        .collect(Collectors.toList());
+            }
+            return data;
+        }
+        throw new UnsupportedOperationException(String.format("Unsupported array data: %s",
+                arrData.getClass()));
+    }
+
+    private Map<Object, Object> convertNestedMap(MapData mapData, LogicalType type) {
+        if (mapData instanceof GenericMapData) {
+            HashMap<Object, Object> m = Maps.newHashMap();
+            for (Object k : ((GenericArrayData) ((GenericMapData) mapData).keyArray()).toObjectArray()) {
+                m.put(k, ((GenericMapData) mapData).get(k));
+            }
+            return m;
+        }
+        if (mapData instanceof BinaryMapData) {
+            Map<Object, Object> result = Maps.newHashMap();
+            LogicalType valType = ((MapType) type).getValueType();
+            Map<?, ?> javaMap = ((BinaryMapData) mapData).toJavaMap(((MapType) type).getKeyType(), valType);
+            for (Map.Entry<?, ?> en : javaMap.entrySet()) {
+                if (LogicalTypeRoot.MAP.equals(valType.getTypeRoot())) {
+                    // traversal of the nested map
+                    result.put(en.getKey().toString(), convertNestedMap((MapData) en.getValue(), valType));
+                    continue;
+                }
+                if (LogicalTypeRoot.DATE.equals(valType.getTypeRoot())) {
+                    result.put(en.getKey().toString(),
+                            dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay((Integer) en.getValue()))));
+                    continue;
+                }
+                if (LogicalTypeRoot.ARRAY.equals(valType.getTypeRoot())) {
+                    result.put(en.getKey().toString(), convertNestedArray((ArrayData) en.getValue(), valType));
+                    continue;
+                }
+                result.put(en.getKey().toString(), en.getValue());
+            }
+            return result;
+        }
+        throw new UnsupportedOperationException(String.format("Unsupported map data: %s", mapData.getClass()));
+    }
+}
+
+final class BinaryStringDataSerializer implements ObjectSerializer, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType, int features)
+            throws IOException {
+        SerializeWriter out = serializer.getWriter();
+        if (null == object) {
+            serializer.getWriter().writeNull();
+            return;
+        }
+        BinaryStringData strData = (BinaryStringData) object;
+        out.writeString(strData.toString());
+    }
+}
+
+final class DecimalDataSerializer implements ObjectSerializer, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType, int features)
+            throws IOException {
+        SerializeWriter out = serializer.getWriter();
+        if (null == object) {
+            serializer.getWriter().writeNull();
+            return;
+        }
+        out.writeString(((DecimalData) object).toBigDecimal().toPlainString());
+    }
+}
+
+final class TimestampDataSerializer implements ObjectSerializer, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType, int features)
+            throws IOException {
+        SerializeWriter out = serializer.getWriter();
+        if (null == object) {
+            serializer.getWriter().writeNull();
+            return;
+        }
+        out.writeString(((TimestampData) object).toLocalDateTime().toString());
+    }
+}
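
The double-escaping behavior described in the CHAR/VARCHAR branch's comment is easy to
reproduce in isolation. A minimal sketch (not part of the commit; it assumes fastjson 1.x
on the classpath) showing why a raw JSON string must go through JSON.parse before
JSON.toJSONString:

    import com.alibaba.fastjson.JSON;

    public class JsonRoundTripDemo {

        public static void main(String[] args) {
            String received = "{\"a\": 1, \"b\": 2}";

            // Serializing the raw string treats it as a plain string value, so the
            // quotes are escaped: "{\"a\": 1, \"b\": 2}" (a JSON string, not an object).
            System.out.println(JSON.toJSONString(received));

            // Parsing first yields a JSONObject, which serializes back to a real
            // JSON object: {"a":1,"b":2}, the form stream load expects.
            System.out.println(JSON.toJSONString(JSON.parse(received)));
        }
    }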
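
Likewise, the __op column that transform() appends when supportUpsertDelete is set carries
StarRocksSinkOP.parse(rowKind).ordinal(), i.e. 0 for UPSERT and 1 for DELETE. A hypothetical
driver (SinkOpDemo is not part of the commit, and it must sit in the same package because
parse is package-private) that prints the full RowKind mapping:

    package org.apache.inlong.sort.starrocks.table.sink.table;

    import org.apache.flink.types.RowKind;

    public class SinkOpDemo {

        public static void main(String[] args) {
            // INSERT/UPDATE_AFTER -> UPSERT (0); UPDATE_BEFORE/DELETE -> DELETE (1).
            for (RowKind kind : RowKind.values()) {
                System.out.println(kind + " -> " + StarRocksSinkOP.parse(kind).ordinal());
            }
        }
    }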