To make it easier for you to help me, I have put this test project on GitHub:
https://github.com/nielsbasjes/FlinkHBaseConnectProblem
Niels Basjes

On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl> wrote:
> Hi,
>
> I have a Flink 1.3.2 application that I want to run on a Hadoop YARN
> cluster where I need to connect to HBase.
>
> What I have:
>
> In my environment:
>   HADOOP_CONF_DIR=/etc/hadoop/conf/
>   HBASE_CONF_DIR=/etc/hbase/conf/
>   HIVE_CONF_DIR=/etc/hive/conf/
>   YARN_CONF_DIR=/etc/hadoop/conf/
>
> In /etc/hbase/conf/hbase-site.xml I have correctly defined the ZooKeeper
> hosts for HBase.
>
> My test code is this:
>
> public class Main {
>     private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>
>     public static void main(String[] args) throws Exception {
>         printZookeeperConfig();
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>         env.createInput(new HBaseSource()).print();
>         env.execute("HBase config problem");
>     }
>
>     public static void printZookeeperConfig() {
>         String zookeeper = HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>         LOG.info("----> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>     }
>
>     public static class HBaseSource extends AbstractTableInputFormat<String> {
>         @Override
>         public void configure(org.apache.flink.configuration.Configuration parameters) {
>             table = createTable();
>             if (table != null) {
>                 scan = getScanner();
>             }
>         }
>
>         private HTable createTable() {
>             LOG.info("Initializing HBaseConfiguration");
>             // Uses files found on the classpath
>             org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
>             printZookeeperConfig();
>
>             try {
>                 return new HTable(hConf, getTableName());
>             } catch (Exception e) {
>                 LOG.error("Error instantiating a new HTable instance", e);
>             }
>             return null;
>         }
>
>         @Override
>         public String getTableName() {
>             return "bugs:flink";
>         }
>
>         @Override
>         protected String mapResultToOutType(Result result) {
>             return new String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
>         }
>
>         @Override
>         protected Scan getScanner() {
>             return new Scan();
>         }
>     }
> }
>
> I run this application with this command on my YARN cluster (note: first
> starting a yarn-cluster and then submitting the job yields the same result):
>
> flink \
>     run \
>     -m yarn-cluster \
>     --yarncontainer 1 \
>     --yarnname "Flink on Yarn HBase problem" \
>     --yarnslots 1 \
>     --yarnjobManagerMemory 4000 \
>     --yarntaskManagerMemory 4000 \
>     --yarnstreaming \
>     target/flink-hbase-connect-1.0-SNAPSHOT.jar
>
> Now in the client-side logfile
> /usr/local/flink-1.3.2/log/flink--client-80d2d21b10e0.log I see:
>
> 1) The classpath actually contains /etc/hbase/conf/, both near the start
>    and at the end.
>
> 2) The ZooKeeper settings of my experimental environment have been picked
>    up by the software:
>
> 2017-10-20 11:17:23,973 INFO  com.bol.bugreports.Main - ----> Loading HBaseConfiguration: Zookeeper = node1.kluster.local.nl.bol.com:2181,node2.kluster.local.nl.bol.com:2181,node3.kluster.local.nl.bol.com:2181
>
> When I open the logfiles on the Hadoop cluster I see this:
>
> 2017-10-20 13:17:33,250 INFO  com.bol.bugreports.Main - ----> Loading HBaseConfiguration: Zookeeper = *localhost*
>
> and as a consequence:
>
> 2017-10-20 13:17:33,368 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost.localdomain/127.0.0.1:2181
> 2017-10-20 13:17:33,369 WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>     at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> 2017-10-20 13:17:33,475 WARN  org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper - Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
>
> The value 'localhost:2181' is the default for the ZooKeeper quorum, defined
> in the hbase-default.xml that ships inside the HBase jar.
>
> As a workaround I currently add this extra line to my code, which I know is
> nasty but "works on my cluster":
>
> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>
> What am I doing wrong?
>
> What is the right way to fix this?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes

--
Best regards / Met vriendelijke groeten,

Niels Basjes
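
[Editorial note] For reference, here is a minimal sketch of the workaround from the quoted mail, generalised to read HBASE_CONF_DIR from the environment instead of hard-coding the path. It is not part of the linked project: the class name ExplicitHBaseConfig and the fallback path are made up for illustration, and it assumes HBASE_CONF_DIR is also set on the hosts running the task managers.

// Sketch only: builds an HBase Configuration by explicitly adding
// hbase-site.xml, instead of relying on it being on the classpath.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;

public final class ExplicitHBaseConfig {
    private ExplicitHBaseConfig() {}

    public static Configuration create() {
        // Start from whatever is on the classpath (at least hbase-default.xml
        // from the HBase jar, which is where localhost:2181 comes from).
        Configuration hConf = HBaseConfiguration.create();

        // Prefer the directory announced via HBASE_CONF_DIR; fall back to the
        // hard-coded path used in the workaround above. (Assumed convention.)
        String confDir = System.getenv("HBASE_CONF_DIR");
        if (confDir == null || confDir.isEmpty()) {
            confDir = "/etc/hbase/conf";
        }
        hConf.addResource(new Path("file:" + confDir, "hbase-site.xml"));
        return hConf;
    }
}

In createTable() the call to HBaseConfiguration.create() would then become ExplicitHBaseConfig.create(). This only papers over the problem described above; it does not explain why hbase-site.xml is not picked up from the classpath on the task managers.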