This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 06253be [feature]support read ipv4/ipv6 type (#365) 06253be is described below commit 06253be8e0a218709799a316688c6dfb8b5375a3 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Mon Apr 15 14:46:55 2024 +0800 [feature]support read ipv4/ipv6 type (#365) --- .../apache/doris/flink/catalog/DorisCatalog.java | 4 +- .../doris/flink/catalog/DorisTypeMapper.java | 6 + .../doris/flink/catalog/doris/DorisType.java | 3 + .../apache/doris/flink/serialization/RowBatch.java | 41 +++- .../java/org/apache/doris/flink/util/IPUtils.java | 201 +++++++++++++++++++ .../doris/flink/serialization/TestRowBatch.java | 220 ++++++++++++++++++++- 6 files changed, 464 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java index 96518d3..a09e8fc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java @@ -90,7 +90,7 @@ public class DorisCatalog extends AbstractCatalog { private static final Logger LOG = LoggerFactory.getLogger(DorisCatalog.class); private DorisSystem dorisSystem; - private DorisConnectionOptions connectionOptions; + private final DorisConnectionOptions connectionOptions; private final Map<String, String> properties; public DorisCatalog( @@ -168,7 +168,7 @@ public class DorisCatalog extends AbstractCatalog { throw new DatabaseNotExistException(getName(), name); } - if (!cascade && listTables(name).size() > 0) { + if (!cascade && !listTables(name).isEmpty()) { throw new DatabaseNotEmptyException(getName(), name); } dorisSystem.dropDatabase(name); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index e125a30..bbabd80 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -53,6 +53,8 @@ import static org.apache.doris.flink.catalog.doris.DorisType.DECIMAL_V3; import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE; import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT; import static org.apache.doris.flink.catalog.doris.DorisType.INT; +import static org.apache.doris.flink.catalog.doris.DorisType.IPV4; +import static org.apache.doris.flink.catalog.doris.DorisType.IPV6; import static org.apache.doris.flink.catalog.doris.DorisType.JSON; import static org.apache.doris.flink.catalog.doris.DorisType.JSONB; import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT; @@ -62,6 +64,7 @@ import static org.apache.doris.flink.catalog.doris.DorisType.STRING; import static org.apache.doris.flink.catalog.doris.DorisType.STRUCT; import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT; import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR; +import static org.apache.doris.flink.catalog.doris.DorisType.VARIANT; public class DorisTypeMapper { @@ -113,6 +116,9 @@ public class DorisTypeMapper { case ARRAY: case MAP: case STRUCT: + case IPV4: + case IPV6: + case VARIANT: return DataTypes.STRING(); case DATE: case DATE_V2: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java index 3779143..b2b3776 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java @@ -42,4 +42,7 @@ public class DorisType { public static final String JSON = "JSON"; public static final String MAP = "MAP"; public static final String STRUCT = "STRUCT"; + public static final String VARIANT = "VARIANT"; + public static final String IPV4 = "IPV4"; + public static final String IPV6 = "IPV6"; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index c6dfa0e..9e87bde 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.TimeStampMicroVector; import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -44,6 +45,7 @@ import org.apache.arrow.vector.types.Types; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.rest.models.Schema; +import org.apache.doris.flink.util.IPUtils; import org.apache.doris.sdk.thrift.TScanBatchResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,12 +65,14 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import static org.apache.doris.flink.util.IPUtils.convertLongToIPv4Address; + /** row batch data container. */ public class RowBatch { - private static Logger logger = LoggerFactory.getLogger(RowBatch.class); + private static final Logger logger = LoggerFactory.getLogger(RowBatch.class); public static class Row { - private List<Object> cols; + private final List<Object> cols; Row(int colCount) { this.cols = new ArrayList<>(colCount); @@ -84,10 +88,10 @@ public class RowBatch { } // offset for iterate the rowBatch - private int offsetInRowBatch = 0; + private int offsetInRowBatch; private int rowCountInOneBatch = 0; private int readRowCount = 0; - private List<Row> rowBatch = new ArrayList<>(); + private final List<Row> rowBatch = new ArrayList<>(); private final ArrowStreamReader arrowStreamReader; private VectorSchemaRoot root; private List<FieldVector> fieldVectors; @@ -149,10 +153,7 @@ public class RowBatch { } public boolean hasNext() { - if (offsetInRowBatch < readRowCount) { - return true; - } - return false; + return offsetInRowBatch < readRowCount; } private void addValueToRow(int rowIndex, Object obj) { @@ -226,6 +227,17 @@ public class RowBatch { fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex); addValueToRow(rowIndex, fieldValue); break; + case "IPV4": + if (!minorType.equals(Types.MinorType.UINT4)) { + return false; + } + UInt4Vector ipv4Vector = (UInt4Vector) fieldVector; + fieldValue = + ipv4Vector.isNull(rowIndex) + ? null + : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex)); + addValueToRow(rowIndex, fieldValue); + break; case "BIGINT": if (!minorType.equals(Types.MinorType.BIGINT)) { return false; @@ -390,6 +402,19 @@ public class RowBatch { String stringValue = new String(varCharVector.get(rowIndex)); addValueToRow(rowIndex, stringValue); break; + case "IPV6": + if (!minorType.equals(Types.MinorType.VARCHAR)) { + return false; + } + VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector; + if (ipv6VarcharVector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + break; + } + String ipv6Str = new String(ipv6VarcharVector.get(rowIndex)); + String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str)); + addValueToRow(rowIndex, ipv6Address); + break; case "ARRAY": if (!minorType.equals(Types.MinorType.LIST)) { return false; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/util/IPUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/IPUtils.java new file mode 100644 index 0000000..a834bbf --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/IPUtils.java @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.util; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.LongBuffer; +import java.util.Arrays; + +public class IPUtils { + /** + * Create an IPv6 address from a (positive) {@link java.math.BigInteger}. The magnitude of the + * {@link java.math.BigInteger} represents the IPv6 address value. Or in other words, the {@link + * java.math.BigInteger} with value N defines the Nth possible IPv6 address. + * + * @param bigInteger {@link java.math.BigInteger} value + * @return IPv6 address + */ + public static String fromBigInteger(BigInteger bigInteger) { + byte[] bytes = bigInteger.toByteArray(); + if (bytes[0] == 0) { + bytes = Arrays.copyOfRange(bytes, 1, bytes.length); // Skip leading zero byte + } + bytes = prefixWithZeroBytes(bytes); + long[] ipv6Bits = fromByteArray(bytes); + return toIPv6String(ipv6Bits[0], ipv6Bits[1]); + } + + private static byte[] prefixWithZeroBytes(byte[] original) { + byte[] target = new byte[16]; + System.arraycopy(original, 0, target, 16 - original.length, original.length); + return target; + } + + /** + * Create an IPv6 address from a byte array. + * + * @param bytes byte array with 16 bytes (interpreted unsigned) + * @return IPv6 address + */ + public static long[] fromByteArray(byte[] bytes) { + if (bytes == null || bytes.length != 16) { + throw new IllegalArgumentException("Byte array must be exactly 16 bytes long"); + } + ByteBuffer buf = ByteBuffer.wrap(bytes); + LongBuffer longBuffer = buf.asLongBuffer(); + return new long[] {longBuffer.get(), longBuffer.get()}; + } + + private static String toShortHandNotationString(long highBits, long lowBits) { + String[] strings = toArrayOfShortStrings(highBits, lowBits); + StringBuilder result = new StringBuilder(); + int[] shortHandNotationPositionAndLength = + startAndLengthOfLongestRunOfZeroes(highBits, lowBits); + int shortHandNotationPosition = shortHandNotationPositionAndLength[0]; + int shortHandNotationLength = shortHandNotationPositionAndLength[1]; + boolean useShortHandNotation = shortHandNotationLength > 1; + + for (int i = 0; i < strings.length; ++i) { + if (useShortHandNotation && i == shortHandNotationPosition) { + if (i == 0) { + result.append("::"); + } else { + result.append(":"); + } + } else if (i <= shortHandNotationPosition + || i >= shortHandNotationPosition + shortHandNotationLength) { + result.append(strings[i]); + if (i < 7) { + result.append(":"); + } + } + } + + return result.toString().toLowerCase(); + } + + private static String[] toArrayOfShortStrings(long highBits, long lowBits) { + short[] shorts = toShortArray(highBits, lowBits); + String[] strings = new String[shorts.length]; + + for (int i = 0; i < shorts.length; ++i) { + strings[i] = String.format("%x", shorts[i]); + } + + return strings; + } + + private static short[] toShortArray(long highBits, long lowBits) { + short[] shorts = new short[8]; + + for (int i = 0; i < 8; ++i) { + if (inHighRange(i)) { + shorts[i] = (short) ((int) (highBits << i * 16 >>> 48 & 0xFFFF)); + } else { + shorts[i] = (short) ((int) (lowBits << i * 16 >>> 48 & 0xFFFF)); + } + } + + return shorts; + } + + private static int[] startAndLengthOfLongestRunOfZeroes(long highBits, long lowBits) { + int longestConsecutiveZeroes = 0; + int longestConsecutiveZeroesPos = -1; + short[] shorts = toShortArray(highBits, lowBits); + + for (int pos = 0; pos < shorts.length; ++pos) { + int consecutiveZeroesAtCurrentPos = countConsecutiveZeroes(shorts, pos); + if (consecutiveZeroesAtCurrentPos > longestConsecutiveZeroes) { + longestConsecutiveZeroes = consecutiveZeroesAtCurrentPos; + longestConsecutiveZeroesPos = pos; + } + } + + return new int[] {longestConsecutiveZeroesPos, longestConsecutiveZeroes}; + } + + private static boolean inHighRange(int shortNumber) { + return shortNumber >= 0 && shortNumber < 4; + } + + private static int countConsecutiveZeroes(short[] shorts, int offset) { + int count = 0; + + for (int i = offset; i < shorts.length && shorts[i] == 0; ++i) { + ++count; + } + + return count; + } + + public static String toIPv6String(long highBits, long lowBits) { + + if (isIPv4Mapped(highBits, lowBits)) { + return toIPv4MappedAddressString(lowBits); + } else if (isIPv4Compatibility(highBits, lowBits)) { + return toIPv4CompatibilityAddressString(lowBits); + } + + return toShortHandNotationString(highBits, lowBits); + } + + public static String convertLongToIPv4Address(long lowBits) { + return String.format( + "%d.%d.%d.%d", + (lowBits >> 24) & 0xff, + (lowBits >> 16) & 0xff, + (lowBits >> 8) & 0xff, + lowBits & 0xff); + } + + private static String toIPv4MappedAddressString(long lowBits) { + return "::ffff:" + convertLongToIPv4Address(lowBits); + } + + private static String toIPv4CompatibilityAddressString(long lowBits) { + return "::" + convertLongToIPv4Address(lowBits); + } + + /** + * Returns true if the address is an IPv4-mapped IPv6 address. In these addresses, the first 80 + * bits are zero, the next 16 bits are one, and the remaining 32 bits are the IPv4 address. + * + * @return true if the address is an IPv4-mapped IPv6 addresses. + */ + private static boolean isIPv4Mapped(long highBits, long lowBits) { + return highBits == 0 + && (lowBits & 0xFFFF000000000000L) == 0 + && (lowBits & 0x0000FFFF00000000L) == 0x0000FFFF00000000L; + } + + /** + * Checks if the given IPv6 address is in IPv4 compatibility format. IPv4 compatibility format + * is characterized by having the high 96 bits of the IPv6 address set to zero, while the low 32 + * bits represent an IPv4 address. The criteria for determining IPv4 compatibility format are as + * follows: 1. The high 96 bits of the IPv6 address are all zeros. 2. The low 32 bits are within + * the range from 0 to 4294967295 (0xFFFFFFFF). 3. The first 16 bits of the low 32 bits are all + * ones (0xFFFF), indicating the special identifier for IPv4 compatibility format. + * + * @return True if the given IPv6 address is in IPv4 compatibility format; otherwise, false. + */ + private static boolean isIPv4Compatibility(long highBits, long lowBits) { + return highBits == 0L && lowBits <= 0xFFFFFFFFL && (lowBits & 65536L) == 65536L; + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java index 481004b..29e48b6 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java @@ -35,6 +35,7 @@ import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.TimeStampMicroVector; import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -79,6 +80,7 @@ import java.util.List; import java.util.NoSuchElementException; import static org.hamcrest.core.StringStartsWith.startsWith; +import static org.junit.Assert.assertEquals; public class TestRowBatch { private static Logger logger = LoggerFactory.getLogger(TestRowBatch.class); @@ -939,7 +941,6 @@ public class TestRowBatch { Schema schema = RestService.parseSchema(schemaStr, logger); RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); - Assert.assertTrue(rowBatch.hasNext()); List<Object> actualRow0 = rowBatch.next(); Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0)); @@ -955,4 +956,221 @@ public class TestRowBatch { thrown.expectMessage(startsWith("Get row offset:")); rowBatch.next(); } + + @Test + public void testIPV6() throws DorisException, IOException { + + ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); + childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); + + VectorSchemaRoot root = + VectorSchemaRoot.create( + new org.apache.arrow.vector.types.pojo.Schema( + childrenBuilder.build(), null), + new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ArrowStreamWriter arrowStreamWriter = + new ArrowStreamWriter( + root, new DictionaryProvider.MapDictionaryProvider(), outputStream); + + arrowStreamWriter.start(); + root.setRowCount(13); + + FieldVector vector = root.getVector("k1"); + VarCharVector ipv6Vector = (VarCharVector) vector; + ipv6Vector.setInitialCapacity(13); + ipv6Vector.allocateNew(); + ipv6Vector.setIndexDefined(0); + ipv6Vector.setValueLengthSafe(0, 1); + ipv6Vector.setSafe(0, "0".getBytes()); + + ipv6Vector.setIndexDefined(1); + ipv6Vector.setValueLengthSafe(0, 1); + ipv6Vector.setSafe(1, "1".getBytes()); + + ipv6Vector.setIndexDefined(2); + ipv6Vector.setSafe(2, "65535".getBytes()); + + ipv6Vector.setIndexDefined(3); + ipv6Vector.setSafe(3, "65536".getBytes()); + + ipv6Vector.setIndexDefined(4); + ipv6Vector.setSafe(4, "4294967295".getBytes()); + + ipv6Vector.setIndexDefined(5); + ipv6Vector.setSafe(5, "4294967296".getBytes()); + + ipv6Vector.setIndexDefined(6); + ipv6Vector.setSafe(6, "8589934591".getBytes()); + + ipv6Vector.setIndexDefined(7); + ipv6Vector.setSafe(7, "281470681743359".getBytes()); + + ipv6Vector.setIndexDefined(8); + ipv6Vector.setSafe(8, "281470681743360".getBytes()); + + ipv6Vector.setIndexDefined(9); + ipv6Vector.setSafe(9, "281474976710655".getBytes()); + + ipv6Vector.setIndexDefined(10); + ipv6Vector.setSafe(10, "281474976710656".getBytes()); + + ipv6Vector.setIndexDefined(11); + ipv6Vector.setSafe(11, "340277174624079928635746639885392347137".getBytes()); + + ipv6Vector.setIndexDefined(12); + ipv6Vector.setSafe(12, "340282366920938463463374607431768211455".getBytes()); + + vector.setValueCount(13); + arrowStreamWriter.writeBatch(); + + arrowStreamWriter.end(); + arrowStreamWriter.close(); + + TStatus status = new TStatus(); + status.setStatusCode(TStatusCode.OK); + TScanBatchResult scanBatchResult = new TScanBatchResult(); + scanBatchResult.setStatus(status); + scanBatchResult.setEos(false); + scanBatchResult.setRows(outputStream.toByteArray()); + + String schemaStr = + "{\"properties\":[" + + "{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}" + + "], \"status\":200}"; + + Schema schema = RestService.parseSchema(schemaStr, logger); + + RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow0 = rowBatch.next(); + assertEquals("::", actualRow0.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow1 = rowBatch.next(); + assertEquals("::1", actualRow1.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow2 = rowBatch.next(); + assertEquals("::ffff", actualRow2.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow3 = rowBatch.next(); + assertEquals("::0.1.0.0", actualRow3.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow4 = rowBatch.next(); + assertEquals("::255.255.255.255", actualRow4.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow5 = rowBatch.next(); + assertEquals("::1:0:0", actualRow5.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow6 = rowBatch.next(); + assertEquals("::1:ffff:ffff", actualRow6.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow7 = rowBatch.next(); + assertEquals("::fffe:ffff:ffff", actualRow7.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow8 = rowBatch.next(); + assertEquals("::ffff:0.0.0.0", actualRow8.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow9 = rowBatch.next(); + assertEquals("::ffff:255.255.255.255", actualRow9.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow10 = rowBatch.next(); + assertEquals("::1:0:0:0", actualRow10.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow11 = rowBatch.next(); + assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow12 = rowBatch.next(); + assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", actualRow12.get(0)); + + Assert.assertFalse(rowBatch.hasNext()); + thrown.expect(NoSuchElementException.class); + thrown.expectMessage(startsWith("Get row offset:")); + rowBatch.next(); + } + + @Test + public void testIPV4() throws DorisException, IOException { + + ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); + childrenBuilder.add( + new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null)); + + VectorSchemaRoot root = + VectorSchemaRoot.create( + new org.apache.arrow.vector.types.pojo.Schema( + childrenBuilder.build(), null), + new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ArrowStreamWriter arrowStreamWriter = + new ArrowStreamWriter( + root, new DictionaryProvider.MapDictionaryProvider(), outputStream); + + arrowStreamWriter.start(); + root.setRowCount(5); + + FieldVector vector = root.getVector("k1"); + UInt4Vector uInt4Vector = (UInt4Vector) vector; + uInt4Vector.setInitialCapacity(5); + uInt4Vector.allocateNew(4); + uInt4Vector.setIndexDefined(0); + uInt4Vector.setSafe(0, 0); + uInt4Vector.setIndexDefined(1); + uInt4Vector.setSafe(1, 255); + uInt4Vector.setIndexDefined(2); + uInt4Vector.setSafe(2, 65535); + uInt4Vector.setIndexDefined(3); + uInt4Vector.setSafe(3, 16777215); + uInt4Vector.setIndexDefined(4); + uInt4Vector.setWithPossibleTruncate(4, 4294967295L); + vector.setValueCount(5); + arrowStreamWriter.writeBatch(); + + arrowStreamWriter.end(); + arrowStreamWriter.close(); + + TStatus status = new TStatus(); + status.setStatusCode(TStatusCode.OK); + TScanBatchResult scanBatchResult = new TScanBatchResult(); + scanBatchResult.setStatus(status); + scanBatchResult.setEos(false); + scanBatchResult.setRows(outputStream.toByteArray()); + + String schemaStr = + "{\"properties\":[" + + "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}" + + "], \"status\":200}"; + + Schema schema = RestService.parseSchema(schemaStr, logger); + + RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow0 = rowBatch.next(); + assertEquals("0.0.0.0", actualRow0.get(0)); + List<Object> actualRow1 = rowBatch.next(); + assertEquals("0.0.0.255", actualRow1.get(0)); + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow2 = rowBatch.next(); + assertEquals("0.0.255.255", actualRow2.get(0)); + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow3 = rowBatch.next(); + assertEquals("0.255.255.255", actualRow3.get(0)); + List<Object> actualRow4 = rowBatch.next(); + assertEquals("255.255.255.255", actualRow4.get(0)); + Assert.assertFalse(rowBatch.hasNext()); + thrown.expect(NoSuchElementException.class); + thrown.expectMessage(startsWith("Get row offset:")); + rowBatch.next(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org