Chu Xue created FLINK-38294: ------------------------------- Summary: Hbase connector misclassifies failed task as successful Key: FLINK-38294 URL: https://issues.apache.org/jira/browse/FLINK-38294 Project: Flink Issue Type: Bug Components: Connectors / HBase Affects Versions: 1.20.2, 1.19.3, 1.18.1 Reporter: Chu Xue Attachments: jobmanager.log, taskmanager.log
The Hbase connector will not flush when write less than 1000(sink.buffer-flush.max-rows) records or execute in less than 1s(sink.buffer-flush.interval).Org.apache.hadoop.hbase.client.BufferedMutator excute flush when close and flush maybe failed. {code:java} if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) { this.executor = Executors.newScheduledThreadPool( 1, new ExecutorThreadFactory("hbase-upsert-sink-flusher")); this.scheduledFuture = this.executor.scheduleWithFixedDelay( () -> { if (closed) { return; } try { flush(); } catch (Exception e) { // fail the sink and skip the rest of the items // if the failure handler decides to throw an exception failureThrowable.compareAndSet(null, e); } }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS); } {code} {code:java} @SuppressWarnings("rawtypes")@Overridepublic void invoke(T value, Context context) throws Exception { checkErrorAndRethrow(); mutator.mutate(mutationConverter.convertToMutation(value)); // flush when the buffer number of mutations greater than the configured max size. if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { flush(); } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) { flush(); }} {code} {code:java} //代码占位符 @Overridepublic void close() throws Exception { closed = true; if (mutator != null) { try { mutator.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase BufferedMutator.", e); } this.mutator = null; } if (connection != null) { try { connection.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase Connection.", e); } this.connection = null; } if (scheduledFuture != null) { scheduledFuture.cancel(false); if (executor != null) { executor.shutdownNow(); } }} {code} For example, creating a permission denial case where the user does not have permission(ranger) for the hbase namespace.The task will failed ,but return success. [^jobmanager.log] [^taskmanager.log] Modify org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close like this, throw the error. {code:java} @Overridepublic void close() throws Exception { closed = true; if (mutator != null) { try { mutator.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase BufferedMutator.", e); } this.mutator = null; } if (connection != null) { try { connection.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase Connection.", e); } this.connection = null; } if (scheduledFuture != null) { scheduledFuture.cancel(false); if (executor != null) { executor.shutdownNow(); } } checkErrorAndRethrow();} {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)