rmetzger commented on a change in pull request #13275:
URL: https://github.com/apache/flink/pull/13275#discussion_r485362609



##########
File path: flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
##########
@@ -94,25 +102,24 @@ public AbstractTableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
         */
        protected abstract T mapResultToOutType(Result r);
 
-       /**
-        * Creates a {@link Scan} object and opens the {@link HTable} connection.
-        *
-        * <p>These are opened here because they are needed in the createInputSplits
-        * which is called before the openInputFormat method.
-        *
-        * <p>The connection is opened in this method and closed in {@link #closeInputFormat()}.
-        *
-        * @param parameters The configuration that is to be used
-        * @see Configuration
-        */
-       public abstract void configure(Configuration parameters);
+       @Override
+       public void configure(Configuration parameters) {
+       }
 
        protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
                return HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
        }
 
+       /**
+        * Creates a {@link Scan} object and opens the {@link HTable} connection.
+        * The connection is opened in this method and closed in {@link #close()}.
+        *
+        * @param split The split to be opened.
+        * @throws IOException Thrown, if the spit could not be opened due to an I/O problem.
+        */
        @Override
        public void open(TableInputSplit split) throws IOException {
+               initTable();
                if (table == null) {
                        throw new IOException("The HBase table has not been opened! " +
                                "This needs to be done in configure().");

Review comment:
       I believe this exception message (and the one below) is no longer correct: the table is now opened in initTable(), not in configure().

##########
File path: flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
##########
@@ -186,24 +193,22 @@ public void close() throws IOException {
                        if (resultScanner != null) {
                                resultScanner.close();
                        }
-               } finally {
-                       resultScanner = null;
-               }
-       }
-
-       @Override
-       public void closeInputFormat() throws IOException {
-               try {
                        if (table != null) {
                                table.close();
                        }
+                       if (connection != null) {
+                               connection.close();
+                       }
                } finally {
+                       resultScanner = null;
                        table = null;
+                       connection = null;
                }
        }
 
        @Override
        public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
+               initTable();
                if (table == null) {
                        throw new IOException("The HBase table has not been opened! " +
                                "This needs to be done in configure().");

Review comment:
       Please revisit this exception message as well; it still points to configure().

##########
File path: flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
##########
@@ -68,6 +71,11 @@ public AbstractTableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
                serializedConfig = HBaseConfigurationUtil.serializeConfiguration(hConf);
        }
 
+       /**
+        * Creates a {@link Scan} object and opens the {@link HTable} connection to initialize the HBase table.
+        */
+       protected abstract void initTable();

Review comment:
       If you declare this method with `throws IOException`, you don't need to wrap the IOExceptions in HBaseRowDataInputFormat.connectToTable() in RuntimeExceptions.
   
   Throwing an IOException is not a problem, because initTable() is called in open(), which already declares `throws IOException`.
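
   To illustrate the suggestion, here is a minimal, self-contained sketch. The classes `TableInputFormatSketch` and `FailingFormatSketch` are hypothetical stand-ins (not the actual Flink classes), and the `table` field is a plain `Object` placeholder. It only shows that a checked `IOException` declared on `initTable()` propagates through `open()` without any `RuntimeException` wrapper.

```java
import java.io.IOException;

// Hypothetical, simplified stand-in for AbstractTableInputFormat; not the real Flink class.
abstract class TableInputFormatSketch {
    // Placeholder for the HBase table handle (assumption for this sketch).
    protected Object table;

    // Declaring the checked exception lets subclasses propagate I/O failures directly.
    protected abstract void initTable() throws IOException;

    public void open() throws IOException {
        // An IOException thrown by initTable() reaches the caller unchanged.
        initTable();
        if (table == null) {
            throw new IOException("The HBase table has not been opened!");
        }
    }
}

// Hypothetical subclass that simulates a failing connection attempt.
class FailingFormatSketch extends TableInputFormatSketch {
    @Override
    protected void initTable() throws IOException {
        // No RuntimeException wrapper is needed here.
        throw new IOException("simulated connection failure");
    }
}
```

   Callers of `open()` already have to handle `IOException`, so the signature change should not add a new burden on them.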

##########
File path: flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
##########
@@ -186,24 +193,22 @@ public void close() throws IOException {
                        if (resultScanner != null) {
                                resultScanner.close();
                        }
-               } finally {
-                       resultScanner = null;
-               }
-       }
-
-       @Override
-       public void closeInputFormat() throws IOException {
-               try {
                        if (table != null) {
                                table.close();
                        }
+                       if (connection != null) {
+                               connection.close();
+                       }

Review comment:
       We will leak the connection if table.close() or resultScanner.close() fails with an exception.
   I believe the proper way would be to wrap each close() call in its own try block and to catch and log any error.
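
   A rough sketch of what I have in mind, assuming all three resources can be treated as `java.io.Closeable`. The `CloseSketch` class, its `closeQuietly` helper, and the `LOGGED` list are hypothetical names for illustration only; the list stands in for a real logger call such as `LOG.warn(...)`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink or HBase.
final class CloseSketch {
    // Stand-in for a logger so the example stays self-contained.
    static final List<String> LOGGED = new ArrayList<>();

    // Closes one resource and records a failure instead of rethrowing it.
    static void closeQuietly(String name, Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException | RuntimeException e) {
            LOGGED.add("Error while closing " + name + ": " + e.getMessage());
        }
    }

    // Each close runs in its own try block, so a failure in one
    // does not prevent the remaining resources from being closed.
    static void closeAll(Closeable resultScanner, Closeable table, Closeable connection) {
        closeQuietly("resultScanner", resultScanner);
        closeQuietly("table", table);
        closeQuietly("connection", connection);
    }
}
```

   In close(), the fields could still be reset to null afterwards, as the existing finally block does.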




