Hi,

执行insert的时候需要在代码里等作业结束,可以参考这个util类的写法来做:
https://github.com/apache/flink/blob/release-1.11.1/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableEnvUtil.scala#L26

On Wed, Sep 9, 2020 at 2:01 PM zhongbaoluo <[email protected]> wrote:

> Options options = *null*;
>
>     *try* {
>
> OptionParser optionParser = *new* OptionParser(args);
>
> options = optionParser.getOptions();
>
> } *catch* (Exception e) {
>
> e.printStackTrace();
>
> *return*;
>
> }
>
>
>
>     String name            = options.getName();
>
>     String defaultDatabase = options.getDatabase();//"dc_yunpingtai";
>
>     String hiveConfDir     =  options.getHiveConfDir(); //"/Users/
> zhongbaoluo/Applications/app/apache-hive-3.1.2/conf"; // a local path
>
>     String version         = "3.1.2";
>
>     String sql = options.getSql();
>
>     HiveUtils.*hiveConfDir*(hiveConfDir);
>
>
>
>
>
>     HiveConf hiveConf = *new* HiveConf();
>
>     hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS,
> "thrift://dcmaster01:9083");
>
>     hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,
> "/user/hive/warehouse");//hdfs://datacloud-hadoop-cluster
>
>     hiveConf.setVar(HiveConf.ConfVars.*HIVE_ZOOKEEPER_QUORUM*,
> "dcmaster02:2181");
>
>     hiveConf.setVar(HiveConf.ConfVars.*HIVE_ZOOKEEPER_CLIENT_PORT*, "2181"
> );
>
>     hiveConf.setVar(HiveConf.ConfVars.*HIVE_ZOOKEEPER_SESSION_TIMEOUT*,
> "10000");
>
>     hiveConf.set("hive.server2.zookeeper.namespace", "hiveserver2");
>
>     hiveConf.set("hive.server2.zookeeper.publish.configs", "true");
>
>     hiveConf.set("hive.server2.support.dynamic.service.discovery", "true"
> );
>
>     hiveConf.set("hive.metastore.warehouse.dir", "/user/hive/warehouse");
>
>
>
>     *try* {
>
>     EnvironmentSettings settings = EnvironmentSettings.*newInstance*
> ().inBatchMode().build();
>
>     System.*out*.println("settings  创建完成");
>
>         TableEnvironment tableEnv = TableEnvironment.*create*(settings);
>
>         System.*out*.println("tableEnv  创建完成");
>
>
>
>         MyHiveCatalog hive = *new* MyHiveCatalog(name, defaultDatabase,
> hiveConf,version);
>
>         tableEnv.registerCatalog(name, hive);
>
>         System.*out*.println("hive  创建完成");
>
>
>         // set the HiveCatalog as the current catalog of the session
>
>         tableEnv.useCatalog(name);
>
>         tableEnv.useDatabase(defaultDatabase);
>
>         tableEnv.getConfig().setSqlDialect(SqlDialect.*HIVE*);
>
>         tableEnv.executeSql("show tables").print();
>
>         System.*out*.println("sql:"+sql);
>
>         //tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>
>
>
>         tableEnv.executeSql("DROP TABLE print_table");
>
>         tableEnv.executeSql("CREATE TABLE print_table(f0 BIGINT) WITH
> ('connector' = 'print')");
>
> //        tableEnv.executeSql("CREATE TABLE print_table_mysql (\n" +
>
> //        "f0 BIGINT\n" +
>
> //        ") WITH ('connector' = 'jdbc',\n" +
>
> //        "'url' = 'jdbc:mysql://192.168.50.120:3306/datacloud
> ?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useTimezone=true',\n"
> +
>
> //        "'table-name' = 't_dc_test',\n" +
>
> //        "'username' = 'dcuser',\n" +
>
> //        "'password' = 'datacloud37')");
>
>         tableEnv.executeSql(sql);// 这里的 sql = INSERT INTO print_table
> select count(1) from t_mpos_integral_sign_water
>
>         //Table result = tableEnv.sqlQuery(sql);//"select count(1) from
> t_mpos_integral_sign_water"
>
>         System.*out*.println("tableResult  创建完成");
>
>         //result.execute().print();
>
> } *catch* (Exception e) {
>
> e.printStackTrace();
>
> }
>
>
> MyHiveCatalog 类的代码:
>
>
> *public* *class* MyHiveCatalog *extends* HiveCatalog{
>
> *private* *static* *final* Logger *LOG* = LoggerFactory.*getLogger*
> (MyHiveCatalog.*class*);
>
> *public* MyHiveCatalog(String catalogName, @Nullable String
> defaultDatabase, @Nullable HiveConf hiveConf, String hiveVersion) {
>
> *this*(catalogName,defaultDatabase == *null* ? *DEFAULT_DB* :
> defaultDatabase,*createHiveConf*(hiveConf),hiveVersion,*false*);
>
> }
>
> *protected* MyHiveCatalog(String catalogName, String defaultDatabase,
> HiveConf hiveConf, String hiveVersion,
>
> *boolean* allowEmbedded) {
>
> *super*(catalogName, defaultDatabase, hiveConf, hiveVersion, allowEmbedded
> );
>
> // *TODO* Auto-generated constructor stub
>
> }
>
> *private* *static* HiveConf createHiveConf(@Nullable HiveConf hiveConf) {
>
> //LOG.info("Setting hive conf dir as {}", hiveConfDir);
>
>
> // try {
>
> // HiveConf.setHiveSiteLocation(
>
> // hiveConfDir == null ?
>
> // null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL());
>
> // } catch (MalformedURLException e) {
>
> // throw new CatalogException(
>
> // String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
>
> // }
>
>
> // create HiveConf from hadoop configuration
>
> Configuration hadoopConf = HadoopUtils.*getHadoopConfiguration*(*new*
> org.apache.flink.configuration.Configuration());
>
>
> // Add mapred-site.xml. We need to read configurations like compression
> codec.
>
> *for* (String possibleHadoopConfPath : HadoopUtils.
> *possibleHadoopConfPaths*(*new*
> org.apache.flink.configuration.Configuration())) {
>
> File mapredSite = *new* File(*new* File(possibleHadoopConfPath),
> "mapred-site.xml");
>
> *if* (mapredSite.exists()) {
>
> hadoopConf.addResource(*new* Path(mapredSite.getAbsolutePath()));
>
> *break*;
>
> }
>
> }
>
> HiveConf conf = *new* HiveConf(hadoopConf, HiveConf.*class*);
>
> conf.addResource(hiveConf);
>
> *return* conf;
>
> }
>
>
> }
>
> **********************************************************************
> Thanks & Best Regards!
>
> 杉欣集团-技术研究院  云平台
> 钟保罗
>
> 上海浦东新区东方路3261号振华广场B座23楼(杉欣集团)
> email: [email protected]
> 手机: 18157855633
>
>
>
>  原始邮件
> *发件人:* taochanglian<[email protected]>
> *收件人:* user-zh<[email protected]>; zhongbaoluo<
> [email protected]>
> *发送时间:* 2020年9月8日(周二) 16:51
> *主题:* Re: flink 1.11.1 版本执行HiveCatalog遇到问题质询
>
> 贴一下代码
> 在 2020/9/8 14:09, zhongbaoluo 写道:
>
> 据插入数据执行失败,也没有找到异常。 yarn
>
>
>

-- 
Best regards!
Rui Li

回复