Hi,

What do you mean by saying:

> When I open the logfiles on the Hadoop cluster I see this:


So the error doesn’t come from Flink itself? Where do you execute this line:

hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));

?

To me it seems like a problem with a misconfigured HBase rather than something
related to Flink.
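One way to narrow it down: Hadoop's Configuration can report which resource file supplied a given value. A minimal diagnostic sketch (assuming hbase-client and hadoop-common are on the classpath; the class name `WhereIsMyQuorum` is just a placeholder):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class WhereIsMyQuorum {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    String quorum = conf.get("hbase.zookeeper.quorum");
    // getPropertySources names the resource(s) the value came from, e.g.
    // "hbase-default.xml" when no hbase-site.xml was found on the classpath.
    String[] sources = conf.getPropertySources("hbase.zookeeper.quorum");
    System.out.println("quorum=" + quorum + " from "
        + (sources == null ? "unknown" : String.join(", ", sources)));
  }
}
```

Running this on the machines where the task managers start would show whether they are reading hbase-default.xml or a real hbase-site.xml.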

Piotrek

> On 20 Oct 2017, at 13:44, Niels Basjes <ni...@basjes.nl> wrote:
> 
> To facilitate you guys helping me I put this test project on github:
> https://github.com/nielsbasjes/FlinkHBaseConnectProblem
> 
> Niels Basjes
> 
> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl> wrote:
> Hi,
> 
> I have a Flink 1.3.2 application that I want to run on a Hadoop YARN cluster,
> where I need to connect to HBase.
> 
> What I have:
> 
> In my environment:
> HADOOP_CONF_DIR=/etc/hadoop/conf/
> HBASE_CONF_DIR=/etc/hbase/conf/
> HIVE_CONF_DIR=/etc/hive/conf/
> YARN_CONF_DIR=/etc/hadoop/conf/
> 
> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper 
> hosts for HBase.
> 
> My test code is this:
> public class Main {
>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
> 
>   public static void main(String[] args) throws Exception {
>     printZookeeperConfig();
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>     env.createInput(new HBaseSource()).print();
>     env.execute("HBase config problem");
>   }
> 
>   public static void printZookeeperConfig() {
>     String zookeeper = HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>     LOG.info("----> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>   }
> 
>   public static class HBaseSource extends AbstractTableInputFormat<String> {
>     @Override
>     public void configure(org.apache.flink.configuration.Configuration parameters) {
>       table = createTable();
>       if (table != null) {
>         scan = getScanner();
>       }
>     }
> 
>     private HTable createTable() {
>       LOG.info("Initializing HBaseConfiguration");
>       // Uses files found in the classpath
>       org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
>       printZookeeperConfig();
> 
>       try {
>         return new HTable(hConf, getTableName());
>       } catch (Exception e) {
>         LOG.error("Error instantiating a new HTable instance", e);
>       }
>       return null;
>     }
> 
>     @Override
>     public String getTableName() {
>       return "bugs:flink";
>     }
> 
>     @Override
>     protected String mapResultToOutType(Result result) {
>       return new String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
>     }
> 
>     @Override
>     protected Scan getScanner() {
>       return new Scan();
>     }
>   }
> 
> }
> 
> I run this application with the following command on my YARN cluster (note: first
> starting a yarn-cluster and then submitting the job yields the same result).
> 
> flink \
>     run \
>     -m yarn-cluster \
>     --yarncontainer 1 \
>     --yarnname "Flink on Yarn HBase problem" \
>     --yarnslots                     1     \
>     --yarnjobManagerMemory          4000  \
>     --yarntaskManagerMemory         4000  \
>     --yarnstreaming                       \
>     target/flink-hbase-connect-1.0-SNAPSHOT.jar
> 
> Now in the client-side logfile
> /usr/local/flink-1.3.2/log/flink--client-80d2d21b10e0.log I see:
> 1) The classpath actually contains /etc/hbase/conf/, both near the start and at
> the end.
> 2) The zookeeper settings of my experimental environment have been picked up
> by the software:
> 
> 2017-10-20 11:17:23,973 INFO  com.bol.bugreports.Main - ----> Loading HBaseConfiguration: Zookeeper = node1.kluster.local.nl.bol.com:2181,node2.kluster.local.nl.bol.com:2181,node3.kluster.local.nl.bol.com:2181
> 
> When I open the logfiles on the Hadoop cluster I see this:
> 
> 2017-10-20 13:17:33,250 INFO  com.bol.bugreports.Main - ----> Loading HBaseConfiguration: Zookeeper = localhost
> 
> and as a consequence
> 
> 2017-10-20 13:17:33,368 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost.localdomain/127.0.0.1:2181
> 2017-10-20 13:17:33,369 WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>       at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>       at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>       at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> 2017-10-20 13:17:33,475 WARN  org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper - Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
> 
> 
> The value 'localhost:2181' comes from the hbase-default.xml packaged inside the
> HBase jar, where it is the default value for the zookeeper nodes.
> 
> As a workaround I currently add this extra line to my code, which I know is
> nasty but "works on my cluster":
> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
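A less hard-coded alternative might be to ship the HBase conf directory to the YARN containers at submit time, so that hbase-site.xml lands on the task managers' classpath as well. A hypothetical variant of the submit command above, assuming the Flink CLI's `--yarnship` option is available in this setup:

```shell
# Hypothetical: ship /etc/hbase/conf/ so hbase-site.xml reaches the YARN containers.
flink \
    run \
    -m yarn-cluster \
    --yarnship /etc/hbase/conf/ \
    --yarncontainer 1 \
    target/flink-hbase-connect-1.0-SNAPSHOT.jar
```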
> 
> What am I doing wrong?
> 
> What is the right way to fix this?
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
> 
> 
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
