[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405893#comment-15405893
 ] 

ASF GitHub Bot commented on FLINK-4311:
---------------------------------------

GitHub user nielsbasjes opened a pull request:

    https://github.com/apache/flink/pull/2330

    FLINK-4311 Fixed several problems in TableInputFormat

    Question: Do you guys want a unit test for this?
    In HBase itself I have done this in the past yet this required a large 
chunk of additional software to start and stop an HBase minicluster during the 
unit tests.
    I.e. pull in this thing: 
    
https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
    and then do something like this:
    
https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestScanRowPrefix.java


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nielsbasjes/flink FLINK-4311

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2330
    
----
commit 5c3d53c810f8df6d5544685ef3f1004c46541daf
Author: Niels Basjes <nbas...@bol.com>
Date:   2016-08-03T12:54:34Z

    [FLINK-4311] TableInputFormat can handle reuse for next input split

commit 8696f5e257c7434d62e662c4c97f4ede2da5411b
Author: Niels Basjes <nbas...@bol.com>
Date:   2016-08-03T12:56:01Z

    [FLINK-4311] Cannot override a static member function.

----


> TableInputFormat fails when reused on next split
> ------------------------------------------------
>
>                 Key: FLINK-4311
>                 URL: https://issues.apache.org/jira/browse/FLINK-4311
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.3
>            Reporter: Niels Basjes
>            Assignee: Niels Basjes
>            Priority: Critical
>
> We have written a batch job that uses data from HBase by means of using the 
> TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: 
> Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>       at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
>       at 
> org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
>       at 
> org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
>       at 
> org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
>       at 
> org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
>       at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
>       at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
>       at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
>       at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>       at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>       at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>       at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>       at 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
>       at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
>       at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
>       at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
>       at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
>       ... 10 more
> {quote}
> As you can see the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the fact that 
> # the configure method opens the table
> # the open method obtains the result scanner
> # the closes method closes the table.
> If a second split arrives on the same instance then the open method will fail 
> because the table has already been closed.
> We also found that this error varies with the versions of HBase that are 
> used. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
>       at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
>       at 
> org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
>       ... 37 more
> {quote}
> I found that in the [documentation of the InputFormat 
> interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html]
>  is clearly states
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance 
> can be opened again after it was closed. That is due to the fact that the 
> input format is used for potentially multiple splits. After a split is done, 
> the format's close function is invoked and, if another split is available, 
> the open function is invoked afterwards for the next split.{quote}
> It appears that this specific InputFormat has not been checked against this 
> constraint.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to