Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3149#discussion_r97701594

    --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java ---
    @@ -19,99 +19,113 @@
     package org.apache.flink.addons.hbase;

     import org.apache.flink.api.common.io.InputFormat;
    -import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
    -import org.apache.flink.api.common.io.RichInputFormat;
    -import org.apache.flink.api.common.io.statistics.BaseStatistics;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
     import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
     import org.apache.flink.api.java.typeutils.RowTypeInfo;
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.core.io.InputSplitAssigner;
     import org.apache.flink.types.Row;
     import org.apache.hadoop.hbase.HBaseConfiguration;
     import org.apache.hadoop.hbase.TableName;
     import org.apache.hadoop.hbase.TableNotFoundException;
    -import org.apache.hadoop.hbase.client.Scan;
    -import org.apache.hadoop.hbase.client.Table;
    -import org.apache.hadoop.hbase.client.ClusterConnection;
    -import org.apache.hadoop.hbase.client.Result;
    -import org.apache.hadoop.hbase.client.ResultScanner;
    -import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.HTable;
     import org.apache.hadoop.hbase.client.Connection;
    -import org.apache.hadoop.hbase.client.HRegionLocator;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Scan;
     import org.apache.hadoop.hbase.util.Bytes;
     import org.apache.hadoop.hbase.util.Pair;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;

     import java.io.IOException;
    -import java.math.BigDecimal;
    -import java.math.BigInteger;
     import java.util.ArrayList;
    -import java.util.Date;
     import java.util.List;
    +import java.util.Map;

     /**
      * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
      */
    -public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable<Row> {
    +public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {

         private static final long serialVersionUID = 1L;

         private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
         private String tableName;
    -    private TypeInformation[] fieldTypeInfos;
    -    private String[] fieldNames;
    -    private transient Table table;
    -    private transient Scan scan;
         private transient Connection conn;
    -    private ResultScanner resultScanner = null;
    -
    -    private byte[] lastRow;
    -    private int scannedRows;
    -    private boolean endReached = false;
    -    private org.apache.hadoop.conf.Configuration conf;
    -    private static final String COLON = ":";
    +    private transient org.apache.hadoop.conf.Configuration conf;
    +    private HBaseTableSchema schema;

    -    public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
    -        this.conf = conf;
    +    public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
             this.tableName = tableName;
    -        this.fieldNames = fieldNames;
    -        this.fieldTypeInfos = fieldTypeInfos;
    +        this.conf = conf;
    +        this.schema = schema;
         }

         @Override
         public void configure(Configuration parameters) {
             LOG.info("Initializing HBaseConfiguration");
             connectToTable();
             if(table != null) {
    -            scan = createScanner();
    +            scan = getScanner();
             }
         }

    -    private Scan createScanner() {
    +    @Override
    +    protected Scan getScanner() {
    +        // TODO : Pass 'rowkey'. For this we need FilterableTableSource
             Scan scan = new Scan();
    -        for(String field : fieldNames) {
    +        Map<String, List<Pair>> familyMap = schema.getFamilyMap();
    +        for(String family : familyMap.keySet()) {
                 // select only the fields in the 'selectedFields'
    -            String[] famCol = field.split(COLON);
    -            scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
    +            List<Pair> colDetails = familyMap.get(family);
    +            for(Pair<String, TypeInformation<?>> pair : colDetails) {
    +                scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
    +            }
             }
             return scan;
         }

    +    @Override
    +    public String getTableName() {
    +        return tableName;
    +    }
    +
    +    @Override
    +    protected Row mapResultToTuple(Result res) {
    +        List<Object> values = new ArrayList<Object>();
    +        int i = 0;
    +        Map<String, List<Pair>> familyMap = schema.getFamilyMap();
    +        Row[] rows = new Row[familyMap.size()];
    --- End diff --

    Better to declare `rows` as `Object[]` to avoid confusion about whether `rows` is passed as a varargs array or as a single non-varargs argument.
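For context, a minimal, self-contained sketch of the ambiguity behind that suggestion (the `count` helper and the `String[]` stand-in for `Row[]` are hypothetical, for illustration only):

    public class VarargsDemo {

        // A varargs method, similar in shape to a row factory taking Object... values.
        static int count(Object... values) {
            return values.length;
        }

        public static void main(String[] args) {
            String[] rows = new String[3]; // stands in for Row[] rows

            // Any reference-type array is assignable to Object[], so the compiler
            // passes it directly as the varargs array: three elements, not one.
            System.out.println(count(rows));          // prints 3

            // An explicit cast is needed to pass the array as a single element.
            System.out.println(count((Object) rows)); // prints 1
        }
    }

Declaring `rows` as `Object[]` makes the first call read unambiguously as handing over the varargs array itself, which is the behavior the comment wants to make explicit.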