lirui-apache commented on a change in pull request #16040:
URL: https://github.com/apache/flink/pull/16040#discussion_r645538658

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
##########

@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.maxwell;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Maxwell using JSON encoding. */
+public class MaxwellJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;

Review comment:
   Rename or add a comment to indicate that these keys are the requested metadata keys.
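   For example, something along these lines (a sketch; the name `requestedMetadataKeys` and the Javadoc wording are suggestions, not from the PR):

```java
/**
 * Metadata keys that the user requested in the table schema, in schema order.
 * Mutable because DecodingFormat#applyReadableMetadata can replace the
 * selection after this format instance is created.
 */
private List<String> requestedMetadataKeys;
```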
##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
##########

@@ -0,0 +1,194 @@
+    private List<String> metadataKeys;
+
+    private final boolean ignoreParseErrors;
+
+    private final TimestampFormat timestampFormat;
+
+    public MaxwellJsonDecodingFormat(boolean ignoreParseErrors, TimestampFormat timestampFormat) {
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType) {
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);

Review comment:
   What happens if physical field names and metadata field names collide?
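   One possible guard, as a sketch only (the check, the exception choice, and the message wording are assumptions, not part of the PR):

```java
// Sketch: fail fast if a requested metadata key shadows a physical column.
// Assumes this runs inside createRuntimeDecoder(), right before the call to
// DataTypeUtils.appendRowFields(...). RowType is
// org.apache.flink.table.types.logical.RowType; ValidationException is
// org.apache.flink.table.api.ValidationException.
final List<String> physicalNames =
        ((RowType) physicalDataType.getLogicalType()).getFieldNames();
for (ReadableMetadata m : readableMetadata) {
    if (physicalNames.contains(m.key)) {
        throw new ValidationException(
                "Metadata key '"
                        + m.key
                        + "' collides with a physical column of the same name.");
    }
}
```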
##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
##########

@@ -0,0 +1,194 @@
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType) {
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))

Review comment:
   Provide a meaningful exception message.
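   For instance (a sketch; the exact wording is a suggestion):

```java
// Sketch: name the missing key in the error instead of throwing a bare
// IllegalStateException, so misconfigurations are easier to diagnose.
.orElseThrow(
        () ->
                new IllegalStateException(
                        "Could not find the requested metadata key '"
                                + k
                                + "' among the readable metadata of the maxwell-json format."))
```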
##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########

@@ -50,40 +56,56 @@ public class MaxwellJsonDeserializationSchema implements DeserializationSchema<RowData> {
     private static final long serialVersionUID = 1L;

Review comment:
   I think we should update the UID since we're changing the field members.

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########

@@ -162,25 +209,70 @@ public boolean equals(Object o) {
             return false;
         }
         MaxwellJsonDeserializationSchema that = (MaxwellJsonDeserializationSchema) o;
-        return ignoreParseErrors == that.ignoreParseErrors
-                && fieldCount == that.fieldCount
-                && Objects.equals(jsonDeserializer, that.jsonDeserializer)
-                && Objects.equals(resultTypeInfo, that.resultTypeInfo);
+        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
+                && hasMetadata == that.hasMetadata

Review comment:
   Is it possible that two instances both require metadata but the required fields are different?
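   If that can happen, `equals` could compare the concrete metadata selection instead of only the flag. A sketch (the field name `requestedMetadata` is a suggestion, not from the PR):

```java
// Sketch: keep the constructor's requested metadata as a field (enum lists
// have well-defined equality) and compare the selection itself rather than
// the derived hasMetadata boolean.
private final List<ReadableMetadata> requestedMetadata;

// ... and in equals(Object o):
return Objects.equals(jsonDeserializer, that.jsonDeserializer)
        && Objects.equals(requestedMetadata, that.requestedMetadata)
        && Objects.equals(producedTypeInfo, that.producedTypeInfo)
        && ignoreParseErrors == that.ignoreParseErrors
        && fieldCount == that.fieldCount;
```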
##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########

@@ -162,25 +209,70 @@ public boolean equals(Object o) {
             return false;
         }
         MaxwellJsonDeserializationSchema that = (MaxwellJsonDeserializationSchema) o;
-        return ignoreParseErrors == that.ignoreParseErrors
-                && fieldCount == that.fieldCount
-                && Objects.equals(jsonDeserializer, that.jsonDeserializer)
-                && Objects.equals(resultTypeInfo, that.resultTypeInfo);
+        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
+                && hasMetadata == that.hasMetadata
+                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+                && ignoreParseErrors == that.ignoreParseErrors
+                && fieldCount == that.fieldCount;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(jsonDeserializer, resultTypeInfo, ignoreParseErrors, fieldCount);
+        return Objects.hash(
+                jsonDeserializer, hasMetadata, producedTypeInfo, ignoreParseErrors, fieldCount);
     }
 
-    private RowType createJsonRowType(DataType databaseSchema) {
-        // Maxwell JSON contains other information, e.g. "database", "ts"
-        // but we don't need them
-        return (RowType)
+    // --------------------------------------------------------------------------------------------
+
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
+        // Maxwell JSON contains other information, e.g. "database", "ts", but we don't need them

Review comment:
   This comment is no longer correct?

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########

@@ -94,22 +116,27 @@ public RowData deserialize(byte[] message) throws IOException {
 
     @Override
     public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+        if (message == null || message.length == 0) {
+            return;
+        }
         try {
-            RowData row = jsonDeserializer.deserialize(message);
+            final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
+            final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
             String type = row.getString(2).toString(); // "type" field
             if (OP_INSERT.equals(type)) {
                 // "data" field is a row, contains inserted rows
-                RowData insert = row.getRow(0, fieldCount);
+                GenericRowData insert = (GenericRowData) row.getRow(0, fieldCount);
                 insert.setRowKind(RowKind.INSERT);
-                out.collect(insert);
+                emitRow(row, insert, out);
             } else if (OP_UPDATE.equals(type)) {
                 // "data" field is a row, contains new rows
                 // "old" field is an array of row, contains old values

Review comment:
   Is this correct? The `old` field doesn't seem to be an array.
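   For reference, Maxwell encodes an update as a single record whose `old` field is a JSON object holding only the previous values of the changed columns, not an array of rows as in the Canal format. An illustrative sample record (values invented, not from the PR's test data):

```json
{
  "database": "test",
  "table": "product",
  "type": "update",
  "ts": 1596684906,
  "data": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 5.17},
  "old": {"weight": 5.18}
}
```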
##########
File path: flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
##########

@@ -35,35 +39,78 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.FLOAT;
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for {@link MaxwellJsonSerializationSchema} and {@link MaxwellJsonDeserializationSchema}.
  */
 public class MaxwellJsonSerDerTest {
-    private static final RowType SCHEMA =
-            (RowType)
-                    ROW(
-                                    FIELD("id", INT().notNull()),
-                                    FIELD("name", STRING()),
-                                    FIELD("description", STRING()),
-                                    FIELD("weight", FLOAT()))
-                            .getLogicalType();
+
+    private static final DataType PHYSICAL_DATA_TYPE =
+            ROW(
+                    FIELD("id", INT().notNull()),
+                    FIELD("name", STRING()),
+                    FIELD("description", STRING()),
+                    FIELD("weight", FLOAT()));
+
+    @Test
+    public void testDeserializationWithMetadata() throws Exception {
+        // we only read the first line for keeping the test simple
+        final String firstLine = readLines("maxwell-data.txt").get(0);
+        final List<ReadableMetadata> requestedMetadata = Arrays.asList(ReadableMetadata.values());
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(
+                        PHYSICAL_DATA_TYPE,
+                        requestedMetadata.stream()
+                                .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                                .collect(Collectors.toList()));
+        final MaxwellJsonDeserializationSchema deserializationSchema =
+                new MaxwellJsonDeserializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        requestedMetadata,
+                        InternalTypeInfo.of(producedDataType.getLogicalType()),
+                        false,
+                        TimestampFormat.ISO_8601);
+        final SimpleCollector collector = new SimpleCollector();
+        deserializationSchema.deserialize(firstLine.getBytes(StandardCharsets.UTF_8), collector);
+        assertEquals(1, collector.list.size());
+        Consumer<RowData> consumer =
+                row -> {
+                    assertThat(row.getInt(0), equalTo(101));
+                    assertThat(row.getString(1).toString(), equalTo("scooter"));
+                    assertThat(row.getString(2).toString(), equalTo("Small 2-wheel scooter"));
+                    assertThat(row.getFloat(3), equalTo(3.14f));
+                    assertThat(row.getString(4).toString(), equalTo("test"));
+                    assertThat(row.getString(5).toString(), equalTo("product"));
+                    assertThat(row.getArray(6), nullValue());

Review comment:
   Can we also test `primary-key-columns`, which is not null?

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########

@@ -50,40 +56,56 @@ public class MaxwellJsonDeserializationSchema implements DeserializationSchema<RowData> {
     private static final long serialVersionUID = 1L;
 
+    private static final String FIELD_OLD = "old";
     private static final String OP_INSERT = "insert";
     private static final String OP_UPDATE = "update";
     private static final String OP_DELETE = "delete";
 
     /** The deserializer to deserialize Maxwell JSON data. */
     private final JsonRowDataDeserializationSchema jsonDeserializer;
 
-    /** TypeInformation of the produced {@link RowData}. * */
-    private final TypeInformation<RowData> resultTypeInfo;
+    /** Flag that indicates that an additional projection is required for metadata. */
+    private final boolean hasMetadata;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
+    private final TypeInformation<RowData> producedTypeInfo;
 
     /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
     private final boolean ignoreParseErrors;
 
+    /** Names of fields. */
+    private final List<String> fieldNames;
+
     /** Number of fields. */
     private final int fieldCount;

Review comment:
   Clarify that these only consider physical fields.
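   A possible Javadoc fix along those lines (wording is a suggestion, not from the PR):

```java
/** Names of the physical fields only; requested metadata columns are not included. */
private final List<String> fieldNames;

/** Number of physical fields, i.e. the arity of the nested "data"/"old" rows. */
private final int fieldCount;
```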
##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########

@@ -143,14 +169,35 @@
         }
     }
 
+    private void emitRow(
+            GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
+        // shortcut in case no output projection is required
+        if (!hasMetadata) {
+            out.collect(physicalRow);
+            return;
+        }
+        final int physicalArity = physicalRow.getArity();

Review comment:
   Wouldn't this be the same as `fieldCount`?

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########

@@ -50,40 +56,56 @@
     /** Number of fields. */
     private final int fieldCount;
 
     public MaxwellJsonDeserializationSchema(
-            RowType rowType,
-            TypeInformation<RowData> resultTypeInfo,
+            DataType physicalDataType,
+            List<ReadableMetadata> requestedMetadata,
+            TypeInformation<RowData> producedTypeInfo,
             boolean ignoreParseErrors,
-            TimestampFormat timestampFormatOption) {
-        this.resultTypeInfo = resultTypeInfo;
-        this.ignoreParseErrors = ignoreParseErrors;
-        this.fieldCount = rowType.getFieldCount();
+            TimestampFormat timestampFormat) {
+        final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
         this.jsonDeserializer =
                 new JsonRowDataDeserializationSchema(
-                        createJsonRowType(fromLogicalToDataType(rowType)),
-                        // the result type is never used, so it's fine to pass in Canal's result
-                        // type
-                        resultTypeInfo,
+                        jsonRowType,
+                        // the result type is never used, so it's fine to pass in the produced type
+                        // info
+                        producedTypeInfo,
                         false,
                         // ignoreParseErrors already contains the functionality of
                         // failOnMissingField

Review comment:
   nit: If the comment is for the `false` parameter, move it above the parameter.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org