Hi Chenzuoli,

The pictures are unfortunately not readable for me.

Cheers,
Till

On Wed, Apr 14, 2021 at 3:31 AM chenzuoli <chenzuoli...@163.com> wrote:

> Download full resolution images
> Available until May 14, 2021
>
> <https://www.icloud.com/attachment/?u=https%3A%2F%2Fcvws.icloud-content.com%2FB%2FAe5GSkj-YFuZTCZGaEu6GPihzx-7AdUBnM39Y4MbYWsiYk_NLx9zwJQQ%2F%24%7Bf%7D%3Fo%3DAlVY5QSsW5BX_oH9EoZOHG6QkCn05TIH8YH2miB0n_C5%26v%3D1%26x%3D3%26a%3DCAog36C0QwaK0BMfOhUFlqSplqI58eCIbiUgrnvfRrdbAn4SehDC8IDwjC8YwoD8w5YvIgEAKgkC6AMA_28vpaVSBKHPH7taBHPAlBBqJ2K-nETHh5awW3rtW5dSHzC3nDeIuqPUTZUvvcdtgOsNFwfHj7kUjHInHPbS8K9O0MrgZn-fA2aEEVj-bpqvfyInTOhEsY1j-6TEPySAjMS0%26e%3D1620955824%26fl%3D%26r%3D7464B4A1-AC88-496E-9608-6D9FEB86BFEA-1%26k%3D%24%7Buk%7D%26ckc%3Dcom.apple.largeattachment%26ckz%3DB0F3D90A-B949-4097-AD3F-ACE30FBE5D19%26p%3D30%26s%3Db1YldRmDMqM72hNP9DLBbOpRbOU&uk=Ir8SHK-FfM36NNDgmyiaYg&f=Images.zip&sz=42860073>Hi,
> sorry for the debug log, I can take the photos instead of the log text
> because of company policy, so if there is any log not clear, please tell
> me. Thanks.
>
>
>
>
>
>
>
>
>
> On Apr 12, 2021, at 16:06, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Chenzuoli,
>
> the exception says that Flink could not allocate enough slots. Could you
> share the DEBUG logs of the run with us. They should contain the reason why
> the allocation of further resources failed.
>
> Cheers,
> Till
>
> On Sun, Apr 11, 2021 at 5:59 AM chenzuoli <chenzuoli...@163.com> wrote:
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html
> <
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html
>
>
> Here is my code:
> my datastream source:
> ```
> public static class MySource implements SourceFunction<UserInfo>{
>
>    String userids[] = {
>            "4760858d-2bec-483c-a535-291de04b2247",
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
>            "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>            "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>            "3ebfb9602ac07779||3ebfe9612a007979",
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>            "e7e896cd939685d7||e7e8e6c1930689d7",
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
>    };
>
>    @Override
>    public void run(SourceContext<UserInfo> sourceContext) throws
> Exception{
>
>        while (true){
>            String userid = userids[(int) (Math.random() * (userids.length
> - 1))];
>            UserInfo userInfo = new UserInfo();
>            userInfo.setUserId(userid);
>            userInfo.setAmount(Math.random() * 100);
>            userInfo.setTs(new Timestamp(System.currentTimeMillis()));
>            sourceContext.collect(userInfo);
>            Thread.sleep(100);
>        }
>    }
>
>    @Override
>    public void cancel(){
>
>    }
> }
>
> public static class UserInfo implements java.io.Serializable{
>    private String userId;
>    private Double amount;
>    private Timestamp ts;
>
>    public String getUserId(){
>        return userId;
>    }
>
>    public void setUserId(String userId){
>        this.userId = userId;
>    }
>
>    public Double getAmount(){
>        return amount;
>    }
>
>    public void setAmount(Double amount){
>        this.amount = amount;
>    }
>
>    public Timestamp getTs(){
>        return ts;
>    }
>
>    public void setTs(Timestamp ts){
>        this.ts = ts;
>    }
> }
> ```
>
> flink code:
> ```
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.enableCheckpointing(10000);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
> DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource())
>
> //构造hive catalog
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
> String version = "3.1.2";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> tEnv.useDatabase("db1");
>
> tEnv.createTemporaryView("users", dataStream);
>
> String hiveSql = "CREATE external TABLE fs_table (\n" +
> "  user_id STRING,\n" +
> "  order_amount DOUBLE" +
> ") partitioned by (dt string,h string,m string) " +
> "stored as ORC " +
> "TBLPROPERTIES (\n" +
> "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
> "  'sink.partition-commit.delay'='0s',\n" +
> "  'sink.partition-commit.trigger'='partition-time',\n" +
> "  'sink.partition-commit.policy.kind'='metastore'" +
> ")";
> tEnv.executeSql(hiveSql);
>
> String insertSql = "SELECT * FROM users";
> tEnv.executeSql(insertSql);
> ```
>
> And this is my flink configuration:
> ```
> jobmanager.memory.process.size: 1600m
> taskmanager.memory.process.size 4096m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
> ```
>
> And the exception is: java.util.concurrent.completionException:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailable and
> request to ResourceManager for new slot failed.
>
> According the exception message, it means the resource is in sufficient,
> but the hadoop resource is enough, memory is 300+g, cores is 72, and the
> usage rate is lower about 30%.
>
> I have tried increase the taskmanager slot at flink run command with
> `flink run -ys`, but it is not effective.
>
> Here is the environment:
> flink version: 1.12.0
> java: 1.8
>
> Please check what’s the problem is, really appreciate it. Thanks.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

Reply via email to