Till, do you have some idea what is going on? I do not see any meaningful difference between Niels's code and HBaseWriteStreamExample.java. There is also a very similar issue on the mailing list: “Flink can't read hdfs namenode logical url”.
Piotrek

> On 22 Oct 2017, at 12:56, Niels Basjes <ni...@basjes.nl> wrote:
>
> Hi,
>
> Yes, all nodes have the same /etc/hbase/conf/hbase-site.xml, which contains
> the correct settings for HBase to find ZooKeeper.
> That is why adding that file as an additional resource to the configuration
> works.
> I have created a very simple project that reproduces the problem on my setup:
> https://github.com/nielsbasjes/FlinkHBaseConnectProblem
>
> Niels Basjes
>
>
> On Fri, Oct 20, 2017 at 6:54 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Is this /etc/hbase/conf/hbase-site.xml file present on all of the
> machines? If yes, could you share your code?
>
>> On 20 Oct 2017, at 16:29, Niels Basjes <ni...@basjes.nl> wrote:
>>
>> I looked at the log files from the Hadoop YARN web interface, i.e. actually
>> looking in the jobmanager.log of the container running the Flink task.
>> That is where I was able to find these messages.
>>
>> I do the
>>   hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>> in all places directly after the HBaseConfiguration.create();
>> That way I simply force the task to look on the actual Hadoop node for the
>> same file it already loaded locally.
>>
>> The reason I'm suspecting Flink is that the client-side part of the Flink
>> application does have the right settings, while the task/job actually
>> running in the cluster does not.
>> So it seems that, in the transition into the cluster, the application does
>> not copy everything it has available locally, for some reason.
>>
>> There is a very high probability I did something wrong; I'm just not seeing
>> it at this moment.
>>
>> Niels
>>
>>
>> On Fri, Oct 20, 2017 at 2:53 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> Hi,
>>
>> What do you mean by saying:
>>
>>> When I open the logfiles on the Hadoop cluster I see this:
>>
>> The error doesn't come from Flink? Where do you execute
>>
>>   hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>>
>> ?
>>
>> To me it seems like a problem with a misconfigured HBase and not
>> something related to Flink.
>>
>> Piotrek
>>
>>> On 20 Oct 2017, at 13:44, Niels Basjes <ni...@basjes.nl> wrote:
>>>
>>> To make it easier for you to help me, I put this test project on GitHub:
>>> https://github.com/nielsbasjes/FlinkHBaseConnectProblem
>>>
>>> Niels Basjes
>>>
>>> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>> Hi,
>>>
>>> I have a Flink 1.3.2 application that I want to run on a Hadoop YARN
>>> cluster, where I need to connect to HBase.
>>>
>>> What I have:
>>>
>>> In my environment:
>>>   HADOOP_CONF_DIR=/etc/hadoop/conf/
>>>   HBASE_CONF_DIR=/etc/hbase/conf/
>>>   HIVE_CONF_DIR=/etc/hive/conf/
>>>   YARN_CONF_DIR=/etc/hadoop/conf/
>>>
>>> In /etc/hbase/conf/hbase-site.xml I have correctly defined the ZooKeeper
>>> hosts for HBase.
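The relevant entries in such an hbase-site.xml would look roughly like this. This is a sketch only: the quorum hosts are taken from the client-side log quoted further down in this thread, while hbase.zookeeper.quorum and hbase.zookeeper.property.clientPort are the standard HBase property names:

  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>node1.kluster.local.nl.bol.com,node2.kluster.local.nl.bol.com,node3.kluster.local.nl.bol.com</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>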
>>> My test code is this (imports included for completeness):
>>>
>>> import static java.nio.charset.StandardCharsets.UTF_8;
>>>
>>> import org.apache.flink.addons.hbase.AbstractTableInputFormat;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.hadoop.hbase.HBaseConfiguration;
>>> import org.apache.hadoop.hbase.client.HTable;
>>> import org.apache.hadoop.hbase.client.Result;
>>> import org.apache.hadoop.hbase.client.Scan;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> public class Main {
>>>     private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         printZookeeperConfig();
>>>         final StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>>>         env.createInput(new HBaseSource()).print();
>>>         env.execute("HBase config problem");
>>>     }
>>>
>>>     public static void printZookeeperConfig() {
>>>         String zookeeper = HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>>>         LOG.info("----> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>>>     }
>>>
>>>     public static class HBaseSource extends AbstractTableInputFormat<String> {
>>>         @Override
>>>         public void configure(org.apache.flink.configuration.Configuration parameters) {
>>>             table = createTable();
>>>             if (table != null) {
>>>                 scan = getScanner();
>>>             }
>>>         }
>>>
>>>         private HTable createTable() {
>>>             LOG.info("Initializing HBaseConfiguration");
>>>             // Uses the hbase-site.xml found on the classpath
>>>             org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
>>>             printZookeeperConfig();
>>>
>>>             try {
>>>                 return new HTable(hConf, getTableName());
>>>             } catch (Exception e) {
>>>                 LOG.error("Error instantiating a new HTable instance", e);
>>>             }
>>>             return null;
>>>         }
>>>
>>>         @Override
>>>         public String getTableName() {
>>>             return "bugs:flink";
>>>         }
>>>
>>>         @Override
>>>         protected String mapResultToOutType(Result result) {
>>>             return new String(
>>>                 result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)),
>>>                 UTF_8);
>>>         }
>>>
>>>         @Override
>>>         protected Scan getScanner() {
>>>             return new Scan();
>>>         }
>>>     }
>>> }
>>>
>>> I run this application with this command on my YARN cluster (note: first
>>> starting a yarn-cluster and then submitting the job yields the same result):
>>>
>>> flink \
>>>     run \
>>>     -m yarn-cluster \
>>>     --yarncontainer 1 \
>>>     --yarnname "Flink on Yarn HBase problem" \
>>>     --yarnslots 1 \
>>>     --yarnjobManagerMemory 4000 \
>>>     --yarntaskManagerMemory 4000 \
>>>     --yarnstreaming \
>>>     target/flink-hbase-connect-1.0-SNAPSHOT.jar
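To see which hbase-site.xml (if any) a given JVM will pick up, one can ask the classloader directly: HBaseConfiguration.create() resolves hbase-site.xml as a classpath resource and otherwise falls back to the hbase-default.xml bundled inside the HBase jar. A minimal diagnostic sketch (the class name and output wording are illustrative, not part of the project above):

  import java.net.URL;

  public final class HBaseSiteLocator {
      public static void main(String[] args) {
          // HBaseConfiguration.create() loads hbase-site.xml as a classpath
          // resource; if this prints null, only the bundled hbase-default.xml
          // (with its localhost quorum default) is visible to this JVM.
          URL url = Thread.currentThread().getContextClassLoader()
                  .getResource("hbase-site.xml");
          System.out.println("hbase-site.xml resolved to: " + url);
      }
  }

Running this both on the client and inside the YARN container would show directly whether the task's classpath is missing /etc/hbase/conf/.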
>>> Now in the client-side logfile
>>> /usr/local/flink-1.3.2/log/flink--client-80d2d21b10e0.log I see:
>>>
>>> 1) The classpath actually contains /etc/hbase/conf/, both near the start
>>>    and at the end.
>>> 2) The ZooKeeper settings of my experimental environment have been picked
>>>    up by the software:
>>>
>>> 2017-10-20 11:17:23,973 INFO com.bol.bugreports.Main
>>>   - ----> Loading HBaseConfiguration: Zookeeper = node1.kluster.local.nl.bol.com:2181,node2.kluster.local.nl.bol.com:2181,node3.kluster.local.nl.bol.com:2181
>>>
>>> When I open the log files on the Hadoop cluster I see this:
>>>
>>> 2017-10-20 13:17:33,250 INFO com.bol.bugreports.Main
>>>   - ----> Loading HBaseConfiguration: Zookeeper = localhost
>>>
>>> and as a consequence:
>>>
>>> 2017-10-20 13:17:33,368 INFO org.apache.zookeeper.ClientCnxn
>>>   - Opening socket connection to server localhost.localdomain/127.0.0.1:2181
>>> 2017-10-20 13:17:33,369 WARN org.apache.zookeeper.ClientCnxn
>>>   - Session 0x0 for server null, unexpected error, closing socket
>>>     connection and attempting reconnect
>>> java.net.ConnectException: Connection refused
>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>>     at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>>>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>>> 2017-10-20 13:17:33,475 WARN org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper
>>>   - Possibly transient ZooKeeper, quorum=localhost:2181,
>>>     exception=org.apache.zookeeper.KeeperException$ConnectionLossException:
>>>     KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
>>>
>>> The value 'localhost:2181' is defined within the HBase jar, in
>>> hbase-default.xml, as the default value for the ZooKeeper nodes.
>>>
>>> As a workaround I currently put this extra line in my code, which I know
>>> is nasty but "works on my cluster":
>>>
>>>   hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>>>
>>> What am I doing wrong?
>>>
>>> What is the right way to fix this?
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
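If the explicit addResource() call has to stay for now, a slightly less brittle variant is to derive the path from the HBASE_CONF_DIR environment variable mentioned at the top of the thread instead of hard-coding it. This is only a sketch of the same workaround (the class and method names are illustrative), not a fix for the underlying classpath problem, and it still assumes the variable is set on the node that runs the task:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public final class HBaseConfLoader {
      public static Configuration createHBaseConfig() {
          Configuration hConf = HBaseConfiguration.create();
          String confDir = System.getenv("HBASE_CONF_DIR");
          if (confDir != null) {
              // Layer the cluster's hbase-site.xml on top of the defaults,
              // overriding the bundled localhost quorum.
              hConf.addResource(new Path("file:" + confDir + "/hbase-site.xml"));
          }
          return hConf;
      }
  }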