MOBIN created FLINK-36188:
-----------------------------

             Summary: HBase disable buffer flush lose efficacy
                 Key: FLINK-36188
                 URL: https://issues.apache.org/jira/browse/FLINK-36188
             Project: Flink
          Issue Type: Bug
          Components: Connectors / HBase
    Affects Versions: hbase-3.0.0
         Environment: Flink 1.16.3
            Reporter: MOBIN


HBase table
||rowkey||col||
|1|1|

The user lookup joins the hbase table, adds 1 to the col value, and writes it 
back to hbase
{code:java}
@Test
void testTableSinkDisabledBufferFlush() throws Exception {
        StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
streamSettings);        tEnv.executeSql(
                "CREATE TABLE hTableForSink ("
                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                        + " family1 ROW<col1 INT>"
                        + ") WITH ("
                        + " 'connector' = 'hbase-2.2',"
                        + " 'sink.buffer-flush.max-size' = '0',"
                        + " 'sink.buffer-flush.max-rows' = '0',"
                        + " 'table-name' = '"
                        + TEST_TABLE_6
                        + "',"
                        + " 'zookeeper.quorum' = '"
                        + getZookeeperQuorum()
                        + "'"
                        + ")");        String insert = "INSERT INTO 
hTableForSink VALUES(1, ROW(1))";
        tEnv.executeSql(insert).await();        tEnv.executeSql(
                "CREATE VIEW user_click AS "
                        + " SELECT user_id, proctime() AS proc_time"
                        + " FROM ( "
                        + " VALUES(1), (1), (1), (1), (1)"
                        + " ) AS t (user_id);");        tEnv.executeSql(
                "INSERT INTO hTableForSink SELECT "
                        + "    user_id as rowkey,"
                        + "    ROW(CAST(family1.col1 + 1 AS INT))"
                        + " FROM user_click INNER JOIN hTableForSink"
                        + " FOR SYSTEM_TIME AS OF user_click.proc_time"
                        + " ON hTableForSink.rowkey = user_click.user_id;");    
    tEnv.executeSql(
                "CREATE TABLE hTableForQuery ("
                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                        + " family1 ROW<col1 INT>"
                        + ") WITH ("
                        + " 'connector' = 'hbase-2.2',"
                        + " 'table-name' = '"
                        + TEST_TABLE_6
                        + "',"
                        + " 'zookeeper.quorum' = '"
                        + getZookeeperQuorum()
                        + "'"
                        + ")");
        String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";       
 TableResult firstResult = tEnv.executeSql(query);
        List<Row> firstResults = 
CollectionUtil.iteratorToList(firstResult.collect());
        String firstExpected = "+I[1, 6]";
        TestBaseUtils.compareResultAsText(firstResults, firstExpected);
    } {code}
test failed
{code:java}
org.junit.ComparisonFailure: Different elements in arrays: expected 1 elements 
and received 1
 expected: [+I[1, 6]]
 received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
Expected :+I[1, 6]
Actual   :+I[1, 2] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to