Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1044082086
########## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java: ########## @@ -0,0 +1,681 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.table.converter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; + +import com.mongodb.internal.HexUtils; +import org.bson.BsonBinary; +import org.bson.BsonBinarySubType; +import org.bson.BsonDocument; +import org.bson.BsonRegularExpression; +import org.bson.BsonUndefined; +import org.bson.BsonValue; +import org.bson.codecs.BsonArrayCodec; +import org.bson.codecs.EncoderContext; +import org.bson.json.JsonWriter; +import org.bson.types.Decimal128; + +import java.io.Serializable; +import java.io.StringWriter; +import java.io.Writer; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Tool class used to convert from {@link BsonValue} to {@link RowData}. * */ +@Internal +public class BsonToRowDataConverters { + + // ------------------------------------------------------------------------------------- + // Runtime Converters + // ------------------------------------------------------------------------------------- + + /** + * Runtime converter that converts {@link BsonValue} into objects of Flink Table & SQL internal + * data structures. + */ + @FunctionalInterface + public interface BsonToRowDataConverter extends Serializable { + Object convert(BsonValue bsonValue); + } + + // -------------------------------------------------------------------------------- + // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is + // necessary because the maven shade plugin cannot relocate classes in + // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Bson for + // sql-connector uber jars. + // -------------------------------------------------------------------------------- + + /** Creates a runtime converter which is null safe. */ + public static BsonToRowDataConverter createNullableConverter(LogicalType type) { + return wrapIntoNullableInternalConverter(createConverter(type)); + } + + private static BsonToRowDataConverter wrapIntoNullableInternalConverter( + BsonToRowDataConverter bsonToRowDataConverter) { + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + if (bsonValue == null || bsonValue.isNull() || bsonValue instanceof BsonUndefined) { + return null; + } + if (bsonValue.isDecimal128() && bsonValue.asDecimal128().getValue().isNaN()) { + return null; + } + return bsonToRowDataConverter.convert(bsonValue); + } + }; + } + + /** Creates a runtime converter which assuming input object is not null. */ + private static BsonToRowDataConverter createConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return null; + } + }; + case BOOLEAN: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return convertToBoolean(bsonValue); + } + }; + case TINYINT: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return convertToTinyInt(bsonValue); + } + }; + case SMALLINT: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return convertToSmallInt(bsonValue); + } + }; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return convertToInt(bsonValue); + } + }; + case BIGINT: + case INTERVAL_DAY_TIME: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return convertToLong(bsonValue); + } + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return TimestampData.fromLocalDateTime(convertToLocalDateTime(bsonValue)); + } + }; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return TimestampData.fromInstant(convertToInstant(bsonValue)); + } + }; + case FLOAT: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return convertToFloat(bsonValue); + } + }; + case DOUBLE: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return convertToDouble(bsonValue); + } + }; + case CHAR: + case VARCHAR: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return StringData.fromString(convertToString(bsonValue)); + } + }; + case BINARY: + case VARBINARY: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return convertToBinary(bsonValue); + } + }; + case DECIMAL: + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + DecimalType decimalType = (DecimalType) type; + BigDecimal decimalValue = convertToBigDecimal(bsonValue); + return DecimalData.fromBigDecimal( + decimalValue, decimalType.getPrecision(), decimalType.getScale()); + } + }; + case ROW: + return createRowConverter((RowType) type); + case ARRAY: + return createArrayConverter((ArrayType) type); + case MAP: + return createMapConverter((MapType) type); + case MULTISET: + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + private static BsonToRowDataConverter createRowConverter(RowType rowType) { + final BsonToRowDataConverter[] fieldConverters = + rowType.getFields().stream() + .map(RowType.RowField::getType) + .map(BsonToRowDataConverters::createNullableConverter) + .toArray(BsonToRowDataConverter[]::new); + final int arity = rowType.getFieldCount(); + final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + if (!bsonValue.isDocument()) { + throw new IllegalArgumentException( + "Unable to convert to rowType from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + + BsonDocument document = bsonValue.asDocument(); + GenericRowData row = new GenericRowData(arity); + for (int i = 0; i < arity; i++) { + String fieldName = fieldNames[i]; + BsonValue fieldValue = document.get(fieldName); + Object convertedField = fieldConverters[i].convert(fieldValue); + row.setField(i, convertedField); + } + return row; + } + }; + } + + private static BsonToRowDataConverter createArrayConverter(ArrayType arrayType) { + final Class<?> elementClass = + LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); + final BsonToRowDataConverter elementConverter = + createNullableConverter(arrayType.getElementType()); + + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + if (!bsonValue.isArray()) { + throw new IllegalArgumentException( + "Unable to convert to arrayType from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + + List<BsonValue> in = bsonValue.asArray(); + final Object[] elementArray = (Object[]) Array.newInstance(elementClass, in.size()); + for (int i = 0; i < in.size(); i++) { + elementArray[i] = elementConverter.convert(in.get(i)); + } + return new GenericArrayData(elementArray); + } + }; + } + + private static BsonToRowDataConverter createMapConverter(MapType mapType) { + LogicalType keyType = mapType.getKeyType(); + checkArgument(keyType.supportsInputConversion(String.class)); + + LogicalType valueType = mapType.getValueType(); + BsonToRowDataConverter valueConverter = createNullableConverter(valueType); + + return new BsonToRowDataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + if (!bsonValue.isDocument()) { + throw new IllegalArgumentException( + "Unable to convert to rowType from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + + BsonDocument document = bsonValue.asDocument(); + Map<StringData, Object> map = new HashMap<>(); + for (String key : document.keySet()) { + map.put(StringData.fromString(key), valueConverter.convert(document.get(key))); + } + return new GenericMapData(map); + } + }; + } + + private static boolean convertToBoolean(BsonValue bsonValue) { + if (bsonValue.isBoolean()) { + return bsonValue.asBoolean().getValue(); + } + if (bsonValue.isInt32()) { + return bsonValue.asInt32().getValue() == 1; + } + if (bsonValue.isInt64()) { + return bsonValue.asInt64().getValue() == 1L; + } + if (bsonValue.isString()) { + return Boolean.parseBoolean(bsonValue.asString().getValue()); + } + throw new IllegalArgumentException( + "Unable to convert to boolean from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + + private static byte convertToTinyInt(BsonValue bsonValue) { + if (bsonValue.isBoolean()) { + return bsonValue.asBoolean().getValue() ? (byte) 1 : (byte) 0; + } + if (bsonValue.isInt32()) { + return (byte) bsonValue.asInt32().getValue(); + } + if (bsonValue.isInt64()) { + return (byte) bsonValue.asInt64().getValue(); + } + if (bsonValue.isString()) { + return Byte.parseByte(bsonValue.asString().getValue()); + } + throw new IllegalArgumentException( + "Unable to convert to tinyint from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + + private static short convertToSmallInt(BsonValue bsonValue) { + if (bsonValue.isBoolean()) { + return bsonValue.asBoolean().getValue() ? (short) 1 : (short) 0; + } + if (bsonValue.isInt32()) { + return (short) bsonValue.asInt32().getValue(); + } + if (bsonValue.isInt64()) { + return (short) bsonValue.asInt64().getValue(); + } + if (bsonValue.isString()) { + return Short.parseShort(bsonValue.asString().getValue()); + } + throw new IllegalArgumentException( + "Unable to convert to smallint from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + + private static int convertToInt(BsonValue bsonValue) { + if (bsonValue.isNumber()) { + return bsonValue.asNumber().intValue(); + } + if (bsonValue.isDecimal128()) { + Decimal128 decimal128Value = bsonValue.asDecimal128().decimal128Value(); + if (decimal128Value.isFinite()) { + return decimal128Value.intValue(); + } else if (decimal128Value.isNegative()) { + return Integer.MIN_VALUE; + } else { + return Integer.MAX_VALUE; + } + } + if (bsonValue.isBoolean()) { + return bsonValue.asBoolean().getValue() ? 1 : 0; + } + if (bsonValue.isDateTime()) { + return (int) Instant.ofEpochMilli(bsonValue.asDateTime().getValue()).getEpochSecond(); + } + if (bsonValue.isTimestamp()) { + return bsonValue.asTimestamp().getTime(); + } + if (bsonValue.isString()) { + return Integer.parseInt(bsonValue.asString().getValue()); + } + throw new IllegalArgumentException( + "Unable to convert to integer from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + + private static long convertToLong(BsonValue bsonValue) { + if (bsonValue.isNumber()) { + return bsonValue.asNumber().longValue(); + } + if (bsonValue.isDecimal128()) { + Decimal128 decimal128Value = bsonValue.asDecimal128().decimal128Value(); + if (decimal128Value.isFinite()) { + return decimal128Value.longValue(); + } else if (decimal128Value.isNegative()) { + return Long.MIN_VALUE; + } else { + return Long.MAX_VALUE; + } + } + if (bsonValue.isBoolean()) { + return bsonValue.asBoolean().getValue() ? 1L : 0L; + } + if (bsonValue.isDateTime()) { + return bsonValue.asDateTime().getValue(); + } + if (bsonValue.isTimestamp()) { + return bsonValue.asTimestamp().getTime() * 1000L; + } + if (bsonValue.isString()) { + return Long.parseLong(bsonValue.asString().getValue()); + } + throw new IllegalArgumentException( + "Unable to convert to long from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + + private static double convertToDouble(BsonValue bsonValue) { + if (bsonValue.isNumber()) { + return bsonValue.asNumber().doubleValue(); + } + if (bsonValue.isDecimal128()) { + Decimal128 decimal128Value = bsonValue.asDecimal128().decimal128Value(); + if (decimal128Value.isFinite()) { + return decimal128Value.doubleValue(); + } else if (decimal128Value.isNegative()) { + return -Double.MAX_VALUE; + } else { + return Double.MAX_VALUE; + } + } + if (bsonValue.isBoolean()) { + return bsonValue.asBoolean().getValue() ? 1 : 0; + } + if (bsonValue.isString()) { + return Double.parseDouble(bsonValue.asString().getValue()); + } + throw new IllegalArgumentException( + "Unable to convert to double from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + + private static float convertToFloat(BsonValue bsonValue) { + if (bsonValue.isInt32()) { + return bsonValue.asInt32().getValue(); + } + if (bsonValue.isInt64()) { Review Comment: MongoDB uses the [BSON](https://www.mongodb.com/json-and-bson) structure internally, and the extend-json format is a way to convert BSON into JSON. We can get the BSON raw string-based value in the following way. ```java import org.bson.BsonDocument; import org.bson.BsonRegularExpression; import org.bson.BsonSymbol; public class BsonTest { public static void main(String[] args) { BsonSymbol symbol = new BsonSymbol("symbol"); BsonRegularExpression regex = new BsonRegularExpression("regex", "i"); BsonDocument doc = new BsonDocument() .append("f0", symbol) .append("f1", regex); // convert doc to json String extendJson = doc.toJson(); // output: {"f0": {"$symbol": "symbol"}, "f1": {"$regularExpression": {"pattern": "regex", "options": "i"}}} System.out.println(extendJson); // restore from json string BsonDocument parsed = BsonDocument.parse(extendJson); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org