This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 06253be  [feature]support read ipv4/ipv6 type (#365)
06253be is described below

commit 06253be8e0a218709799a316688c6dfb8b5375a3
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Mon Apr 15 14:46:55 2024 +0800

    [feature]support read ipv4/ipv6 type (#365)
---
 .../apache/doris/flink/catalog/DorisCatalog.java   |   4 +-
 .../doris/flink/catalog/DorisTypeMapper.java       |   6 +
 .../doris/flink/catalog/doris/DorisType.java       |   3 +
 .../apache/doris/flink/serialization/RowBatch.java |  41 +++-
 .../java/org/apache/doris/flink/util/IPUtils.java  | 201 +++++++++++++++++++
 .../doris/flink/serialization/TestRowBatch.java    | 220 ++++++++++++++++++++-
 6 files changed, 464 insertions(+), 11 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index 96518d3..a09e8fc 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -90,7 +90,7 @@ public class DorisCatalog extends AbstractCatalog {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisCatalog.class);
     private DorisSystem dorisSystem;
-    private DorisConnectionOptions connectionOptions;
+    private final DorisConnectionOptions connectionOptions;
     private final Map<String, String> properties;
 
     public DorisCatalog(
@@ -168,7 +168,7 @@ public class DorisCatalog extends AbstractCatalog {
             throw new DatabaseNotExistException(getName(), name);
         }
 
-        if (!cascade && listTables(name).size() > 0) {
+        if (!cascade && !listTables(name).isEmpty()) {
             throw new DatabaseNotEmptyException(getName(), name);
         }
         dorisSystem.dropDatabase(name);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index e125a30..bbabd80 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -53,6 +53,8 @@ import static 
org.apache.doris.flink.catalog.doris.DorisType.DECIMAL_V3;
 import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE;
 import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT;
 import static org.apache.doris.flink.catalog.doris.DorisType.INT;
+import static org.apache.doris.flink.catalog.doris.DorisType.IPV4;
+import static org.apache.doris.flink.catalog.doris.DorisType.IPV6;
 import static org.apache.doris.flink.catalog.doris.DorisType.JSON;
 import static org.apache.doris.flink.catalog.doris.DorisType.JSONB;
 import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT;
@@ -62,6 +64,7 @@ import static 
org.apache.doris.flink.catalog.doris.DorisType.STRING;
 import static org.apache.doris.flink.catalog.doris.DorisType.STRUCT;
 import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT;
 import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
+import static org.apache.doris.flink.catalog.doris.DorisType.VARIANT;
 
 public class DorisTypeMapper {
 
@@ -113,6 +116,9 @@ public class DorisTypeMapper {
             case ARRAY:
             case MAP:
             case STRUCT:
+            case IPV4:
+            case IPV6:
+            case VARIANT:
                 return DataTypes.STRING();
             case DATE:
             case DATE_V2:
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
index 3779143..b2b3776 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
@@ -42,4 +42,7 @@ public class DorisType {
     public static final String JSON = "JSON";
     public static final String MAP = "MAP";
     public static final String STRUCT = "STRUCT";
+    public static final String VARIANT = "VARIANT";
+    public static final String IPV4 = "IPV4";
+    public static final String IPV6 = "IPV6";
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index c6dfa0e..9e87bde 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -32,6 +32,7 @@ import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
 import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
@@ -44,6 +45,7 @@ import org.apache.arrow.vector.types.Types;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.util.IPUtils;
 import org.apache.doris.sdk.thrift.TScanBatchResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,12 +65,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
+import static org.apache.doris.flink.util.IPUtils.convertLongToIPv4Address;
+
 /** row batch data container. */
 public class RowBatch {
-    private static Logger logger = LoggerFactory.getLogger(RowBatch.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(RowBatch.class);
 
     public static class Row {
-        private List<Object> cols;
+        private final List<Object> cols;
 
         Row(int colCount) {
             this.cols = new ArrayList<>(colCount);
@@ -84,10 +88,10 @@ public class RowBatch {
     }
 
     // offset for iterate the rowBatch
-    private int offsetInRowBatch = 0;
+    private int offsetInRowBatch;
     private int rowCountInOneBatch = 0;
     private int readRowCount = 0;
-    private List<Row> rowBatch = new ArrayList<>();
+    private final List<Row> rowBatch = new ArrayList<>();
     private final ArrowStreamReader arrowStreamReader;
     private VectorSchemaRoot root;
     private List<FieldVector> fieldVectors;
@@ -149,10 +153,7 @@ public class RowBatch {
     }
 
     public boolean hasNext() {
-        if (offsetInRowBatch < readRowCount) {
-            return true;
-        }
-        return false;
+        return offsetInRowBatch < readRowCount;
     }
 
     private void addValueToRow(int rowIndex, Object obj) {
@@ -226,6 +227,17 @@ public class RowBatch {
                 fieldValue = intVector.isNull(rowIndex) ? null : 
intVector.get(rowIndex);
                 addValueToRow(rowIndex, fieldValue);
                 break;
+            case "IPV4":
+                if (!minorType.equals(Types.MinorType.UINT4)) {
+                    return false;
+                }
+                UInt4Vector ipv4Vector = (UInt4Vector) fieldVector;
+                fieldValue =
+                        ipv4Vector.isNull(rowIndex)
+                                ? null
+                                : 
convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
+                addValueToRow(rowIndex, fieldValue);
+                break;
             case "BIGINT":
                 if (!minorType.equals(Types.MinorType.BIGINT)) {
                     return false;
@@ -390,6 +402,19 @@ public class RowBatch {
                 String stringValue = new String(varCharVector.get(rowIndex));
                 addValueToRow(rowIndex, stringValue);
                 break;
+            case "IPV6":
+                if (!minorType.equals(Types.MinorType.VARCHAR)) {
+                    return false;
+                }
+                VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector;
+                if (ipv6VarcharVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                String ipv6Str = new String(ipv6VarcharVector.get(rowIndex));
+                String ipv6Address = IPUtils.fromBigInteger(new 
BigInteger(ipv6Str));
+                addValueToRow(rowIndex, ipv6Address);
+                break;
             case "ARRAY":
                 if (!minorType.equals(Types.MinorType.LIST)) {
                     return false;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/util/IPUtils.java 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/IPUtils.java
new file mode 100644
index 0000000..a834bbf
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/IPUtils.java
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.util;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.Arrays;
+
+public class IPUtils {
+    /**
+     * Create an IPv6 address from a (positive) {@link java.math.BigInteger}. 
The magnitude of the
+     * {@link java.math.BigInteger} represents the IPv6 address value. Or in 
other words, the {@link
+     * java.math.BigInteger} with value N defines the Nth possible IPv6 
address.
+     *
+     * @param bigInteger {@link java.math.BigInteger} value
+     * @return IPv6 address
+     */
+    public static String fromBigInteger(BigInteger bigInteger) {
+        byte[] bytes = bigInteger.toByteArray();
+        if (bytes[0] == 0) {
+            bytes = Arrays.copyOfRange(bytes, 1, bytes.length); // Skip 
leading zero byte
+        }
+        bytes = prefixWithZeroBytes(bytes);
+        long[] ipv6Bits = fromByteArray(bytes);
+        return toIPv6String(ipv6Bits[0], ipv6Bits[1]);
+    }
+
+    private static byte[] prefixWithZeroBytes(byte[] original) {
+        byte[] target = new byte[16];
+        System.arraycopy(original, 0, target, 16 - original.length, 
original.length);
+        return target;
+    }
+
+    /**
+     * Split a 16-byte IPv6 address into its high and low 64-bit halves.
+     *
+     * @param bytes byte array with 16 bytes (interpreted unsigned)
+     * @return two longs: the first 8 bytes, then the last 8 bytes
+     */
+    public static long[] fromByteArray(byte[] bytes) {
+        if (bytes == null || bytes.length != 16) {
+            throw new IllegalArgumentException("Byte array must be exactly 16 
bytes long");
+        }
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+        LongBuffer longBuffer = buf.asLongBuffer();
+        return new long[] {longBuffer.get(), longBuffer.get()};
+    }
+
+    private static String toShortHandNotationString(long highBits, long 
lowBits) {
+        String[] strings = toArrayOfShortStrings(highBits, lowBits);
+        StringBuilder result = new StringBuilder();
+        int[] shortHandNotationPositionAndLength =
+                startAndLengthOfLongestRunOfZeroes(highBits, lowBits);
+        int shortHandNotationPosition = shortHandNotationPositionAndLength[0];
+        int shortHandNotationLength = shortHandNotationPositionAndLength[1];
+        boolean useShortHandNotation = shortHandNotationLength > 1;
+
+        for (int i = 0; i < strings.length; ++i) {
+            if (useShortHandNotation && i == shortHandNotationPosition) {
+                if (i == 0) {
+                    result.append("::");
+                } else {
+                    result.append(":");
+                }
+            } else if (i <= shortHandNotationPosition
+                    || i >= shortHandNotationPosition + 
shortHandNotationLength) {
+                result.append(strings[i]);
+                if (i < 7) {
+                    result.append(":");
+                }
+            }
+        }
+
+        return result.toString().toLowerCase();
+    }
+
+    private static String[] toArrayOfShortStrings(long highBits, long lowBits) 
{
+        short[] shorts = toShortArray(highBits, lowBits);
+        String[] strings = new String[shorts.length];
+
+        for (int i = 0; i < shorts.length; ++i) {
+            strings[i] = String.format("%x", shorts[i]);
+        }
+
+        return strings;
+    }
+
+    private static short[] toShortArray(long highBits, long lowBits) {
+        short[] shorts = new short[8];
+
+        for (int i = 0; i < 8; ++i) {
+            if (inHighRange(i)) {
+                shorts[i] = (short) ((int) (highBits << i * 16 >>> 48 & 
0xFFFF));
+            } else {
+                shorts[i] = (short) ((int) (lowBits << i * 16 >>> 48 & 
0xFFFF));
+            }
+        }
+
+        return shorts;
+    }
+
+    private static int[] startAndLengthOfLongestRunOfZeroes(long highBits, 
long lowBits) {
+        int longestConsecutiveZeroes = 0;
+        int longestConsecutiveZeroesPos = -1;
+        short[] shorts = toShortArray(highBits, lowBits);
+
+        for (int pos = 0; pos < shorts.length; ++pos) {
+            int consecutiveZeroesAtCurrentPos = countConsecutiveZeroes(shorts, 
pos);
+            if (consecutiveZeroesAtCurrentPos > longestConsecutiveZeroes) {
+                longestConsecutiveZeroes = consecutiveZeroesAtCurrentPos;
+                longestConsecutiveZeroesPos = pos;
+            }
+        }
+
+        return new int[] {longestConsecutiveZeroesPos, 
longestConsecutiveZeroes};
+    }
+
+    private static boolean inHighRange(int shortNumber) {
+        return shortNumber >= 0 && shortNumber < 4;
+    }
+
+    private static int countConsecutiveZeroes(short[] shorts, int offset) {
+        int count = 0;
+
+        for (int i = offset; i < shorts.length && shorts[i] == 0; ++i) {
+            ++count;
+        }
+
+        return count;
+    }
+
+    public static String toIPv6String(long highBits, long lowBits) {
+
+        if (isIPv4Mapped(highBits, lowBits)) {
+            return toIPv4MappedAddressString(lowBits);
+        } else if (isIPv4Compatibility(highBits, lowBits)) {
+            return toIPv4CompatibilityAddressString(lowBits);
+        }
+
+        return toShortHandNotationString(highBits, lowBits);
+    }
+
+    public static String convertLongToIPv4Address(long lowBits) {
+        return String.format(
+                "%d.%d.%d.%d",
+                (lowBits >> 24) & 0xff,
+                (lowBits >> 16) & 0xff,
+                (lowBits >> 8) & 0xff,
+                lowBits & 0xff);
+    }
+
+    private static String toIPv4MappedAddressString(long lowBits) {
+        return "::ffff:" + convertLongToIPv4Address(lowBits);
+    }
+
+    private static String toIPv4CompatibilityAddressString(long lowBits) {
+        return "::" + convertLongToIPv4Address(lowBits);
+    }
+
+    /**
+     * Returns true if the address is an IPv4-mapped IPv6 address. In these 
addresses, the first 80
+     * bits are zero, the next 16 bits are ones, and the remaining 32 bits are 
the IPv4 address.
+     *
+     * @return true if the address is an IPv4-mapped IPv6 address.
+     */
+    private static boolean isIPv4Mapped(long highBits, long lowBits) {
+        return highBits == 0
+                && (lowBits & 0xFFFF000000000000L) == 0
+                && (lowBits & 0x0000FFFF00000000L) == 0x0000FFFF00000000L;
+    }
+
+    /**
+     * Checks whether the address is treated as IPv4 compatibility format.
+     * The check, as implemented, requires all of the following: 1. The high
+     * 64 bits are zero. 2. The low 64 bits, compared as a signed long, are
+     * at most 4294967295 (0xFFFFFFFF). 3. Bit 16 of the low bits (mask
+     * 65536, i.e. 0x10000) is set. No other bits of the low 32 bits are
+     * examined.
+     *
+     * @return true if the address passes the checks above; otherwise false.
+     */
+    private static boolean isIPv4Compatibility(long highBits, long lowBits) {
+        return highBits == 0L && lowBits <= 0xFFFFFFFFL && (lowBits & 65536L) 
== 65536L;
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 481004b..29e48b6 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -35,6 +35,7 @@ import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
 import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
@@ -79,6 +80,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assert.assertEquals;
 
 public class TestRowBatch {
     private static Logger logger = LoggerFactory.getLogger(TestRowBatch.class);
@@ -939,7 +941,6 @@ public class TestRowBatch {
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
         RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
-
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
         Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0));
@@ -955,4 +956,221 @@ public class TestRowBatch {
         thrown.expectMessage(startsWith("Get row offset:"));
         rowBatch.next();
     }
+
+    @Test
+    public void testIPV6() throws DorisException, IOException {
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(new Field("k1", FieldType.nullable(new 
ArrowType.Utf8()), null));
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new org.apache.arrow.vector.types.pojo.Schema(
+                                childrenBuilder.build(), null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), 
outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(13);
+
+        FieldVector vector = root.getVector("k1");
+        VarCharVector ipv6Vector = (VarCharVector) vector;
+        ipv6Vector.setInitialCapacity(13);
+        ipv6Vector.allocateNew();
+        ipv6Vector.setIndexDefined(0);
+        ipv6Vector.setValueLengthSafe(0, 1);
+        ipv6Vector.setSafe(0, "0".getBytes());
+
+        ipv6Vector.setIndexDefined(1);
+        ipv6Vector.setValueLengthSafe(0, 1);
+        ipv6Vector.setSafe(1, "1".getBytes());
+
+        ipv6Vector.setIndexDefined(2);
+        ipv6Vector.setSafe(2, "65535".getBytes());
+
+        ipv6Vector.setIndexDefined(3);
+        ipv6Vector.setSafe(3, "65536".getBytes());
+
+        ipv6Vector.setIndexDefined(4);
+        ipv6Vector.setSafe(4, "4294967295".getBytes());
+
+        ipv6Vector.setIndexDefined(5);
+        ipv6Vector.setSafe(5, "4294967296".getBytes());
+
+        ipv6Vector.setIndexDefined(6);
+        ipv6Vector.setSafe(6, "8589934591".getBytes());
+
+        ipv6Vector.setIndexDefined(7);
+        ipv6Vector.setSafe(7, "281470681743359".getBytes());
+
+        ipv6Vector.setIndexDefined(8);
+        ipv6Vector.setSafe(8, "281470681743360".getBytes());
+
+        ipv6Vector.setIndexDefined(9);
+        ipv6Vector.setSafe(9, "281474976710655".getBytes());
+
+        ipv6Vector.setIndexDefined(10);
+        ipv6Vector.setSafe(10, "281474976710656".getBytes());
+
+        ipv6Vector.setIndexDefined(11);
+        ipv6Vector.setSafe(11, 
"340277174624079928635746639885392347137".getBytes());
+
+        ipv6Vector.setIndexDefined(12);
+        ipv6Vector.setSafe(12, 
"340282366920938463463374607431768211455".getBytes());
+
+        vector.setValueCount(13);
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":["
+                        + 
"{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}"
+                        + "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        assertEquals("::", actualRow0.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow1 = rowBatch.next();
+        assertEquals("::1", actualRow1.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow2 = rowBatch.next();
+        assertEquals("::ffff", actualRow2.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow3 = rowBatch.next();
+        assertEquals("::0.1.0.0", actualRow3.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow4 = rowBatch.next();
+        assertEquals("::255.255.255.255", actualRow4.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow5 = rowBatch.next();
+        assertEquals("::1:0:0", actualRow5.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow6 = rowBatch.next();
+        assertEquals("::1:ffff:ffff", actualRow6.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow7 = rowBatch.next();
+        assertEquals("::fffe:ffff:ffff", actualRow7.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow8 = rowBatch.next();
+        assertEquals("::ffff:0.0.0.0", actualRow8.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow9 = rowBatch.next();
+        assertEquals("::ffff:255.255.255.255", actualRow9.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow10 = rowBatch.next();
+        assertEquals("::1:0:0:0", actualRow10.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow11 = rowBatch.next();
+        assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow12 = rowBatch.next();
+        assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", 
actualRow12.get(0));
+
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class);
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+    }
+
+    @Test
+    public void testIPV4() throws DorisException, IOException {
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(
+                new Field("k1", FieldType.nullable(new ArrowType.Int(32, 
false)), null));
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new org.apache.arrow.vector.types.pojo.Schema(
+                                childrenBuilder.build(), null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), 
outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(5);
+
+        FieldVector vector = root.getVector("k1");
+        UInt4Vector uInt4Vector = (UInt4Vector) vector;
+        uInt4Vector.setInitialCapacity(5);
+        uInt4Vector.allocateNew(4);
+        uInt4Vector.setIndexDefined(0);
+        uInt4Vector.setSafe(0, 0);
+        uInt4Vector.setIndexDefined(1);
+        uInt4Vector.setSafe(1, 255);
+        uInt4Vector.setIndexDefined(2);
+        uInt4Vector.setSafe(2, 65535);
+        uInt4Vector.setIndexDefined(3);
+        uInt4Vector.setSafe(3, 16777215);
+        uInt4Vector.setIndexDefined(4);
+        uInt4Vector.setWithPossibleTruncate(4, 4294967295L);
+        vector.setValueCount(5);
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":["
+                        + 
"{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}"
+                        + "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        assertEquals("0.0.0.0", actualRow0.get(0));
+        List<Object> actualRow1 = rowBatch.next();
+        assertEquals("0.0.0.255", actualRow1.get(0));
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow2 = rowBatch.next();
+        assertEquals("0.0.255.255", actualRow2.get(0));
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow3 = rowBatch.next();
+        assertEquals("0.255.255.255", actualRow3.get(0));
+        List<Object> actualRow4 = rowBatch.next();
+        assertEquals("255.255.255.255", actualRow4.get(0));
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class);
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to