Hi Vinayak, If `StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2)` works for your case, you could try as below.
`StreamExecutionEnvironment.setDefaultLocalParallelism(2);` `StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();` or `StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();` `env.setParallelism(2);` That should be equivalent from the perspective of codes. On Wed, Jul 31, 2019 at 2:51 PM Vinayak Magadum <magadumvina...@gmail.com> wrote: > Hi Andrey and Jeff, > > Thank you for the reply. > I agree with Jeff. My concern is to use different code for local and > non-local deployments. > It would help if StreamExecutionEnvironment.getExecutionEnvironment() > works for both local and cluster deployments. > -------- > Thanks & Regards, > Vinayak > > > On Wed, Jul 31, 2019 at 7:02 AM Jeff Zhang <zjf...@gmail.com> wrote: > >> @Andrey, >> >> Although your approach will work, it requires the user to write different >> code for local mode and other modes. This is inconvenient for users. >> IMHO, we should not check these kinds of memory configuration in local >> mode. Or implicitly set the memory of TM pretty large in local mode to >> avoid this kind of problem. >> >> Andrey Zagrebin <and...@ververica.com> 于2019年7月31日周三 上午1:32写道: >> >>> Hi Vinayak, >>> >>> the error message provides a hint about changing config options, you >>> could try to use StreamExecutionEnvironment.createLocalEnvironment(2, >>> customConfig); to increase resources. >>> this issue might also address the problem, it will be part of 1.9 >>> release: >>> https://issues.apache.org/jira/browse/FLINK-12852 >>> >>> Best, >>> Andrey >>> >>> On Tue, Jul 30, 2019 at 5:42 PM Vinayak Magadum < >>> magadumvina...@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> I am using Flink version: 1.7.1 >>>> >>>> I have a flink job that gets the execution environment as below and >>>> executes the job. >>>> >>>> StreamExecutionEnvironment env = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> >>>> When I run the code in cluster, it runs fine. But on local machine >>>> while running the job via IntelliJ I get below error: >>>> >>>> org.apache.flink.runtime.client.JobExecutionException: Job execution >>>> failed. >>>> at >>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) >>>> at >>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647) >>>> at >>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) >>>> <stack trace truncated > >>>> Caused by: java.io.IOException: Insufficient number of network buffers: >>>> required 8, but only 3 available. The total number of network buffers is >>>> currently set to 12851 of 32768 bytes each. You can increase this number by >>>> setting the configuration keys 'taskmanager.network.memory.fraction', >>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'. >>>> at >>>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:272) >>>> at >>>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257) >>>> at >>>> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:278) >>>> at >>>> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224) >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608) >>>> at java.lang.Thread.run(Thread.java:748) >>>> >>>> Workaround that I tried to make it run on local is to use >>>> StreamExecutionEnvironment env = >>>> StreamExecutionEnvironment.createLocalEnvironment(2); >>>> >>>> instead of StreamExecutionEnvironment env = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> >>>> With Flink 1.4.2, StreamExecutionEnvironment env = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); used to work on both >>>> cluster as well as local environment. >>>> >>>> Is there any way to make >>>> StreamExecutionEnvironment.getExecutionEnvironment(); work in both cluster >>>> and local mode in flink 1.7.1? Specifically how to make it work locally via >>>> IntelliJ. >>>> -------- >>>> Thanks & Regards, >>>> Vinayak >>>> >>> >> >> -- >> Best Regards >> >> Jeff Zhang >> >