To facilitate you guys helping me I put this test project on github:
https://github.com/nielsbasjes/FlinkHBaseConnectProblem

Niels Basjes

On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> I have a Flink 1.3.2 application that I want to run on a Hadoop YARN
> cluster where I need to connect to HBase.
>
> What I have:
>
> In my environment:
> HADOOP_CONF_DIR=/etc/hadoop/conf/
> HBASE_CONF_DIR=/etc/hbase/conf/
> HIVE_CONF_DIR=/etc/hive/conf/
> YARN_CONF_DIR=/etc/hadoop/conf/
>
> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper
> hosts for HBase.
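For reference, the relevant setting in hbase-site.xml typically looks like this (the host names below are taken from the log output further down and are only illustrative):

```xml
<!-- hbase-site.xml: ZooKeeper quorum used by the HBase client -->
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>node1.kluster.local.nl.bol.com,node2.kluster.local.nl.bol.com,node3.kluster.local.nl.bol.com</value>
</property>
```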
>
> My test code is this:
>
> public class Main {
>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>
>   public static void main(String[] args) throws Exception {
>     printZookeeperConfig();
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>     env.createInput(new HBaseSource()).print();
>     env.execute("HBase config problem");
>   }
>
>   public static void printZookeeperConfig() {
>     String zookeeper = HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>     LOG.info("----> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>   }
>
>   public static class HBaseSource extends AbstractTableInputFormat<String> {
>     @Override
>     public void configure(org.apache.flink.configuration.Configuration parameters) {
>       table = createTable();
>       if (table != null) {
>         scan = getScanner();
>       }
>     }
>
>     private HTable createTable() {
>       LOG.info("Initializing HBaseConfiguration");
>       // Uses files found in the classpath
>       org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
>       printZookeeperConfig();
>
>       try {
>         return new HTable(hConf, getTableName());
>       } catch (Exception e) {
>         LOG.error("Error instantiating a new HTable instance", e);
>       }
>       return null;
>     }
>
>     @Override
>     public String getTableName() {
>       return "bugs:flink";
>     }
>
>     @Override
>     protected String mapResultToOutType(Result result) {
>       return new String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
>     }
>
>     @Override
>     protected Scan getScanner() {
>       return new Scan();
>     }
>   }
>
> }
>
>
> I run this application with this command on my YARN cluster (note: first
> starting a YARN session and then submitting the job yields the same result).
>
> flink \
>     run \
>     -m yarn-cluster \
>     --yarncontainer 1 \
>     --yarnname "Flink on Yarn HBase problem" \
>     --yarnslots                     1     \
>     --yarnjobManagerMemory          4000  \
>     --yarntaskManagerMemory         4000  \
>     --yarnstreaming                       \
>     target/flink-hbase-connect-1.0-SNAPSHOT.jar
>
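One thing worth trying (a sketch, not verified on this cluster): explicitly shipping the HBase conf directory to the YARN containers with Flink's `-yt`/`--yarnship` option, so that hbase-site.xml ends up on the task managers' classpath:

```
flink \
    run \
    -m yarn-cluster \
    --yarnship /etc/hbase/conf/ \
    --yarncontainer 1 \
    target/flink-hbase-connect-1.0-SNAPSHOT.jar
```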
> Now in the client-side logfile /usr/local/flink-1.3.2/log/flink--client-80d2d21b10e0.log I see:
>
> 1) The classpath actually contains /etc/hbase/conf/ both near the start and at the end.
>
> 2) The zookeeper settings of my experimental environment have been picked up by the software.
>
> 2017-10-20 11:17:23,973 INFO  com.bol.bugreports.Main - ----> Loading HBaseConfiguration: Zookeeper = node1.kluster.local.nl.bol.com:2181,node2.kluster.local.nl.bol.com:2181,node3.kluster.local.nl.bol.com:2181
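To verify point 1 on the task managers as well, a small stdlib-only check can be logged from inside the job (a sketch; `entriesContaining` is a made-up helper, not from the original code):

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class ClasspathCheck {
  // Hypothetical helper: return the classpath entries that contain
  // the given fragment, e.g. "hbase/conf".
  static List<String> entriesContaining(String classpath, String fragment) {
    List<String> hits = new ArrayList<>();
    for (String entry : classpath.split(File.pathSeparator)) {
      if (entry.contains(fragment)) {
        hits.add(entry);
      }
    }
    return hits;
  }

  public static void main(String[] args) {
    // Logged from inside the job, this shows whether the HBase conf
    // directory is actually on the task manager JVM's classpath.
    String cp = System.getProperty("java.class.path");
    System.out.println(entriesContaining(cp, "hbase/conf"));
  }
}
```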
>
>
> When I open the logfiles on the Hadoop cluster I see this:
>
> 2017-10-20 13:17:33,250 INFO  com.bol.bugreports.Main - ----> Loading HBaseConfiguration: Zookeeper = *localhost*
>
>
> and as a consequence
>
> 2017-10-20 13:17:33,368 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost.localdomain/127.0.0.1:2181
>
> 2017-10-20 13:17:33,369 WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
>
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>       at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>       at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>       at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>
> 2017-10-20 13:17:33,475 WARN  org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper - Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
>
>
>
> The value 'localhost:2181' is defined in the hbase-default.xml inside the
> HBase jar as the default value for the ZooKeeper quorum.
>
> As a workaround I currently put this extra line in my code, which I know is
> nasty but "works on my cluster":
>
> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>
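A slightly less hard-coded variant of the same workaround (a sketch, assuming HBASE_CONF_DIR is exported as shown above; `resolveHBaseSiteXml` is a made-up helper name) would resolve the path from the environment instead of hard-coding it:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class HBaseConfLocator {
  // Hypothetical helper: prefer HBASE_CONF_DIR from the environment,
  // fall back to the conventional /etc/hbase/conf location.
  static Path resolveHBaseSiteXml(String hbaseConfDirEnv) {
    String dir = (hbaseConfDirEnv != null && !hbaseConfDirEnv.isEmpty())
        ? hbaseConfDirEnv
        : "/etc/hbase/conf";
    return Paths.get(dir, "hbase-site.xml");
  }

  public static void main(String[] args) {
    Path siteXml = resolveHBaseSiteXml(System.getenv("HBASE_CONF_DIR"));
    // The resolved path would then be fed to the workaround from above:
    // hbaseConfig.addResource(new org.apache.hadoop.fs.Path("file:" + siteXml));
    System.out.println(siteXml);
  }
}
```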
>
> What am I doing wrong?
>
> What is the right way to fix this?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
