lirui-apache commented on a change in pull request #16040:
URL: https://github.com/apache/flink/pull/16040#discussion_r645538658



##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.maxwell;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import 
org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Maxwell using JSON encoding. */
+public class MaxwellJsonDecodingFormat implements 
DecodingFormat<DeserializationSchema<RowData>> {
+
+    // 
--------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;

Review comment:
       Rename this field, or add a comment, to indicate that these are the requested metadata keys.

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.maxwell;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import 
org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Maxwell using JSON encoding. */
+public class MaxwellJsonDecodingFormat implements 
DecodingFormat<DeserializationSchema<RowData>> {
+
+    // 
--------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    private final boolean ignoreParseErrors;
+
+    private final TimestampFormat timestampFormat;
+
+    public MaxwellJsonDecodingFormat(boolean ignoreParseErrors, 
TimestampFormat timestampFormat) {
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType) {
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                
.orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, 
metadataFields);

Review comment:
       What happens if physical field names and metadata field names collide?

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.maxwell;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import 
org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Maxwell using JSON encoding. */
+public class MaxwellJsonDecodingFormat implements 
DecodingFormat<DeserializationSchema<RowData>> {
+
+    // 
--------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    private final boolean ignoreParseErrors;
+
+    private final TimestampFormat timestampFormat;
+
+    public MaxwellJsonDecodingFormat(boolean ignoreParseErrors, 
TimestampFormat timestampFormat) {
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType) {
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                
.orElseThrow(IllegalStateException::new))

Review comment:
       Please provide a meaningful exception message (e.g. one that names the unknown metadata key).

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########
@@ -50,40 +56,56 @@
 public class MaxwellJsonDeserializationSchema implements 
DeserializationSchema<RowData> {
     private static final long serialVersionUID = 1L;

Review comment:
       I think we should update the serialVersionUID, since we're changing the field members.

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########
@@ -162,25 +209,70 @@ public boolean equals(Object o) {
             return false;
         }
         MaxwellJsonDeserializationSchema that = 
(MaxwellJsonDeserializationSchema) o;
-        return ignoreParseErrors == that.ignoreParseErrors
-                && fieldCount == that.fieldCount
-                && Objects.equals(jsonDeserializer, that.jsonDeserializer)
-                && Objects.equals(resultTypeInfo, that.resultTypeInfo);
+        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
+                && hasMetadata == that.hasMetadata

Review comment:
       Is it possible for two instances to both require metadata, but with different required fields?

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########
@@ -162,25 +209,70 @@ public boolean equals(Object o) {
             return false;
         }
         MaxwellJsonDeserializationSchema that = 
(MaxwellJsonDeserializationSchema) o;
-        return ignoreParseErrors == that.ignoreParseErrors
-                && fieldCount == that.fieldCount
-                && Objects.equals(jsonDeserializer, that.jsonDeserializer)
-                && Objects.equals(resultTypeInfo, that.resultTypeInfo);
+        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
+                && hasMetadata == that.hasMetadata
+                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+                && ignoreParseErrors == that.ignoreParseErrors
+                && fieldCount == that.fieldCount;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(jsonDeserializer, resultTypeInfo, 
ignoreParseErrors, fieldCount);
+        return Objects.hash(
+                jsonDeserializer, hasMetadata, producedTypeInfo, 
ignoreParseErrors, fieldCount);
     }
 
-    private RowType createJsonRowType(DataType databaseSchema) {
-        // Maxwell JSON contains other information, e.g. "database", "ts"
-        // but we don't need them
-        return (RowType)
+    // 
--------------------------------------------------------------------------------------------
+
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ReadableMetadata> 
readableMetadata) {
+        // Maxwell JSON contains other information, e.g. "database", "ts", but 
we don't need them

Review comment:
       Is this comment still correct? Some of this additional information is now extracted as metadata.

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########
@@ -94,22 +116,27 @@ public RowData deserialize(byte[] message) throws 
IOException {
 
     @Override
     public void deserialize(byte[] message, Collector<RowData> out) throws 
IOException {
+        if (message == null || message.length == 0) {
+            return;
+        }
         try {
-            RowData row = jsonDeserializer.deserialize(message);
+            final JsonNode root = 
jsonDeserializer.deserializeToJsonNode(message);
+            final GenericRowData row = (GenericRowData) 
jsonDeserializer.convertToRowData(root);
             String type = row.getString(2).toString(); // "type" field
             if (OP_INSERT.equals(type)) {
                 // "data" field is a row, contains inserted rows
-                RowData insert = row.getRow(0, fieldCount);
+                GenericRowData insert = (GenericRowData) row.getRow(0, 
fieldCount);
                 insert.setRowKind(RowKind.INSERT);
-                out.collect(insert);
+                emitRow(row, insert, out);
             } else if (OP_UPDATE.equals(type)) {
                 // "data" field is a row, contains new rows
                 // "old" field is an array of row, contains old values

Review comment:
       Is this correct? The `old` field doesn't seem to be an array.

##########
File path: 
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
##########
@@ -35,35 +39,78 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.FLOAT;
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for {@link MaxwellJsonSerializationSchema} and {@link 
MaxwellJsonDeserializationSchema}.
  */
 public class MaxwellJsonSerDerTest {
-    private static final RowType SCHEMA =
-            (RowType)
-                    ROW(
-                                    FIELD("id", INT().notNull()),
-                                    FIELD("name", STRING()),
-                                    FIELD("description", STRING()),
-                                    FIELD("weight", FLOAT()))
-                            .getLogicalType();
+
+    private static final DataType PHYSICAL_DATA_TYPE =
+            ROW(
+                    FIELD("id", INT().notNull()),
+                    FIELD("name", STRING()),
+                    FIELD("description", STRING()),
+                    FIELD("weight", FLOAT()));
+
+    @Test
+    public void testDeserializationWithMetadata() throws Exception {
+        // we only read the first line for keeping the test simple
+        final String firstLine = readLines("maxwell-data.txt").get(0);
+        final List<ReadableMetadata> requestedMetadata = 
Arrays.asList(ReadableMetadata.values());
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(
+                        PHYSICAL_DATA_TYPE,
+                        requestedMetadata.stream()
+                                .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                                .collect(Collectors.toList()));
+        final MaxwellJsonDeserializationSchema deserializationSchema =
+                new MaxwellJsonDeserializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        requestedMetadata,
+                        InternalTypeInfo.of(producedDataType.getLogicalType()),
+                        false,
+                        TimestampFormat.ISO_8601);
+        final SimpleCollector collector = new SimpleCollector();
+        
deserializationSchema.deserialize(firstLine.getBytes(StandardCharsets.UTF_8), 
collector);
+        assertEquals(1, collector.list.size());
+        Consumer<RowData> consumer =
+                row -> {
+                    assertThat(row.getInt(0), equalTo(101));
+                    assertThat(row.getString(1).toString(), 
equalTo("scooter"));
+                    assertThat(row.getString(2).toString(), equalTo("Small 
2-wheel scooter"));
+                    assertThat(row.getFloat(3), equalTo(3.14f));
+                    assertThat(row.getString(4).toString(), equalTo("test"));
+                    assertThat(row.getString(5).toString(), 
equalTo("product"));
+                    assertThat(row.getArray(6), nullValue());

Review comment:
       Can we also test `primary-key-columns` with a non-null value?

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########
@@ -50,40 +56,56 @@
 public class MaxwellJsonDeserializationSchema implements 
DeserializationSchema<RowData> {
     private static final long serialVersionUID = 1L;
 
+    private static final String FIELD_OLD = "old";
     private static final String OP_INSERT = "insert";
     private static final String OP_UPDATE = "update";
     private static final String OP_DELETE = "delete";
 
     /** The deserializer to deserialize Maxwell JSON data. */
     private final JsonRowDataDeserializationSchema jsonDeserializer;
 
-    /** TypeInformation of the produced {@link RowData}. * */
-    private final TypeInformation<RowData> resultTypeInfo;
+    /** Flag that indicates that an additional projection is required for 
metadata. */
+    private final boolean hasMetadata;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + 
meta data). */
+    private final TypeInformation<RowData> producedTypeInfo;
 
     /** Flag indicating whether to ignore invalid fields/rows (default: throw 
an exception). */
     private final boolean ignoreParseErrors;
 
+    /** Names of fields. */
+    private final List<String> fieldNames;
+
     /** Number of fields. */
     private final int fieldCount;

Review comment:
       Clarify that these only cover the physical fields.

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########
@@ -143,14 +169,35 @@ public void deserialize(byte[] message, 
Collector<RowData> out) throws IOExcepti
         }
     }
 
+    private void emitRow(
+            GenericRowData rootRow, GenericRowData physicalRow, 
Collector<RowData> out) {
+        // shortcut in case no output projection is required
+        if (!hasMetadata) {
+            out.collect(physicalRow);
+            return;
+        }
+        final int physicalArity = physicalRow.getArity();

Review comment:
       Wouldn't this be the same as `fieldCount`?

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
##########
@@ -50,40 +56,56 @@
 public class MaxwellJsonDeserializationSchema implements 
DeserializationSchema<RowData> {
     private static final long serialVersionUID = 1L;
 
+    private static final String FIELD_OLD = "old";
     private static final String OP_INSERT = "insert";
     private static final String OP_UPDATE = "update";
     private static final String OP_DELETE = "delete";
 
     /** The deserializer to deserialize Maxwell JSON data. */
     private final JsonRowDataDeserializationSchema jsonDeserializer;
 
-    /** TypeInformation of the produced {@link RowData}. * */
-    private final TypeInformation<RowData> resultTypeInfo;
+    /** Flag that indicates that an additional projection is required for 
metadata. */
+    private final boolean hasMetadata;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + 
meta data). */
+    private final TypeInformation<RowData> producedTypeInfo;
 
     /** Flag indicating whether to ignore invalid fields/rows (default: throw 
an exception). */
     private final boolean ignoreParseErrors;
 
+    /** Names of fields. */
+    private final List<String> fieldNames;
+
     /** Number of fields. */
     private final int fieldCount;
 
     public MaxwellJsonDeserializationSchema(
-            RowType rowType,
-            TypeInformation<RowData> resultTypeInfo,
+            DataType physicalDataType,
+            List<ReadableMetadata> requestedMetadata,
+            TypeInformation<RowData> producedTypeInfo,
             boolean ignoreParseErrors,
-            TimestampFormat timestampFormatOption) {
-        this.resultTypeInfo = resultTypeInfo;
-        this.ignoreParseErrors = ignoreParseErrors;
-        this.fieldCount = rowType.getFieldCount();
+            TimestampFormat timestampFormat) {
+        final RowType jsonRowType = createJsonRowType(physicalDataType, 
requestedMetadata);
         this.jsonDeserializer =
                 new JsonRowDataDeserializationSchema(
-                        createJsonRowType(fromLogicalToDataType(rowType)),
-                        // the result type is never used, so it's fine to pass 
in Canal's result
-                        // type
-                        resultTypeInfo,
+                        jsonRowType,
+                        // the result type is never used, so it's fine to pass 
in the produced type
+                        // info
+                        producedTypeInfo,
                         false, // ignoreParseErrors already contains the 
functionality of
                         // failOnMissingField

Review comment:
       nit: If this comment refers to the `false` parameter, move it above the parameter.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to