Hi Vinayak,

If `StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(2)` works for your case,
you could try either of the following.

`StreamExecutionEnvironment.setDefaultLocalParallelism(2);`
`StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();`

or

`StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();`
`env.setParallelism(2);`

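When running locally, both should be equivalent to `createLocalEnvironment(2)`
as far as the code is concerned. Note that `env.setParallelism(2)` also applies
when the job runs on a cluster, whereas `setDefaultLocalParallelism` only
affects local environments.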
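
For context on why capping the parallelism can help: as far as I know, a local
environment obtained through `getExecutionEnvironment()` defaults its
parallelism to the number of CPU cores
(`Runtime.getRuntime().availableProcessors()`), and a higher parallelism
generally needs more network buffers. The plain-Java sketch below has no Flink
dependency, and its class and method names are made up for illustration. It
prints the core count of your machine, the parallelism you would get with a cap
of 2, and the size of the buffer pool reported in the error message.

```java
// Hypothetical helper, not part of Flink: it only illustrates the numbers
// involved. The buffer figures are copied from the error message in this thread.
class LocalParallelismCheck {
    static final long TOTAL_BUFFERS = 12851L;     // "set to 12851"
    static final long BUFFER_SIZE_BYTES = 32768L; // "of 32768 bytes each"

    /** Total size of the network buffer pool in bytes. */
    static long poolSizeBytes(long buffers, long bufferSizeBytes) {
        return buffers * bufferSizeBytes;
    }

    /** Core count capped at the given limit, but never below 1. */
    static int cappedParallelism(int cores, int cap) {
        return Math.max(1, Math.min(cores, cap));
    }

    public static void main(String[] args) {
        // Assumption: this is the value Flink uses as default local parallelism.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available cores:       " + cores);
        System.out.println("parallelism, cap of 2: " + cappedParallelism(cores, 2));
        System.out.println("buffer pool (bytes):   "
                + poolSizeBytes(TOTAL_BUFFERS, BUFFER_SIZE_BYTES));
    }
}
```

If the core count printed on your machine is much larger than 2, that would be
consistent with the job asking for more buffers locally than the pool can
provide.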


On Wed, Jul 31, 2019 at 2:51 PM Vinayak Magadum <magadumvina...@gmail.com>
wrote:

> Hi Andrey and Jeff,
>
> Thank you for the reply.
> I agree with Jeff. My concern is having to use different code for local and
> non-local deployments.
> It would help if StreamExecutionEnvironment.getExecutionEnvironment()
> works for both local and cluster deployments.
> --------
> Thanks & Regards,
> Vinayak
>
>
> On Wed, Jul 31, 2019 at 7:02 AM Jeff Zhang <zjf...@gmail.com> wrote:
>
>> @Andrey,
>>
>> Although your approach will work, it requires the user to write different
>> code for local mode and other modes. This is inconvenient for users.
>> IMHO, we should either skip this kind of memory configuration check in local
>> mode, or implicitly set the TaskManager (TM) memory fairly large in local
>> mode to avoid this kind of problem.
>>
>> Andrey Zagrebin <and...@ververica.com> wrote on Wed, Jul 31, 2019 at 1:32 AM:
>>
>>> Hi Vinayak,
>>>
>>> The error message provides a hint about changing config options. You
>>> could try StreamExecutionEnvironment.createLocalEnvironment(2,
>>> customConfig); to increase the resources.
>>> The following issue might also address the problem; the fix will be part
>>> of the 1.9 release:
>>> https://issues.apache.org/jira/browse/FLINK-12852
>>>
>>> Best,
>>> Andrey
>>>
>>> On Tue, Jul 30, 2019 at 5:42 PM Vinayak Magadum <
>>> magadumvina...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using Flink version: 1.7.1
>>>>
>>>> I have a flink job that gets the execution environment as below and
>>>> executes the job.
>>>>
>>>> StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> When I run the code on a cluster, it runs fine. But on my local machine,
>>>> when running the job via IntelliJ, I get the error below:
>>>>
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution
>>>> failed.
>>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>>>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>     <stack trace truncated >
>>>> Caused by: java.io.IOException: Insufficient number of network buffers:
>>>> required 8, but only 3 available. The total number of network buffers is
>>>> currently set to 12851 of 32768 bytes each. You can increase this number by
>>>> setting the configuration keys 'taskmanager.network.memory.fraction',
>>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>>>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:272)
>>>>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
>>>>     at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:278)
>>>>     at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> The workaround I tried to make it run locally is to use
>>>> StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.createLocalEnvironment(2);
>>>>
>>>> instead of StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> With Flink 1.4.2, StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment(); used to work in both
>>>> cluster and local environments.
>>>>
>>>> Is there any way to make
>>>> StreamExecutionEnvironment.getExecutionEnvironment(); work in both cluster
>>>> and local mode in Flink 1.7.1? Specifically, how can I make it work
>>>> locally via IntelliJ?
>>>> --------
>>>> Thanks & Regards,
>>>> Vinayak
>>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
