Hi there,
There is an open PR for Avro that looks related to this issue:
https://github.com/apache/flink/pull/26159
I suggest you review this code and comment on whether it could be amended to
cover your JSON case, then see what response you get. It might be that you
will need to raise a separate Jira and an associated PR, following the
pattern of this Avro PR.

      Kind regards, David.

From: Semen Pavelko <sem.pav...@gmail.com>
Date: Thursday, 6 March 2025 at 15:26
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] upgrade DebeziumJsonDeserializationSchema
Hi everyone! I apologize in advance if I’m posting in the wrong place or in
the wrong way. I’m using Flink to read messages from Kafka, where the Kafka
topic contains rows from PostgreSQL CDC (Debezium). To save resources, I
strip the schema from the messages and don’t set REPLICA IDENTITY to FULL.
Of course, this leads to an exception in the DebeziumJsonDeserializationSchema
class, because the "before" row is not sent for updates. It seems to me that
adding a flag like upsertParse could help avoid this issue. A sample message
illustrating the problem is below, followed by my current working code.
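
For illustration, a schemaless update event from the Debezium Postgres
connector without REPLICA IDENTITY FULL looks roughly like the following
(field values are made up); note the null "before" image, which is what
currently triggers the exception:

{
  "before": null,
  "after": {"id": 1, "name": "new value"},
  "source": {"connector": "postgresql", "db": "mydb", "table": "t"},
  "op": "u",
  "ts_ms": 1741272000000
}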

package org.apache.flink.formats.json.debezium;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static java.lang.String.format;

/**
 * Deserialization schema from Debezium JSON to Flink Table/SQL internal data structure {@link
 * RowData}. The deserialization schema knows Debezium's schema definition and can extract the
 * database data and convert into {@link RowData} with {@link RowKind}.
 *
 * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
 *
 * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
 *
 * @see <a href="https://debezium.io/">Debezium</a>
 */
@Internal
public final class DebeziumJsonDeserializationSchema implements DeserializationSchema<RowData> {
    private static final long serialVersionUID = 1L;

    private static final String OP_READ = "r"; // snapshot read
    private static final String OP_CREATE = "c"; // insert
    private static final String OP_UPDATE = "u"; // update
    private static final String OP_DELETE = "d"; // delete

    private static final String REPLICA_IDENTITY_EXCEPTION =
            "The \"before\" field of %s message is null, "
                    + "if you are using Debezium Postgres Connector, "
                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";

    /** The deserializer to deserialize Debezium JSON data. */
    private final JsonRowDataDeserializationSchema jsonDeserializer;

    /** Flag that indicates that an additional projection is required for metadata. */
    private final boolean hasMetadata;

    /** Metadata to be extracted for every record. */
    private final MetadataConverter[] metadataConverters;

    /** {@link TypeInformation} of the produced {@link RowData} (physical + metadata). */
    private final TypeInformation<RowData> producedTypeInfo;

    /**
     * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
     * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
     * information, but we just ignore "schema" and extract data from "payload".
     */
    private final boolean schemaInclude;

    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
    private final boolean ignoreParseErrors;

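    /**
     * Flag indicating whether to parse UPDATE events in upsert mode: only the "after" row is
     * emitted (as UPDATE_AFTER) and a null "before" row is tolerated, so REPLICA IDENTITY FULL
     * is not required.
     */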
    private final boolean upsertParse;

    /** Buffer for the rows produced from the current message. */
    private transient List<GenericRowData> genericRowDataList;

    public DebeziumJsonDeserializationSchema(
            DataType physicalDataType,
            List<ReadableMetadata> requestedMetadata,
            TypeInformation<RowData> producedTypeInfo,
            boolean schemaInclude,
            boolean ignoreParseErrors,
            boolean upsertParse,
            TimestampFormat timestampFormat) {
        final RowType jsonRowType =
                createJsonRowType(physicalDataType, requestedMetadata, schemaInclude);
        this.jsonDeserializer =
                new JsonRowDataDeserializationSchema(
                        jsonRowType,
                        // the result type is never used, so it's fine to pass in the produced
                        // type info
                        producedTypeInfo,
                        false, // ignoreParseErrors already contains the functionality of
                        // failOnMissingField
                        ignoreParseErrors,
                        timestampFormat);
        this.hasMetadata = requestedMetadata.size() > 0;
        this.metadataConverters =
                createMetadataConverters(jsonRowType, requestedMetadata, schemaInclude);
        this.producedTypeInfo = producedTypeInfo;
        this.schemaInclude = schemaInclude;
        this.ignoreParseErrors = ignoreParseErrors;
        this.upsertParse = upsertParse;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        genericRowDataList = new ArrayList<>();
        jsonDeserializer.open(context);
    }

    @Override
    public RowData deserialize(byte[] message) {
        throw new RuntimeException(
                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
    }

    @Override
    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
        if (message == null || message.length == 0) {
            // skip tombstone messages
            return;
        }
        genericRowDataList.clear();
        try {
            GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
            GenericRowData payload;
            if (schemaInclude) {
                payload = (GenericRowData) row.getField(0);
            } else {
                payload = row;
            }

            GenericRowData before = (GenericRowData) payload.getField(0);
            GenericRowData after = (GenericRowData) payload.getField(1);
            String op = payload.getField(2).toString();
            if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
                after.setRowKind(RowKind.INSERT);
                genericRowDataList.add(handleRow(row, after));
            } else if (OP_UPDATE.equals(op)) {
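                // Without upsertParse we emit UPDATE_BEFORE/UPDATE_AFTER and therefore need the
                // "before" image; with upsertParse we emit only UPDATE_AFTER, so a null "before"
                // (no REPLICA IDENTITY FULL) is tolerated and downstream must treat the stream
                // as an upsert stream keyed by primary key.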
                if (!upsertParse) {
                    if (before == null) {
                        throw new IllegalStateException(
                                String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
                    }
                    before.setRowKind(RowKind.UPDATE_BEFORE);
                    after.setRowKind(RowKind.UPDATE_AFTER);
                    genericRowDataList.add(handleRow(row, before));
                    genericRowDataList.add(handleRow(row, after));
                } else {
                    after.setRowKind(RowKind.UPDATE_AFTER);
                    genericRowDataList.add(handleRow(row, after));
                }
            } else if (OP_DELETE.equals(op)) {
                if (before == null) {
                    throw new IllegalStateException(
                            String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
                }
                before.setRowKind(RowKind.DELETE);
                genericRowDataList.add(handleRow(row, before));
            } else {
                if (!ignoreParseErrors) {
                    throw new IOException(
                            format(
                                    "Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'",
                                    op, new String(message)));
                }
            }
        } catch (Throwable t) {
            // a big try catch to protect the processing.
            if (!ignoreParseErrors) {
                throw new IOException(
                        format("Corrupt Debezium JSON message '%s'.", new String(message)), t);
            }
        }
        for (GenericRowData genericRowData : genericRowDataList) {
            out.collect(genericRowData);
        }
    }

    private GenericRowData handleRow(GenericRowData rootRow, GenericRowData physicalRow) {
        // shortcut in case no output projection is required
        if (!hasMetadata) {
            return physicalRow;
        }

        final int physicalArity = physicalRow.getArity();
        final int metadataArity = metadataConverters.length;

        final GenericRowData producedRow =
                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);

        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
        }

        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
            producedRow.setField(
                    physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
        }
        return producedRow;
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return producedTypeInfo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DebeziumJsonDeserializationSchema that = (DebeziumJsonDeserializationSchema) o;
        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
                && hasMetadata == that.hasMetadata
                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
                && schemaInclude == that.schemaInclude
                && ignoreParseErrors == that.ignoreParseErrors
                && upsertParse == that.upsertParse;
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                jsonDeserializer,
                hasMetadata,
                producedTypeInfo,
                schemaInclude,
                ignoreParseErrors,
                upsertParse);
    }

    // --------------------------------------------------------------------------------------------

    private static RowType createJsonRowType(
            DataType physicalDataType,
            List<ReadableMetadata> readableMetadata,
            boolean schemaInclude) {
        DataType payload =
                DataTypes.ROW(
                        DataTypes.FIELD("before", physicalDataType),
                        DataTypes.FIELD("after", physicalDataType),
                        DataTypes.FIELD("op", DataTypes.STRING()));

        // append fields that are required for reading metadata in the payload
        final List<DataTypes.Field> payloadMetadataFields =
                readableMetadata.stream()
                        .filter(m -> m.isJsonPayload)
                        .map(m -> m.requiredJsonField)
                        .distinct()
                        .collect(Collectors.toList());
        payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);

        DataType root = payload;
        if (schemaInclude) {
            // when Debezium Kafka Connect enables "value.converter.schemas.enable",
            // the JSON will contain "schema" information and we need to extract data from
            // "payload".
            root = DataTypes.ROW(DataTypes.FIELD("payload", payload));
        }

        // append fields that are required for reading metadata in the root
        final List<DataTypes.Field> rootMetadataFields =
                readableMetadata.stream()
                        .filter(m -> !m.isJsonPayload)
                        .map(m -> m.requiredJsonField)
                        .distinct()
                        .collect(Collectors.toList());
        root = DataTypeUtils.appendRowFields(root, rootMetadataFields);

        return (RowType) root.getLogicalType();
    }

    private static MetadataConverter[] createMetadataConverters(
            RowType jsonRowType, List<ReadableMetadata> requestedMetadata, boolean schemaInclude) {
        return requestedMetadata.stream()
                .map(
                        m -> {
                            if (m.isJsonPayload) {
                                return convertInPayload(jsonRowType, m, schemaInclude);
                            } else {
                                return convertInRoot(jsonRowType, m);
                            }
                        })
                .toArray(MetadataConverter[]::new);
    }

    private static MetadataConverter convertInRoot(
            RowType jsonRowType, ReadableMetadata metadata) {
        final int pos = findFieldPos(metadata, jsonRowType);
        return new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(GenericRowData root, int unused) {
                return metadata.converter.convert(root, pos);
            }
        };
    }

    private static MetadataConverter convertInPayload(
            RowType jsonRowType, ReadableMetadata metadata, boolean schemaInclude) {
        if (schemaInclude) {
            final int pos = findFieldPos(metadata, (RowType) jsonRowType.getChildren().get(0));
            return new MetadataConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object convert(GenericRowData root, int unused) {
                    final GenericRowData payload = (GenericRowData) root.getField(0);
                    return metadata.converter.convert(payload, pos);
                }
            };
        }
        return convertInRoot(jsonRowType, metadata);
    }

    private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
        return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Converter that extracts a metadata field from the row (root or payload) that comes out of
     * the JSON schema and converts it to the desired data type.
     */
    interface MetadataConverter extends Serializable {

        // Method for top-level access.
        default Object convert(GenericRowData row) {
            return convert(row, -1);
        }

        Object convert(GenericRowData row, int pos);
    }
}
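
For reference, here is a minimal sketch of how this schema could be
constructed with the new flag. The row fields and option values are just
illustrative, and InternalTypeInfo comes from flink-table-runtime:

import java.util.Collections;

import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

DataType physicalDataType =
        DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("name", DataTypes.STRING()));

DeserializationSchema<RowData> schema =
        new DebeziumJsonDeserializationSchema(
                physicalDataType,
                Collections.emptyList(), // no metadata columns requested
                InternalTypeInfo.of(physicalDataType.getLogicalType()),
                false, // schemaInclude: messages carry no "schema" part
                false, // ignoreParseErrors
                true, // upsertParse: tolerate null "before" on updates
                TimestampFormat.ISO_8601);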
