This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 021af147cc [Feature] [Postgre CDC]support array type (#8560) 021af147cc is described below commit 021af147cce6c90593406096e9ce2e0d96921d40 Author: litiliu <38579068+liti...@users.noreply.github.com> AuthorDate: Tue Jan 21 11:04:10 2025 +0800 [Feature] [Postgre CDC]support array type (#8560) Co-authored-by: litiliu <liti...@cisco.com> --- ...TunnelRowDebeziumDeserializationConverters.java | 41 +++++++++++++++++ ...elRowDebeziumDeserializationConvertersTest.java | 52 ++++++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java index 89b9c50c30..227d2b7eee 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.cdc.debezium.row; +import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting; + +import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -48,6 +51,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; import java.util.Arrays; +import java.util.List; import java.util.Optional; /** Deserialization schema from Debezium object to {@link SeaTunnelRow} */ @@ -173,12 +177,49 @@ public class SeaTunnelRowDebeziumDeserializationConverters implements Serializab return createRowConverter( (SeaTunnelRowType) type, serverTimeZone, userDefinedConverterFactory); case ARRAY: + return createArrayConverter(type); case MAP: default: throw new UnsupportedOperationException("Unsupported type: " + type); } } + @VisibleForTesting + protected static DebeziumDeserializationConverter createArrayConverter( + SeaTunnelDataType<?> type) { + SeaTunnelDataType elementType = ((ArrayType) type).getElementType(); + switch (elementType.getSqlType()) { + case BOOLEAN: + return (dbzObj, schema) -> + convertListToArray((List<Boolean>) dbzObj, Boolean.class); + case SMALLINT: + return (dbzObj, schema) -> convertListToArray((List<Short>) dbzObj, Short.class); + case INT: + return (dbzObj, schema) -> + convertListToArray((List<Integer>) dbzObj, Integer.class); + case BIGINT: + return (dbzObj, schema) -> convertListToArray((List<Long>) dbzObj, Long.class); + case FLOAT: + return (dbzObj, schema) -> convertListToArray((List<Float>) dbzObj, Float.class); + case DOUBLE: + return (dbzObj, schema) -> convertListToArray((List<Double>) dbzObj, Double.class); + case STRING: + return (dbzObj, schema) -> convertListToArray((List<String>) dbzObj, String.class); + default: + throw new IllegalArgumentException( + "Unsupported SQL type: " + elementType.getSqlType()); + } + } + + @SuppressWarnings("unchecked") + private static <T> T[] convertListToArray(List<T> list, Class<T> clazz) { + T[] array = (T[]) java.lang.reflect.Array.newInstance(clazz, list.size()); + for (int i = 0; i < list.size(); i++) { + array[i] = list.get(i); + } + return array; + } + private static DebeziumDeserializationConverter convertToBoolean() { return new DebeziumDeserializationConverter() { private static final long serialVersionUID = 1L; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java index 74e832d6e0..14098cecc9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java @@ -17,10 +17,12 @@ package org.apache.seatunnel.connectors.cdc.debezium.row; +import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverter; import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory; import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter; @@ -34,6 +36,7 @@ import org.junit.jupiter.api.Test; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; public class SeaTunnelRowDebeziumDeserializationConvertersTest { @@ -75,4 +78,53 @@ public class SeaTunnelRowDebeziumDeserializationConvertersTest { Assertions.assertEquals(row.getField(0), 1); Assertions.assertNull(row.getField(1)); } + + @Test + void testArrayConverter() throws Exception { + DebeziumDeserializationConverter converter; + // bool array converter + converter = + SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter( + ArrayType.BOOLEAN_ARRAY_TYPE); + Boolean[] booleans = new Boolean[] {false, true}; + Assertions.assertTrue( + Arrays.equals( + booleans, (Boolean[]) (converter.convert(Arrays.asList(booleans), null)))); + // smallInt array converter + converter = + SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter( + ArrayType.SHORT_ARRAY_TYPE); + Short[] shorts = new Short[] {(short) 1, (short) 2}; + Assertions.assertTrue( + Arrays.equals(shorts, (Short[]) (converter.convert(Arrays.asList(shorts), null)))); + // int array converter + converter = + SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter( + ArrayType.INT_ARRAY_TYPE); + Integer[] ints = new Integer[] {1, 2}; + Assertions.assertTrue( + Arrays.equals(ints, (Integer[]) (converter.convert(Arrays.asList(ints), null)))); + // long array converter + converter = + SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter( + ArrayType.LONG_ARRAY_TYPE); + Long[] longs = new Long[] {1L, 2L}; + Assertions.assertTrue( + Arrays.equals(longs, (Long[]) (converter.convert(Arrays.asList(longs), null)))); + // float array converter + converter = + SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter( + ArrayType.FLOAT_ARRAY_TYPE); + Float[] floats = new Float[] {1.0f, 2.0f}; + Assertions.assertTrue( + Arrays.equals(floats, (Float[]) (converter.convert(Arrays.asList(floats), null)))); + // double array converter + converter = + SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter( + ArrayType.DOUBLE_ARRAY_TYPE); + Double[] doubles = new Double[] {1.0, 2.0}; + Assertions.assertTrue( + Arrays.equals( + doubles, (Double[]) (converter.convert(Arrays.asList(doubles), null)))); + } }