GitHub user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3149#discussion_r97701594
  
    --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java ---
    @@ -19,99 +19,113 @@
     package org.apache.flink.addons.hbase;
     
     import org.apache.flink.api.common.io.InputFormat;
    -import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
    -import org.apache.flink.api.common.io.RichInputFormat;
    -import org.apache.flink.api.common.io.statistics.BaseStatistics;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
     import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
     import org.apache.flink.api.java.typeutils.RowTypeInfo;
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.core.io.InputSplitAssigner;
     import org.apache.flink.types.Row;
     import org.apache.hadoop.hbase.HBaseConfiguration;
     import org.apache.hadoop.hbase.TableName;
     import org.apache.hadoop.hbase.TableNotFoundException;
    -import org.apache.hadoop.hbase.client.Scan;
    -import org.apache.hadoop.hbase.client.Table;
    -import org.apache.hadoop.hbase.client.ClusterConnection;
    -import org.apache.hadoop.hbase.client.Result;
    -import org.apache.hadoop.hbase.client.ResultScanner;
    -import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.HTable;
     import org.apache.hadoop.hbase.client.Connection;
    -import org.apache.hadoop.hbase.client.HRegionLocator;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Scan;
     import org.apache.hadoop.hbase.util.Bytes;
     import org.apache.hadoop.hbase.util.Pair;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
    -import java.math.BigDecimal;
    -import java.math.BigInteger;
     import java.util.ArrayList;
    -import java.util.Date;
     import java.util.List;
    +import java.util.Map;
     
     /**
      * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
      */
    -public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable<Row> {
    +public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
     
        private static final long serialVersionUID = 1L;
     
        private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
        private String tableName;
    -   private TypeInformation[] fieldTypeInfos;
    -   private String[] fieldNames;
    -   private transient Table table;
    -   private transient Scan scan;
        private transient Connection conn;
    -   private ResultScanner resultScanner = null;
    -
    -   private byte[] lastRow;
    -   private int scannedRows;
    -   private boolean endReached = false;
    -   private org.apache.hadoop.conf.Configuration conf;
    -   private static final String COLON = ":";
    +   private transient org.apache.hadoop.conf.Configuration conf;
    +   private HBaseTableSchema schema;
     
    -   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
    -           this.conf = conf;
    +   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
                this.tableName = tableName;
    -           this.fieldNames = fieldNames;
    -           this.fieldTypeInfos = fieldTypeInfos;
    +           this.conf = conf;
    +           this.schema = schema;
        }
     
        @Override
        public void configure(Configuration parameters) {
                LOG.info("Initializing HBaseConfiguration");
                connectToTable();
                if(table != null) {
    -                   scan = createScanner();
    +                   scan = getScanner();
                }
        }
     
    -   private Scan createScanner() {
    +   @Override
    +   protected Scan getScanner() {
    +           // TODO : Pass 'rowkey'. For this we need FilterableTableSource
                Scan scan = new Scan();
    -           for(String field : fieldNames) {
    +           Map<String, List<Pair>> familyMap = schema.getFamilyMap();
    +           for(String family : familyMap.keySet()) {
                        // select only the fields in the 'selectedFields'
    -                   String[] famCol = field.split(COLON);
    -                   scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
    +                   List<Pair> colDetails = familyMap.get(family);
    +                   for(Pair<String, TypeInformation<?>> pair : colDetails) {
    +                           scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
    +                   }
                }
                return scan;
        }
     
    +   @Override
    +   public String getTableName() {
    +           return tableName;
    +   }
    +
    +   @Override
    +   protected Row mapResultToTuple(Result res) {
    +           List<Object> values = new ArrayList<Object>();
    +           int i = 0;
    +           Map<String, List<Pair>> familyMap = schema.getFamilyMap();
    +           Row[] rows = new Row[familyMap.size()];
    --- End diff --
    
    Better to declare `rows` as `Object[]` to avoid confusion about whether `rows` is bound as the varargs array or as a single non-varargs argument.
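    
    For context, a minimal, hypothetical sketch of the ambiguity this points at (`Row.of(Object...)` is Flink's varargs `Row` factory; the values and arity checks are illustrative, not from the PR):
    
    ```java
    import org.apache.flink.types.Row;
    
    public class VarargsAmbiguityDemo {
    
    	public static void main(String[] args) {
    		Row[] rows = new Row[] { Row.of("f1-value"), Row.of("f2-value") };
    
    		// Row[] is a subtype of Object[], so javac binds the array itself as
    		// the varargs parameter; because the argument type is not exactly
    		// Object[], it also emits a "non-varargs call of varargs method with
    		// inexact argument type" warning, forcing readers to work out which
    		// interpretation applies.
    		Row ambiguous = Row.of(rows);            // arity 2, plus a compiler warning
    
    		// The two explicit spellings of what a reader might have meant:
    		Row spread = Row.of((Object[]) rows);    // arity 2: one field per element
    		Row wrapped = Row.of((Object) rows);     // arity 1: the whole array is one field
    
    		// Declaring the local as Object[] matches the parameter type exactly,
    		// so the "one field per element" reading is obvious and warning-free.
    		Object[] fields = rows;
    		Row explicit = Row.of(fields);           // arity 2
    
    		System.out.println(ambiguous.getArity()); // 2
    		System.out.println(spread.getArity());    // 2
    		System.out.println(wrapped.getArity());   // 1
    		System.out.println(explicit.getArity());  // 2
    	}
    }
    ```
    
    With `Object[]`, the call site reads the same as the method signature, which is the point of the suggestion.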

