What I can tell you is how the HBase input format works. If you look
at AbstractTableInputFormat [1], this is the nextRecord() function:

    public T nextRecord(T reuse) throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        try {
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        } catch (Exception e) {
            resultScanner.close();
            // workaround for timeout on scan
            LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
            scan.setStartRow(currentRow);
            resultScanner = table.getScanner(scan);
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        }

        endReached = true;
        return null;
    }

When the resultScanner dies because of a timeout (this happens a lot when
you have backpressure and the time between two consecutive reads exceeds the
scanner timeout), the code creates a new scanner and restarts from where it
was (startRow = currentRow).
So in theory there should not be any duplicates, but this restart could
indeed be the root of the problem: setStartRow() is inclusive in HBase, so
the new scanner starts again at the row that was already returned just
before the timeout.
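
If that is really what happens, you should not need to keep track of all
processed Results on your side: only the row read just before the timeout
can be duplicated, so remembering the last emitted row key is enough. Just
as a sketch (the DedupConsecutiveRows name is mine, and it assumes records
reach the filter in scan order, e.g. with parallelism 1 as in your example),
something like this could drop the repeated row:

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.tuple.Tuple1;

    /**
     * Drops a record when it carries the same row key as the record seen
     * immediately before it. Only consecutive duplicates are filtered,
     * which is the pattern produced by the scanner restart in nextRecord().
     */
    public class DedupConsecutiveRows implements FilterFunction<Tuple1<String>> {

        private String lastRowKey;

        @Override
        public boolean filter(Tuple1<String> value) {
            boolean duplicate = value.f0.equals(lastRowKey);
            lastRowKey = value.f0;
            return !duplicate;
        }
    }

and then plug it in before the map:

    env.createInput(new Src()).filter(new DedupConsecutiveRows()).map(new Mapper()).print();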

Best,
Flavio

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java

On Sat, Nov 23, 2019 at 11:07 PM Mark Davis <moda...@protonmail.com> wrote:

> Hello,
>
> I am reading Results from an HBase table and process them with Batch API.
> Everything works fine until I receive a ScannerTimeoutException from HBase.
> Maybe my transformations get stuck or a GC pause happens - hard to tell.
> The HBase Client restarts the scan and the processing continues.
> Except for one problem - every time I receive this exception I observe
> duplicate Result processing - the Result which was processed just before
> the ScannerTimeoutException was thrown is processed twice.
>
> Is this expected behavior? Should I be prepared to handle it?
> And how should I handle it? Keeping track of all processed Results is not
> feasible in my case.
>
> Here is a simple job demonstrating the issue (HBase scan and RPC timeouts
> are set to 60 sec).
>
> Thank you!
>
> Best regards,
> Mark
>
>
>   public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     env.createInput(new Src())
>         .map(new Mapper())
>         .print();
>   }
>
>   private static class Mapper implements MapFunction<Tuple1<String>, String> {
>
>     private int cnt = 0;
>
>     @Override
>     public String map(Tuple1<String> value) throws Exception {
>       if (cnt++ % 2 == 0) {
>         Thread.sleep(120000);
>       }
>       return value.f0;
>     }
>
>   }
>
>   private static class Src extends AbstractTableInputFormat<Tuple1<String>> {
>
>     @Override
>     protected Scan getScanner() {
>       Scan scan = new Scan();
>       scan.setStartRow(getStartRow());
>       scan.setStopRow(getEndRow());
>       scan.setCaching(1);
>       scan.setCacheBlocks(false);
>       return scan;
>     }
>
>     @Override
>     protected String getTableName() {
>       return getTable();
>     }
>
>     @Override
>     protected Tuple1<String> mapResultToOutType(Result r) {
>       return new Tuple1<String>(Bytes.toString(r.getRow()));
>     }
>
>     @Override
>     public void configure(org.apache.flink.configuration.Configuration parameters) {
>       scan = getScanner();
>       try {
>         table = new HTable(getHadoopConf(), getTableName());
>       } catch (IOException e) {
>         e.printStackTrace();
>       }
>     }
>
>   }
>
