This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 021af147cc [Feature] [Postgre CDC]support array type (#8560)
021af147cc is described below

commit 021af147cce6c90593406096e9ce2e0d96921d40
Author: litiliu <38579068+liti...@users.noreply.github.com>
AuthorDate: Tue Jan 21 11:04:10 2025 +0800

    [Feature] [Postgre CDC]support array type (#8560)
    
    Co-authored-by: litiliu <liti...@cisco.com>
---
 ...TunnelRowDebeziumDeserializationConverters.java | 41 +++++++++++++++++
 ...elRowDebeziumDeserializationConvertersTest.java | 52 ++++++++++++++++++++++
 2 files changed, 93 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
index 89b9c50c30..227d2b7eee 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.cdc.debezium.row;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -48,6 +51,7 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 
 /** Deserialization schema from Debezium object to {@link SeaTunnelRow} */
@@ -173,12 +177,49 @@ public class SeaTunnelRowDebeziumDeserializationConverters implements Serializab
                 return createRowConverter(
                         (SeaTunnelRowType) type, serverTimeZone, 
userDefinedConverterFactory);
             case ARRAY:
+                return createArrayConverter(type);
             case MAP:
             default:
                 throw new UnsupportedOperationException("Unsupported type: " + 
type);
         }
     }
 
+    @VisibleForTesting
+    protected static DebeziumDeserializationConverter createArrayConverter(
+            SeaTunnelDataType<?> type) {
+        SeaTunnelDataType elementType = ((ArrayType) type).getElementType();
+        switch (elementType.getSqlType()) {
+            case BOOLEAN:
+                return (dbzObj, schema) ->
+                        convertListToArray((List<Boolean>) dbzObj, 
Boolean.class);
+            case SMALLINT:
+                return (dbzObj, schema) -> convertListToArray((List<Short>) 
dbzObj, Short.class);
+            case INT:
+                return (dbzObj, schema) ->
+                        convertListToArray((List<Integer>) dbzObj, 
Integer.class);
+            case BIGINT:
+                return (dbzObj, schema) -> convertListToArray((List<Long>) 
dbzObj, Long.class);
+            case FLOAT:
+                return (dbzObj, schema) -> convertListToArray((List<Float>) 
dbzObj, Float.class);
+            case DOUBLE:
+                return (dbzObj, schema) -> convertListToArray((List<Double>) 
dbzObj, Double.class);
+            case STRING:
+                return (dbzObj, schema) -> convertListToArray((List<String>) 
dbzObj, String.class);
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported SQL type: " + elementType.getSqlType());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T[] convertListToArray(List<T> list, Class<T> clazz) {
+        T[] array = (T[]) java.lang.reflect.Array.newInstance(clazz, 
list.size());
+        for (int i = 0; i < list.size(); i++) {
+            array[i] = list.get(i);
+        }
+        return array;
+    }
+
     private static DebeziumDeserializationConverter convertToBoolean() {
         return new DebeziumDeserializationConverter() {
             private static final long serialVersionUID = 1L;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
index 74e832d6e0..14098cecc9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.seatunnel.connectors.cdc.debezium.row;
 
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverter;
 import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
 import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
 
@@ -34,6 +36,7 @@ import org.junit.jupiter.api.Test;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 
 public class SeaTunnelRowDebeziumDeserializationConvertersTest {
@@ -75,4 +78,53 @@ public class SeaTunnelRowDebeziumDeserializationConvertersTest {
         Assertions.assertEquals(row.getField(0), 1);
         Assertions.assertNull(row.getField(1));
     }
+
+    @Test
+    void testArrayConverter() throws Exception {
+        DebeziumDeserializationConverter converter;
+        // bool array converter
+        converter =
+                
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+                        ArrayType.BOOLEAN_ARRAY_TYPE);
+        Boolean[] booleans = new Boolean[] {false, true};
+        Assertions.assertTrue(
+                Arrays.equals(
+                        booleans, (Boolean[]) 
(converter.convert(Arrays.asList(booleans), null))));
+        // smallInt array converter
+        converter =
+                
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+                        ArrayType.SHORT_ARRAY_TYPE);
+        Short[] shorts = new Short[] {(short) 1, (short) 2};
+        Assertions.assertTrue(
+                Arrays.equals(shorts, (Short[]) 
(converter.convert(Arrays.asList(shorts), null))));
+        // int array converter
+        converter =
+                
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+                        ArrayType.INT_ARRAY_TYPE);
+        Integer[] ints = new Integer[] {1, 2};
+        Assertions.assertTrue(
+                Arrays.equals(ints, (Integer[]) 
(converter.convert(Arrays.asList(ints), null))));
+        // long array converter
+        converter =
+                
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+                        ArrayType.LONG_ARRAY_TYPE);
+        Long[] longs = new Long[] {1L, 2L};
+        Assertions.assertTrue(
+                Arrays.equals(longs, (Long[]) 
(converter.convert(Arrays.asList(longs), null))));
+        // float array converter
+        converter =
+                
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+                        ArrayType.FLOAT_ARRAY_TYPE);
+        Float[] floats = new Float[] {1.0f, 2.0f};
+        Assertions.assertTrue(
+                Arrays.equals(floats, (Float[]) 
(converter.convert(Arrays.asList(floats), null))));
+        // double array converter
+        converter =
+                
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+                        ArrayType.DOUBLE_ARRAY_TYPE);
+        Double[] doubles = new Double[] {1.0, 2.0};
+        Assertions.assertTrue(
+                Arrays.equals(
+                        doubles, (Double[]) 
(converter.convert(Arrays.asList(doubles), null))));
+    }
 }

Reply via email to