rmetzger commented on a change in pull request #13275:
URL: https://github.com/apache/flink/pull/13275#discussion_r485362609
##########
File path: flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
##########
@@ -94,25 +102,24 @@ public AbstractTableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
 	 */
 	protected abstract T mapResultToOutType(Result r);
 
-	/**
-	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
-	 *
-	 * <p>These are opened here because they are needed in the createInputSplits
-	 * which is called before the openInputFormat method.
-	 *
-	 * <p>The connection is opened in this method and closed in {@link #closeInputFormat()}.
-	 *
-	 * @param parameters The configuration that is to be used
-	 * @see Configuration
-	 */
-	public abstract void configure(Configuration parameters);
+	@Override
+	public void configure(Configuration parameters) {
+	}
 
 	protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
 		return HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
 	}
 
+	/**
+	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
+	 * The connection is opened in this method and closed in {@link #close()}.
+	 *
+	 * @param split The split to be opened.
+	 * @throws IOException Thrown, if the spit could not be opened due to an I/O problem.
+	 */
 	@Override
 	public void open(TableInputSplit split) throws IOException {
+		initTable();
 		if (table == null) {
 			throw new IOException("The HBase table has not been opened! " +
 				"This needs to be done in configure().");

Review comment:
   I believe these exception messages (this one and the one below) are no longer correct.

##########
File path: flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
##########
@@ -186,24 +193,22 @@ public void close() throws IOException {
 			if (resultScanner != null) {
 				resultScanner.close();
 			}
-		} finally {
-			resultScanner = null;
-		}
-	}
-
-	@Override
-	public void closeInputFormat() throws IOException {
-		try {
 			if (table != null) {
 				table.close();
 			}
+			if (connection != null) {
+				connection.close();
+			}
 		} finally {
+			resultScanner = null;
 			table = null;
+			connection = null;
 		}
 	}
 
 	@Override
 	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
+		initTable();
 		if (table == null) {
 			throw new IOException("The HBase table has not been opened! " +
 				"This needs to be done in configure().");

Review comment:
   Revisit this exception message as well.

##########
File path: flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
##########
@@ -68,6 +71,11 @@ public AbstractTableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
 		serializedConfig = HBaseConfigurationUtil.serializeConfiguration(hConf);
 	}
 
+	/**
+	 * Creates a {@link Scan} object and opens the {@link HTable} connection to initialize the HBase table.
+	 */
+	protected abstract void initTable();

Review comment:
   If you make this method `throws IOException`, you don't need to wrap the IOExceptions in HBaseRowDataInputFormat.connectToTable() into RuntimeExceptions. Throwing an IOException is not a problem, because initTable() is called in open(), which throws an IOException as well.
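   A minimal sketch of that suggestion, using a simplified stand-in for `AbstractTableInputFormat` (names other than `initTable()` and `open()` are illustrative, not the actual Flink API). The point is only that declaring `initTable()` with `throws IOException` lets `open()` propagate the exception without any RuntimeException wrapping:

   ```java
   import java.io.IOException;

   // Simplified stand-in for AbstractTableInputFormat; not the real class.
   abstract class TableInputFormatSketch<T> {

       /**
        * Creates the Scan and opens the HTable connection.
        * Declaring IOException here means a subclass (e.g. its connectToTable()
        * logic) can let HBase I/O failures propagate directly.
        */
       protected abstract void initTable() throws IOException;

       // open() already declares IOException, so the initTable() call
       // needs no try/catch and no RuntimeException wrapping.
       public void open(Object split) throws IOException {
           initTable();
           // ... build the ResultScanner for the split ...
       }
   }
   ```

   With that signature change, the subclass helper can simply declare `throws IOException` instead of catching and rethrowing as a RuntimeException.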
##########
File path: flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
##########
@@ -186,24 +193,22 @@ public void close() throws IOException {
 			if (resultScanner != null) {
 				resultScanner.close();
 			}
-		} finally {
-			resultScanner = null;
-		}
-	}
-
-	@Override
-	public void closeInputFormat() throws IOException {
-		try {
 			if (table != null) {
 				table.close();
 			}
+			if (connection != null) {
+				connection.close();
+			}

Review comment:
   We will leak the connection if table.close() or resultScanner.close() fails with an exception. I believe the proper way would be to wrap each close in a separate try block, and catch and log the error (a sketch of this pattern follows after the message footer).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
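A minimal sketch of the per-resource cleanup pattern suggested in the close() comment above, assuming slf4j logging; the field types and names are illustrative and may differ from the actual AbstractTableInputFormat fields:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative holder for the three resources closed by the input format.
class HBaseCloseSketch {

    private static final Logger LOG = LoggerFactory.getLogger(HBaseCloseSketch.class);

    private ResultScanner resultScanner;
    private Table table;
    private Connection connection;

    // Signature kept as in the original close(); nothing escapes because
    // every failure is caught and logged per resource.
    public void close() throws IOException {
        try {
            if (resultScanner != null) {
                resultScanner.close();
            }
        } catch (Exception e) {
            LOG.warn("Could not close the ResultScanner.", e);
        } finally {
            resultScanner = null;
        }

        try {
            if (table != null) {
                table.close();
            }
        } catch (Exception e) {
            LOG.warn("Could not close the HBase table.", e);
        } finally {
            table = null;
        }

        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            LOG.warn("Could not close the HBase connection.", e);
        } finally {
            connection = null;
        }
    }
}
```

Nulling the fields in the finally blocks keeps the behavior of the current patch, while the separate try blocks guarantee that connection.close() is always attempted even if an earlier close fails.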