https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html
 
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html>
Here is my code:
my datastream source:
```
/**
 * Demo source that emits one randomly generated {@link UserInfo} about every
 * 100 ms until {@link #cancel()} is called.
 */
public static class MySource implements SourceFunction<UserInfo>{

    /** Pool of user ids; each emitted record picks one of them at random. */
    String[] userids = {
            "4760858d-2bec-483c-a535-291de04b2247",
            "67088699-d4f4-43f2-913c-481bff8a2dc5",
            "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
            "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
            "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
            "3218bbb9-5874-4d37-a82d-3e35e52d1702",
            "3ebfb9602ac07779||3ebfe9612a007979",
            "aec20d52-c2eb-4436-b121-c29ad4097f6c",
            "e7e896cd939685d7||e7e8e6c1930689d7",
            "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
    };

    /**
     * Loop guard for {@link #run}. Volatile because {@link #cancel()} is
     * invoked from a different thread than the one executing the loop.
     */
    private volatile boolean running = true;

    /**
     * Emits records until the source is cancelled.
     *
     * @param sourceContext context used to emit the generated records
     * @throws Exception if the pause between two records is interrupted
     */
    @Override
    public void run(SourceContext<UserInfo> sourceContext) throws Exception{

        while (running){
            // Math.random() is in [0, 1), so scaling by the full length makes
            // every index, including the last one, selectable.
            String userid = userids[(int) (Math.random() * userids.length)];
            UserInfo userInfo = new UserInfo();
            userInfo.setUserId(userid);
            userInfo.setAmount(Math.random() * 100);
            userInfo.setTs(new Timestamp(System.currentTimeMillis()));
            sourceContext.collect(userInfo);
            Thread.sleep(100);
        }
    }

    /** Asks the emit loop in {@link #run} to stop after the current iteration. */
    @Override
    public void cancel(){
        running = false;
    }
}

/**
 * Plain serializable bean describing one user event: the user id, an amount
 * and the timestamp attached to the event.
 */
public static class UserInfo implements java.io.Serializable{

    /** Identifier of the user. */
    private String userId;

    /** Amount carried by the event. */
    private Double amount;

    /** Timestamp of the event. */
    private Timestamp ts;

    // ---- getters ----

    /** @return the user id, or {@code null} if none has been set */
    public String getUserId(){
        return this.userId;
    }

    /** @return the amount, or {@code null} if none has been set */
    public Double getAmount(){
        return this.amount;
    }

    /** @return the timestamp, or {@code null} if none has been set */
    public Timestamp getTs(){
        return this.ts;
    }

    // ---- setters ----

    /** @param userId the user id to store */
    public void setUserId(String userId){
        this.userId = userId;
    }

    /** @param amount the amount to store */
    public void setAmount(Double amount){
        this.amount = amount;
    }

    /** @param ts the timestamp to store */
    public void setTs(Timestamp ts){
        this.ts = ts;
    }
}
```

flink code:
```
// Streaming environment with checkpointing enabled (interval argument: 10000).
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(10000);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());

// Build the Hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
String version = "3.1.2";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
// The CREATE TABLE statement below uses Hive DDL syntax, so parse it with the Hive dialect.
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("db1");

// Expose the DataStream to SQL under the name "users".
tEnv.createTemporaryView("users", dataStream);

// Target table, partitioned by day / hour / minute.
// NOTE(review): the 'partition-time' commit trigger is documented as depending on
// watermarks; no watermark is assigned to the stream here — confirm partitions get committed.
String hiveSql = "CREATE external TABLE fs_table (\n" +
 "  user_id STRING,\n" +
 "  order_amount DOUBLE" +
 ") partitioned by (dt string,h string,m string) " +
 "stored as ORC " +
 "TBLPROPERTIES (\n" +
 "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
 "  'sink.partition-commit.delay'='0s',\n" +
 "  'sink.partition-commit.trigger'='partition-time',\n" +
 "  'sink.partition-commit.policy.kind'='metastore'" +
 ")";
tEnv.executeSql(hiveSql);

// Switch back to the default dialect for the streaming INSERT, as the Flink Hive
// streaming-write example does.
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

// Write the stream into fs_table. Columns are selected by name and the three
// partition values (dt, h, m) are derived from the event timestamp; a bare
// "SELECT * FROM users" would not write anything into the Hive table.
String insertSql = "INSERT INTO fs_table " +
 "SELECT userId, amount, " +
 "DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') " +
 "FROM users";
tEnv.executeSql(insertSql);
```

And this is my flink configuration:
```
# Total process memory of the JobManager.
jobmanager.memory.process.size: 1600m
# Total process memory of each TaskManager. The "key: value" separator is
# required; it was missing on this line.
taskmanager.memory.process.size: 4096m
# Number of slots offered by each TaskManager.
taskmanager.numberOfTaskSlots: 1
# Parallelism used when a job does not set one explicitly.
parallelism.default: 1
```

And the exception is: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException, and 
the request to the ResourceManager for a new slot failed.

According to the exception message, resources are insufficient, but the Hadoop 
cluster has plenty of resources: 300+ GB of memory and 72 cores, and the usage 
rate is low, at about 30%.

I have tried increasing the number of TaskManager slots in the flink run command 
with `flink run -ys`, but it had no effect.

Here is the environment:
flink version: 1.12.0
java: 1.8

Please help me check what the problem is; I would really appreciate it. Thanks.

















Reply via email to