zhanglibing000 created FLINK-8244:
-------------------------------------

             Summary: There are two zookeeper clients created when reading data from HBase in the Flink YARN session mode
                 Key: FLINK-8244
                 URL: https://issues.apache.org/jira/browse/FLINK-8244
             Project: Flink
          Issue Type: Bug
          Components: Batch Connectors and Input/Output Formats
    Affects Versions: 1.3.2
            Reporter: zhanglibing000
I want to use the Flink HBaseTableInputFormat API to read data from HBase. This works when I run my code in local mode, but when I run the same code in Flink YARN session mode the job misbehaves on the task manager: there is no error message and no data is emitted from the data source. In the task manager log (see the attachment) I finally found that the ZooKeeper client is created twice, so I think there is a problem in Flink. Thanks.

{code:java}
import com.alibaba.fastjson.JSON
import com.google.gson.Gson
import com.mininglamp.shmetro.RuleEnginerStreamingFlinkApp.logger
import com.mininglamp.shmetro.bean.json.JsonBeanGenerator
import com.mininglamp.shmetro.model.TrainData
import com.mininglamp.shmetro.output.GlobalEntity
import com.mininglamp.shmetro.rules.{BatchRuleEngineerByScannerExecutor, RuleEngineerByRedisExecutor}
import com.mininglamp.shmetro.source.HBaseTableInputFormat
import com.mininglamp.shmetro.util.RedisUtil
import com.mininglamp.shmetro.utils.MysqlUtil
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.operators.{DataSource, GroupReduceOperator, MapOperator}
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConversions._

object RuleEnginerBatchFlinkAppCopy {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Job parameters from the command line
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val appName = params.get("batch.name")
    val updateTime = params.getLong("update.time")
    val tableName = params.get("hbase.table.name")
    val columnFamily = params.get("hbase.table.cf")
    val columns = params.get("hbase.table.columns").split(",")
    val startTime = params.get("start.time")
    val endTime = params.get("end.time")
    val trainNo = params.get("train.no")

    // Row-key range: <trainNo>\0<time>
    val startRow = new StringBuilder
    startRow.append(trainNo).append("\0").append(startTime)
    val endRow = new StringBuilder
    endRow.append(trainNo).append("\0").append(endTime)

    val hBaseDataSource: DataSource[Tuple2[String, String]] = env.createInput(
      new HBaseTableInputFormat[Tuple2[String, String]](tableName, columnFamily, columns, null,
        startRow.toString(), endRow.toString()) {

        private val reuse = new Tuple2[String, String]

        override protected def mapResultToTuple(r: Result): Tuple2[String, String] = {
          logger.error("**********hbase row: " + reuse)
          val key = Bytes.toString(r.getRow)
          val value = getMapResult(r)
          reuse.setField(key, 0)
          val data = value.get(key)
          reuse.setField(JSON.toJSON(data).toString, 1)
          reuse
        }
      })

    hBaseDataSource.collect()
  }
}
{code}
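For comparison, here is a minimal sketch (not the code from this report) of the same kind of scan written against the stock flink-hbase TableInputFormat shipped with 1.3.x, which builds its HBase settings via HBaseConfiguration.create(), i.e. from an hbase-site.xml on the classpath, instead of hard-coding them. The table name "my_table", the column family "cf" and the row keys below are placeholders, not values from this report:

{code:java}
// Comparison sketch only: stock flink-hbase (1.3.x) TableInputFormat usage.
// Assumes an hbase-site.xml with the real ZooKeeper quorum is on the classpath
// of both the client and the task managers. Names below are placeholders.
import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class StockTableInputFormatSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, String>> rows = env.createInput(
            new TableInputFormat<Tuple2<String, String>>() {

                @Override
                protected Scan getScanner() {
                    // Same shape of scan as in the report: one family plus a bounded row range.
                    Scan scan = new Scan();
                    scan.addFamily(Bytes.toBytes("cf"));
                    scan.setStartRow(Bytes.toBytes("row-000"));
                    scan.setStopRow(Bytes.toBytes("row-999"));
                    return scan;
                }

                @Override
                protected String getTableName() {
                    return "my_table";
                }

                @Override
                protected Tuple2<String, String> mapResultToTuple(Result r) {
                    // Row key plus the value of the first cell, just to keep the sketch short.
                    return new Tuple2<>(Bytes.toString(r.getRow()), Bytes.toString(r.value()));
                }
            });

        rows.print();
    }
}
{code}

The custom HBaseTableInputFormat used by the job above follows; note that it hard-codes the HBase connection settings in createTable() instead of reading them from hbase-site.xml: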
{code:java}
import org.apache.commons.lang.StringUtils;
import org.apache.flink.addons.hbase.AbstractTableInputFormat;
import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class HBaseTableInputFormat<T extends Tuple> extends AbstractTableInputFormat<T> {

    private String tableName;
    private String columnFamily;
    private String[] columns;
    private String filterName;
    private String startRow;
    private String stopRow;

    public HBaseTableInputFormat(String tableName, String columnFamily, String[] columns,
                                 String filterName, String startRow, String stopRow) {
        this.tableName = tableName;
        this.columnFamily = columnFamily;
        this.columns = columns;
        this.filterName = filterName;
        this.startRow = startRow;
        this.stopRow = stopRow;
    }

    // Builds the Scan: column family, optional row-key regex filter, selected columns, row range.
    protected Scan getScanner() {
        Scan scan = new Scan();
        if (!StringUtils.isEmpty(columnFamily)) {
            scan.addFamily(columnFamily.getBytes());
        }
        if (!StringUtils.isEmpty(filterName)) {
            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(filterName));
            scan.setFilter(filter);
        }
        if (columns != null && !StringUtils.isEmpty(columnFamily)) {
            for (String column : columns) {
                scan.addColumn(columnFamily.getBytes(), column.getBytes());
            }
        }
        if (!StringUtils.isEmpty(startRow)) {
            scan.setStartRow(startRow.getBytes());
        }
        if (!StringUtils.isEmpty(stopRow)) {
            scan.setStopRow(stopRow.getBytes());
        }
        return scan;
    }

    protected String getTableName() {
        return tableName;
    }

    protected abstract T mapResultToTuple(Result r);

    // Sets up the HTable and the Scan used by AbstractTableInputFormat.
    @Override
    public void configure(Configuration parameters) {
        table = createTable();
        if (table != null) {
            scan = getScanner();
        }
    }

    // Builds the HBase configuration in code (ZooKeeper quorum, master, timeouts) and opens the table.
    private HTable createTable() {
        LOG.info("Initializing HBaseConfiguration");
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        configuration.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000);
        configuration.set("hbase.zookeeper.quorum", "x.x.x.x");
        configuration.set("hbase.master.info.port", "2181");
        configuration.set("hbase.master", "172.17.1.21:60000");
        configuration.setInt("hbase.rpc.timeout", 20000);
        configuration.setInt("hbase.client.operation.timeout", 30000);
        try {
            return new HTable(configuration, getTableName());
        } catch (Exception e) {
            LOG.error("Error instantiating a new HTable instance", e);
        }
        return null;
    }

    protected T mapResultToOutType(Result r) {
        return mapResultToTuple(r);
    }

    // row key -> (qualifier -> value)
    public Map<String, Map<String, String>> getMapResult(Result result) {
        Map<String, Map<String, String>> resMap = new HashMap<>();
        Cell[] cells = result.rawCells();
        Map<String, String> cellMap = new HashMap<>();
        for (Cell cell : cells) {
            cellMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
        }
        resMap.put(Bytes.toString(result.getRow()), cellMap);
        return resMap;
    }
}
{code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)