https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html> Here is my code: my datastream source: ``` public static class MySource implements SourceFunction<UserInfo>{
String userids[] = { "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5", "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b", "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702", "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c", "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee" }; @Override public void run(SourceContext<UserInfo> sourceContext) throws Exception{ while (true){ String userid = userids[(int) (Math.random() * (userids.length - 1))]; UserInfo userInfo = new UserInfo(); userInfo.setUserId(userid); userInfo.setAmount(Math.random() * 100); userInfo.setTs(new Timestamp(System.currentTimeMillis())); sourceContext.collect(userInfo); Thread.sleep(100); } } @Override public void cancel(){ } } public static class UserInfo implements java.io.Serializable{ private String userId; private Double amount; private Timestamp ts; public String getUserId(){ return userId; } public void setUserId(String userId){ this.userId = userId; } public Double getAmount(){ return amount; } public void setAmount(Double amount){ this.amount = amount; } public Timestamp getTs(){ return ts; } public void setTs(Timestamp ts){ this.ts = ts; } } ``` flink code: ``` StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); bsEnv.enableCheckpointing(10000); StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource()) //构造hive catalog String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/Users/user/work/hive/conf"; // a local path String version = "3.1.2"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tEnv.registerCatalog("myhive", hive); tEnv.useCatalog("myhive"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tEnv.useDatabase("db1"); tEnv.createTemporaryView("users", dataStream); String hiveSql = "CREATE external TABLE fs_table (\n" + " user_id STRING,\n" + " order_amount DOUBLE" + ") partitioned by (dt string,h string,m string) " + "stored as ORC " + "TBLPROPERTIES (\n" + " 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" + " 'sink.partition-commit.delay'='0s',\n" + " 'sink.partition-commit.trigger'='partition-time',\n" + " 'sink.partition-commit.policy.kind'='metastore'" + ")"; tEnv.executeSql(hiveSql); String insertSql = "SELECT * FROM users"; tEnv.executeSql(insertSql); ``` And this is my flink configuration: ``` jobmanager.memory.process.size: 1600m taskmanager.memory.process.size 4096m taskmanager.numberOfTaskSlots: 1 parallelism.default: 1 ``` And the exception is: java.util.concurrent.completionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailable and request to ResourceManager for new slot failed. According the exception message, it means the resource is in sufficient, but the hadoop resource is enough, memory is 300+g, cores is 72, and the usage rate is lower about 30%. I have tried increase the taskmanager slot at flink run command with `flink run -ys`, but it is not effective. Here is the environment: flink version: 1.12.0 java: 1.8 Please check what’s the problem is, really appreciate it. Thanks.