Chu Xue created FLINK-38294:
-------------------------------

             Summary: Hbase connector misclassifies failed task as successful
                 Key: FLINK-38294
                 URL: https://issues.apache.org/jira/browse/FLINK-38294
             Project: Flink
          Issue Type: Bug
          Components: Connectors / HBase
    Affects Versions: 1.20.2, 1.19.3, 1.18.1
            Reporter: Chu Xue
         Attachments: jobmanager.log, taskmanager.log

The Hbase connector will not flush when write less than 
1000(sink.buffer-flush.max-rows) records or execute in less than 
1s(sink.buffer-flush.interval).Org.apache.hadoop.hbase.client.BufferedMutator 
excute flush when close and flush maybe failed.

 
{code:java}
if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
    this.executor =
            Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
    this.scheduledFuture =
            this.executor.scheduleWithFixedDelay(
                    () -> {
                        if (closed) {
                            return;
                        }
                        try {
                            flush();
                        } catch (Exception e) {
                            // fail the sink and skip the rest of the items
                            // if the failure handler decides to throw an 
exception
                            failureThrowable.compareAndSet(null, e);
                        }
                    },
                    bufferFlushIntervalMillis,
                    bufferFlushIntervalMillis,
                    TimeUnit.MILLISECONDS);
} {code}
 

 

 
 
{code:java}
@SuppressWarnings("rawtypes")@Overridepublic void invoke(T value, Context 
context) throws Exception {   checkErrorAndRethrow();
        mutator.mutate(mutationConverter.convertToMutation(value));
        // flush when the buffer number of mutations greater than the 
configured max size.      if (bufferFlushMaxMutations > 0                 && 
numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {           
flush();        } else if (bufferFlushMaxMutations == 0 && 
bufferFlushMaxSizeInBytes == 0) {            flush();        }}


{code}
 
{code:java}
//代码占位符
@Overridepublic void close() throws Exception { closed = true;
        if (mutator != null) {          try {                   
mutator.close();                } catch (IOException e) {                       
LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);           
}               this.mutator = null;    }
        if (connection != null) {               try {                   
connection.close();             } catch (IOException e) {                       
LOG.warn("Exception occurs while closing HBase Connection.", e);                
}               this.connection = null; }
        if (scheduledFuture != null) {          scheduledFuture.cancel(false);  
        if (executor != null) {                 executor.shutdownNow();         
}       }} {code}
For example, creating a permission denial case where the user does not have 
permission(ranger) for the hbase namespace.The task will failed ,but return 
success.
[^jobmanager.log]
[^taskmanager.log]
 
Modify org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close like this, 
throw the error.
{code:java}
@Overridepublic void close() throws Exception { closed = true;
        if (mutator != null) {          try {                   
mutator.close();                } catch (IOException e) {                       
LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);           
}               this.mutator = null;    }
        if (connection != null) {               try {                   
connection.close();             } catch (IOException e) {                       
LOG.warn("Exception occurs while closing HBase Connection.", e);                
}               this.connection = null; }
        if (scheduledFuture != null) {          scheduledFuture.cancel(false);  
        if (executor != null) {                 executor.shutdownNow();         
}       }       checkErrorAndRethrow();} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to