Options options = null;
try {
    OptionParser optionParser = new OptionParser(args);
    options = optionParser.getOptions();
} catch (Exception e) {
    e.printStackTrace();
    return;
}

String name = options.getName();
String defaultDatabase = options.getDatabase(); // e.g. "dc_yunpingtai"
String hiveConfDir = options.getHiveConfDir(); // a local path, e.g. "/Users/zhongbaoluo/Applications/app/apache-hive-3.1.2/conf"
String version = "3.1.2";
String sql = options.getSql();
HiveUtils.hiveConfDir(hiveConfDir);
HiveConf hiveConf = new HiveConf();
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://dcmaster01:9083");
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, "/user/hive/warehouse"); // hdfs://datacloud-hadoop-cluster
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "dcmaster02:2181");
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "2181");
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, "10000");
hiveConf.set("hive.server2.zookeeper.namespace", "hiveserver2");
hiveConf.set("hive.server2.zookeeper.publish.configs", "true");
hiveConf.set("hive.server2.support.dynamic.service.discovery", "true");
hiveConf.set("hive.metastore.warehouse.dir", "/user/hive/warehouse");
try {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    System.out.println("settings created");
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    System.out.println("tableEnv created");
    MyHiveCatalog hive = new MyHiveCatalog(name, defaultDatabase, hiveConf, version);
    tableEnv.registerCatalog(name, hive);
    System.out.println("hive catalog registered");
    // set the HiveCatalog as the current catalog of the session
    tableEnv.useCatalog(name);
    tableEnv.useDatabase(defaultDatabase);
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tableEnv.executeSql("show tables").print();
    System.out.println("sql:" + sql);
    //tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    // NOTE: the session is still in the HIVE dialect here; the commented-out
    // switch back to the default dialect may be needed for connector-style DDL.
    tableEnv.executeSql("DROP TABLE print_table");
    tableEnv.executeSql("CREATE TABLE print_table(f0 BIGINT) WITH ('connector' = 'print')");
// tableEnv.executeSql("CREATE TABLE print_table_mysql (\n" +
// "f0 BIGINT\n" +
// ") WITH ('connector' = 'jdbc',\n" +
// "'url' =
'jdbc:mysql://192.168.50.120:3306/datacloud?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useTimezone=true',\n"
+
// "'table-name' = 't_dc_test',\n" +
// "'username' = 'dcuser',\n" +
// "'password' = 'datacloud37')");
tableEnv.executeSql(sql);// 这里的 sql = INSERT INTO print_table select
count(1) from t_mpos_integral_sign_water
//Table result = tableEnv.sqlQuery(sql);//"select count(1) from
t_mpos_integral_sign_water"
System.out.println("tableResult 创建完成");
//result.execute().print();
} catch (Exception e) {
    e.printStackTrace();
}
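
One note on the symptom quoted below ("the data insert failed, and no exception was found"): in Flink 1.11, executeSql submits the INSERT job asynchronously, so a cluster-side failure never throws in the client code above. A minimal sketch of blocking until the job finishes, assuming the 1.11 TableResult/JobClient API (insertResult would replace the bare executeSql(sql) call):

TableResult insertResult = tableEnv.executeSql(sql);
insertResult.getJobClient().ifPresent(jobClient -> {
    try {
        // The future completes exceptionally if the job fails,
        // surfacing the root cause in the client.
        jobClient.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
    } catch (Exception e) {
        throw new RuntimeException("INSERT job failed", e);
    }
});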
The code of the MyHiveCatalog class:
import java.io.File;

import javax.annotation.Nullable;

import org.apache.flink.runtime.util.HadoopUtils; // assumed location of getHadoopConfiguration/possibleHadoopConfPaths
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyHiveCatalog extends HiveCatalog {

    private static final Logger LOG = LoggerFactory.getLogger(MyHiveCatalog.class);

    public MyHiveCatalog(String catalogName, @Nullable String defaultDatabase,
            @Nullable HiveConf hiveConf, String hiveVersion) {
        this(catalogName,
                defaultDatabase == null ? DEFAULT_DB : defaultDatabase,
                createHiveConf(hiveConf),
                hiveVersion,
                false);
    }

    protected MyHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf,
            String hiveVersion, boolean allowEmbedded) {
        super(catalogName, defaultDatabase, hiveConf, hiveVersion, allowEmbedded);
    }

    private static HiveConf createHiveConf(@Nullable HiveConf hiveConf) {
        //LOG.info("Setting hive conf dir as {}", hiveConfDir);
        // try {
        //     HiveConf.setHiveSiteLocation(
        //             hiveConfDir == null ? null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL());
        // } catch (MalformedURLException e) {
        //     throw new CatalogException(
        //             String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
        // }

        // create HiveConf from hadoop configuration
        Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(
                new org.apache.flink.configuration.Configuration());

        // Add mapred-site.xml. We need to read configurations like compression codec.
        for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(
                new org.apache.flink.configuration.Configuration())) {
            File mapredSite = new File(new File(possibleHadoopConfPath), "mapred-site.xml");
            if (mapredSite.exists()) {
                hadoopConf.addResource(new Path(mapredSite.getAbsolutePath()));
                break;
            }
        }

        // overlay the programmatically built HiveConf on the Hadoop configuration
        HiveConf conf = new HiveConf(hadoopConf, HiveConf.class);
        conf.addResource(hiveConf);
        return conf;
    }
}
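
For reference, a minimal sketch of registering the stock HiveCatalog straight from a hive-site.xml directory instead, assuming the Flink 1.11 four-argument constructor and reusing the variables from the main method above:

// Stock HiveCatalog: loads hive-site.xml from hiveConfDir instead of
// taking a programmatic HiveConf (Flink 1.11 constructor assumed).
HiveCatalog stockCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(name, stockCatalog);
tableEnv.useCatalog(name);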
**********************************************************************
Thanks & Best Regards!
杉欣集团 - Technology Research Institute, Cloud Platform
Zhong Baoluo (钟保罗)
Original message
From: taochanglian<[email protected]>
To: user-zh<[email protected]>; zhongbaoluo<[email protected]>
Sent: Tuesday, 2020-09-08 16:51
Subject: Re: Question about a problem running HiveCatalog on Flink 1.11.1

Please paste the code.

On 2020/9/8 14:09, zhongbaoluo wrote:
…the data insert failed, and no exception was found. yarn