This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1da9bd6ce4 [E2E][HBase]Refactor hbase e2e (#6859)
1da9bd6ce4 is described below

commit 1da9bd6ce475a012a702a4a864d5307303ed0660
Author: TaoZex <45089228+tao...@users.noreply.github.com>
AuthorDate: Tue Jun 4 09:57:09 2024 +0800

    [E2E][HBase]Refactor hbase e2e (#6859)
---
 .../hbase/format/HBaseDeserializationFormat.java   |   2 +
 .../connector-hbase-e2e/pom.xml                    |  14 ++-
 .../e2e/connector/hbase/HbaseCluster.java          | 138 +++++++++++++++++++++
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     | 109 ++++++----------
 .../src/test/resources/fake-to-hbase-array.conf    |   2 +-
 .../src/test/resources/fake-to-hbase.conf          |   2 +-
 .../src/test/resources/hbase-to-assert.conf        |  48 ++++---
 7 files changed, 220 insertions(+), 95 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
index 8d7a1bcbe1..578df7101c 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
@@ -58,7 +58,9 @@ public class HBaseDeserializationFormat {
 
         switch (typeInfo.getSqlType()) {
             case TINYINT:
+                return cell[0];
             case SMALLINT:
+                return (short) ((cell[0] & 0xFF) << 8 | (cell[1] & 0xFF));
             case INT:
                 return Bytes.toInt(cell);
             case BOOLEAN:
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
index 93ba8de981..b9fdd76f6f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
@@ -26,14 +26,24 @@
     <name>SeaTunnel : E2E : Connector V2 : Hbase</name>
 
     <dependencies>
-
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-hbase</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseCluster.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseCluster.java
new file mode 100644
index 0000000000..3b54c35592
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseCluster.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static org.apache.seatunnel.e2e.common.container.TestContainer.NETWORK;
+
+public class HbaseCluster {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HbaseCluster.class);
+
+    private static final int ZOOKEEPER_PORT = 2181;
+    private static final int MASTER_PORT = 16000;
+    private static final int REGION_PORT = 16020;
+    private static final String HOST = "hbase_e2e";
+
+    private static final String DOCKER_NAME = 
"jcjabouille/hbase-standalone:2.4.9";
+    private static final DockerImageName HBASE_DOCKER_IMAGE = 
DockerImageName.parse(DOCKER_NAME);
+
+    private Connection connection;
+    private GenericContainer<?> hbaseContainer;
+
+    public Connection startService() throws IOException {
+        String hostname = InetAddress.getLocalHost().getHostName();
+        hbaseContainer =
+                new GenericContainer<>(HBASE_DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withExposedPorts(MASTER_PORT)
+                        .withExposedPorts(REGION_PORT)
+                        .withExposedPorts(ZOOKEEPER_PORT)
+                        .withCreateContainerCmdModifier(cmd -> 
cmd.withHostName(hostname))
+                        .withEnv("HBASE_MASTER_PORT", 
String.valueOf(MASTER_PORT))
+                        .withEnv("HBASE_REGION_PORT", 
String.valueOf(REGION_PORT))
+                        .withEnv(
+                                "HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT",
+                                String.valueOf(ZOOKEEPER_PORT))
+                        .withEnv("HBASE_ZOOKEEPER_QUORUM", HOST)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_NAME)));
+        hbaseContainer.setPortBindings(
+                Arrays.asList(
+                        String.format("%s:%s", MASTER_PORT, MASTER_PORT),
+                        String.format("%s:%s", REGION_PORT, REGION_PORT),
+                        String.format("%s:%s", ZOOKEEPER_PORT, 
ZOOKEEPER_PORT)));
+        Startables.deepStart(Stream.of(hbaseContainer)).join();
+        LOG.info("HBase container started");
+
+        String zookeeperQuorum = getZookeeperQuorum();
+        LOG.info("Successfully start hbase service, zookeeper quorum: {}", 
zookeeperQuorum);
+        Configuration configuration = HBaseConfiguration.create();
+        configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
+        configuration.set("hbase.security.authentication", "simple");
+        configuration.set("hbase.rpc.timeout", "10000");
+        configuration.set("hbase.master.port", String.valueOf(MASTER_PORT));
+        configuration.set("hbase.regionserver.port", 
String.valueOf(REGION_PORT));
+        connection = ConnectionFactory.createConnection(configuration);
+        return connection;
+    }
+
+    public void createTable(String tableName, List<String> list) throws 
IOException {
+        TableDescriptorBuilder tableDesc =
+                
TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
+
+        List<ColumnFamilyDescriptor> colFamilyList = new ArrayList<>();
+        for (String columnFamilys : list) {
+            ColumnFamilyDescriptorBuilder c =
+                    
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamilys));
+            colFamilyList.add(c.build());
+        }
+        tableDesc.setColumnFamilies(colFamilyList);
+        Admin hbaseAdmin = connection.getAdmin();
+        hbaseAdmin.createTable(tableDesc.build());
+    }
+
+    public void stopService() throws IOException {
+        if (Objects.nonNull(connection)) {
+            connection.close();
+        }
+        if (Objects.nonNull(hbaseContainer)) {
+            hbaseContainer.close();
+        }
+        hbaseContainer = null;
+    }
+
+    public static String getZookeeperQuorum() {
+        String host = null;
+        try {
+            host = InetAddress.getLocalHost().getHostAddress();
+        } catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+        return String.format("%s:%s", host, ZOOKEEPER_PORT);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index e27e5c715e..d3cd57b326 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -19,88 +19,63 @@ package org.apache.seatunnel.e2e.connector.hbase;
 
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Objects;
-import java.util.stream.Stream;
 
 @Slf4j
-@Disabled(
-        "Hbase docker e2e case need user add mapping information of between 
container id and ip address in hosts file")
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SEATUNNEL},
+        disabledReason = "The hbase container authentication configuration is 
incorrect.")
 public class HbaseIT extends TestSuiteBase implements TestResource {
 
-    private static final String IMAGE = "harisekhon/hbase:latest";
-
-    private static final int PORT = 2181;
-
-    private static final String HOST = "hbase-e2e";
-
     private static final String TABLE_NAME = "seatunnel_test";
 
     private static final String FAMILY_NAME = "info";
 
-    private final Configuration hbaseConfiguration = 
HBaseConfiguration.create();
-
     private Connection hbaseConnection;
 
     private Admin admin;
 
     private TableName table;
 
-    private GenericContainer<?> hbaseContainer;
+    private HbaseCluster hbaseCluster;
 
     @BeforeAll
     @Override
     public void startUp() throws Exception {
-        hbaseContainer =
-                new GenericContainer<>(DockerImageName.parse(IMAGE))
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(HOST)
-                        .withExposedPorts(PORT)
-                        .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
-                        .waitingFor(
-                                new HostPortWaitStrategy()
-                                        
.withStartupTimeout(Duration.ofMinutes(2)));
-        Startables.deepStart(Stream.of(hbaseContainer)).join();
-        log.info("Hbase container started");
-        this.initialize();
+        hbaseCluster = new HbaseCluster();
+        hbaseConnection = hbaseCluster.startService();
+        // Create table for hbase sink test
+        log.info("initial");
+        hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
+        table = TableName.valueOf(TABLE_NAME);
     }
 
     @AfterAll
@@ -109,59 +84,37 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         if (Objects.nonNull(admin)) {
             admin.close();
         }
-        if (Objects.nonNull(hbaseConnection)) {
-            hbaseConnection.close();
-        }
-        if (Objects.nonNull(hbaseContainer)) {
-            hbaseContainer.close();
-        }
-    }
-
-    private void initialize() throws IOException {
-        hbaseConfiguration.set("hbase.zookeeper.quorum", HOST + ":" + PORT);
-        hbaseConnection = 
ConnectionFactory.createConnection(hbaseConfiguration);
-        admin = hbaseConnection.getAdmin();
-        table = TableName.valueOf(TABLE_NAME);
-        ColumnFamilyDescriptor familyDescriptor =
-                
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME.getBytes())
-                        .setCompressionType(Compression.Algorithm.SNAPPY)
-                        
.setCompactionCompressionType(Compression.Algorithm.SNAPPY)
-                        .build();
-        TableDescriptor tableDescriptor =
-                
TableDescriptorBuilder.newBuilder(table).setColumnFamily(familyDescriptor).build();
-        admin.createTable(tableDescriptor);
-        log.info("Hbase table has been initialized");
+        hbaseCluster.stopService();
     }
 
     @TestTemplate
     public void testHbaseSink(TestContainer container) throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
container.executeJob("/fake-to-hbase.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
+        deleteData(table);
+        Container.ExecResult sinkExecResult = 
container.executeJob("/fake-to-hbase.conf");
+        Assertions.assertEquals(0, sinkExecResult.getExitCode());
         Table hbaseTable = hbaseConnection.getTable(table);
         Scan scan = new Scan();
-        ArrayList<Result> results = new ArrayList<>();
         ResultScanner scanner = hbaseTable.getScanner(scan);
+        ArrayList<Result> results = new ArrayList<>();
         for (Result result : scanner) {
             results.add(result);
         }
         Assertions.assertEquals(results.size(), 5);
-    }
-
-    @TestTemplate
-    public void testHbaseSource(TestContainer container) throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
container.executeJob("/hbase-to-assert.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
+        scanner.close();
+        Container.ExecResult sourceExecResult = 
container.executeJob("/hbase-to-assert.conf");
+        Assertions.assertEquals(0, sourceExecResult.getExitCode());
     }
 
     @TestTemplate
     public void testHbaseSinkWithArray(TestContainer container)
             throws IOException, InterruptedException {
+        deleteData(table);
         Container.ExecResult execResult = 
container.executeJob("/fake-to-hbase-array.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
         Table hbaseTable = hbaseConnection.getTable(table);
         Scan scan = new Scan();
-        ArrayList<Result> results = new ArrayList<>();
         ResultScanner scanner = hbaseTable.getScanner(scan);
+        ArrayList<Result> results = new ArrayList<>();
         for (Result result : scanner) {
             String rowKey = Bytes.toString(result.getRow());
             for (Cell cell : result.listCells()) {
@@ -177,5 +130,17 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
             results.add(result);
         }
         Assertions.assertEquals(results.size(), 3);
+        scanner.close();
+    }
+
+    private void deleteData(TableName table) throws IOException {
+        Table hbaseTable = hbaseConnection.getTable(table);
+        Scan scan = new Scan();
+        ResultScanner scanner = hbaseTable.getScanner(scan);
+        // Delete the data generated by the test
+        for (Result result = scanner.next(); result != null; result = 
scanner.next()) {
+            Delete deleteRow = new Delete(result.getRow());
+            hbaseTable.delete(deleteRow);
+        }
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf
index 5cf1896ea2..9da70ea80a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf
@@ -49,7 +49,7 @@ source {
 
 sink {
   Hbase {
-    zookeeper_quorum = "hbase-e2e:2181"
+    zookeeper_quorum = "hbase_e2e:2181"
     table = "seatunnel_test"
     rowkey_column = ["name"]
     family_name {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
index f3e58ec008..be99bf43fe 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
@@ -40,7 +40,7 @@ source {
 
 sink {
   Hbase {
-    zookeeper_quorum = "hbase-e2e:2181"
+    zookeeper_quorum = "hbase_e2e:2181"
     table = "seatunnel_test"
     rowkey_column = ["name"]
     family_name {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
index f209875745..c8b750738d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
@@ -22,9 +22,9 @@ env {
 
 source {
   Hbase {
-      zookeeper_quorum = "hbase-e2e:2181"
+      zookeeper_quorum = "hbase_e2e:2181"
       table = "seatunnel_test"
-      query_columns=["rowkey", "cf1:col1", "cf1:col2", "cf2:col1", "cf2:col2"]
+      query_columns=["rowkey", "info:age", "info:c_double", 
"info:c_boolean","info:c_bigint","info:c_smallint","info:c_tinyint","info:c_float"]
       schema = {
             columns = [
                   {
@@ -32,39 +32,49 @@ source {
                      type = string
                   },
                   {
-                     name = "cf1:col1"
-                     type = boolean
+                     name = "info:age"
+                     type = int
                   },
                   {
-                     name = "cf1:col2"
+                     name = "info:c_double"
                      type = double
                   },
                   {
-                     name = "cf2:col1"
+                     name = "info:c_boolean"
+                     type = boolean
+                  },
+                  {
+                     name = "info:c_bigint"
                      type = bigint
                   },
                   {
-                     name = "cf2:col2"
-                     type = int
+                     name = "info:c_smallint"
+                     type = smallint
+                  },
+                  {
+                     name = "info:c_tinyint"
+                     type = tinyint
+                  },
+                  {
+                     name = "info:c_float"
+                     type = float
                   }
-            ]
-      }
-      result_table_name = hbase_source
-  }
+             ]
+       }
+    }
 }
 
 sink {
   Assert {
-    source_table_name = hbase_source
     rules {
       row_rules = [
         {
           rule_type = MAX_ROW
-          rule_value = 10000
+          rule_value = 5
         },
         {
           rule_type = MIN_ROW
-          rule_value = 10000
+          rule_value = 5
         }
       ],
       field_rules = [
@@ -78,7 +88,7 @@ sink {
           ]
         },
         {
-          field_name = "cf1:col1"
+          field_name = "info:c_boolean"
           field_type = boolean
           field_value = [
             {
@@ -87,7 +97,7 @@ sink {
           ]
         },
         {
-          field_name = "cf1:col2"
+          field_name = "info:c_double"
           field_type = double
           field_value = [
             {
@@ -96,7 +106,7 @@ sink {
           ]
         },
         {
-          field_name = "cf2:col1"
+          field_name = "info:c_bigint"
           field_type = bigint
           field_value = [
             {
@@ -105,7 +115,7 @@ sink {
           ]
         },
         {
-          field_name = "cf2:col2"
+          field_name = "info:age"
           field_type = int
           field_value = [
             {

Reply via email to