Hi Chenzuoli,

The exception says that Flink could not allocate enough slots. Could you
share the DEBUG logs of the run with us? They should contain the reason why
the allocation of further resources failed.
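
In case it helps, a minimal way to get them (assuming the default log4j2
properties config that ships with Flink 1.12) is to raise the root logger
level in conf/log4j.properties on the JobManager and TaskManager machines
and restart the processes:

```
# conf/log4j.properties (log4j2 properties format, Flink 1.12 default)
# Raise the root level from INFO to DEBUG to capture slot allocation details
rootLogger.level = DEBUG
```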

Cheers,
Till

On Sun, Apr 11, 2021 at 5:59 AM chenzuoli <chenzuoli...@163.com> wrote:

>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html
>
> Here is my code:
> My DataStream source:
> ```
> // Requires: import java.sql.Timestamp;
> // and: import org.apache.flink.streaming.api.functions.source.SourceFunction;
> public static class MySource implements SourceFunction<UserInfo> {
>
>     private volatile boolean running = true;
>
>     private final String[] userids = {
>             "4760858d-2bec-483c-a535-291de04b2247",
>             "67088699-d4f4-43f2-913c-481bff8a2dc5",
>             "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
>             "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>             "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
>             "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>             "3ebfb9602ac07779||3ebfe9612a007979",
>             "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>             "e7e896cd939685d7||e7e8e6c1930689d7",
>             "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
>     };
>
>     @Override
>     public void run(SourceContext<UserInfo> sourceContext) throws Exception {
>         while (running) {
>             // Math.random() * userids.length covers every index;
>             // multiplying by (userids.length - 1) would never pick the last id
>             String userid = userids[(int) (Math.random() * userids.length)];
>             UserInfo userInfo = new UserInfo();
>             userInfo.setUserId(userid);
>             userInfo.setAmount(Math.random() * 100);
>             userInfo.setTs(new Timestamp(System.currentTimeMillis()));
>             sourceContext.collect(userInfo);
>             Thread.sleep(100);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
> }
>
> public static class UserInfo implements java.io.Serializable{
>     private String userId;
>     private Double amount;
>     private Timestamp ts;
>
>     public String getUserId(){
>         return userId;
>     }
>
>     public void setUserId(String userId){
>         this.userId = userId;
>     }
>
>     public Double getAmount(){
>         return amount;
>     }
>
>     public void setAmount(Double amount){
>         this.amount = amount;
>     }
>
>     public Timestamp getTs(){
>         return ts;
>     }
>
>     public void setTs(Timestamp ts){
>         this.ts = ts;
>     }
> }
> ```
>
> Flink code:
> ```
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.enableCheckpointing(10000);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
> DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());
>
> // construct the Hive catalog
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
> String version = "3.1.2";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> tEnv.useDatabase("db1");
>
> tEnv.createTemporaryView("users", dataStream);
>
> String hiveSql = "CREATE EXTERNAL TABLE fs_table (\n" +
>  "  user_id STRING,\n" +
>  "  order_amount DOUBLE\n" +
>  ") PARTITIONED BY (dt STRING, h STRING, m STRING) " +
>  "STORED AS ORC " +
>  "TBLPROPERTIES (\n" +
>  "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
>  "  'sink.partition-commit.delay'='0s',\n" +
>  "  'sink.partition-commit.trigger'='partition-time',\n" +
>  "  'sink.partition-commit.policy.kind'='metastore'" +
>  ")";
> tEnv.executeSql(hiveSql);
>
> String insertSql = "INSERT INTO fs_table SELECT userId, amount, " +
>  "DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
> tEnv.executeSql(insertSql);
> ```
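>
> For reference, executeSql submits the INSERT job asynchronously and
> returns a TableResult; a minimal sketch for grabbing the job id
> (assuming the Flink 1.12 Table API, printing it only for illustration):
>
> ```
> // org.apache.flink.table.api.TableResult
> TableResult result = tEnv.executeSql(insertSql);
> // executeSql submits the job asynchronously; the JobClient, when present,
> // identifies the submitted job on the cluster
> result.getJobClient()
>         .ifPresent(client -> System.out.println("Submitted job: " + client.getJobID()));
> ```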
>
> And this is my flink configuration:
> ```
> jobmanager.memory.process.size: 1600m
> taskmanager.memory.process.size: 4096m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
> ```
>
> And the exception is: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException,
> and the request to the ResourceManager for a new slot failed.
>
> According to the exception message, the resources are insufficient, but
> the Hadoop cluster has plenty: 300+ GB of memory, 72 cores, and the
> utilization rate is only about 30%.
>
> I have tried increasing the number of TaskManager slots via the flink
> run command with `flink run -ys`, but it has no effect.
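>
> For example, something like this (the main class and jar name here are
> just placeholders):
>
> ```
> # YARN per-job mode: -ys = slots per TaskManager, -p = job parallelism
> flink run -m yarn-cluster -ys 4 -p 4 -c com.example.HiveSinkJob my-job.jar
> ```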
>
> Here is the environment:
> flink version: 1.12.0
> java: 1.8
>
> Please help me figure out what the problem is; I really appreciate it. Thanks.
>
