[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837096#comment-15837096 ]
ASF GitHub Bot commented on FLINK-2168: --------------------------------------- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97699488 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -22,54 +22,63 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.sources.BatchTableSource; -import org.apache.flink.table.sources.ProjectableTableSource; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Pair; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** * Creates a table source that helps to scan data from an hbase table * * Note : the colNames are specified along with a familyName and they are seperated by a ':' * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name */ -public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> { +// TODO : Implement ProjectableTableSource? 
+public class HBaseTableSource implements BatchTableSource<Row> { private Configuration conf; private String tableName; - private byte[] rowKey; - private String[] colNames; - private TypeInformation<?>[] colTypes; + private HBaseTableSchema schema; + private String[] famNames; - public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, - TypeInformation<?>[] colTypes) { + public HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema schema) { this.conf = conf; this.tableName = Preconditions.checkNotNull(tableName, "Table name"); - this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey"); - this.colNames = Preconditions.checkNotNull(colNames, "Field names"); - this.colTypes = Preconditions.checkNotNull(colTypes, "Field types"); + this.schema = Preconditions.checkNotNull(schema, "Schema"); + Map<String, List<Pair>> familyMap = schema.getFamilyMap(); + famNames = familyMap.keySet().toArray(new String[familyMap.size()]); --- End diff -- I would like to move this code into `getReturnType()`, because the schema may change after construction of `HBaseTableSource`. > Add HBaseTableSource > -------------------- > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Affects Versions: 0.9 > Reporter: Fabian Hueske > Assignee: ramkrishna.s.vasudevan > Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)