[ https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728777#comment-14728777 ]
ASF GitHub Bot commented on FLINK-2373: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1066#discussion_r38629325 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.test.javaApiOperators; + +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.io.GenericInputFormat; +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@SuppressWarnings("serial") +public class RemoteEnvironmentITCase { + + private static final int TM_SLOTS = 4; + + private static final int NUM_TM = 1; + + private static final int USER_DOP = 2; + + private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms"; + + private static final String VALID_STARTUP_TIMEOUT = "100 s"; + + private static ForkableFlinkMiniCluster cluster; + + @BeforeClass + public static void setupCluster() { + try { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS); + cluster = new ForkableFlinkMiniCluster(config, false); + cluster.start(); + } + catch (Exception e) { + e.printStackTrace(); + fail("Error starting test cluster: " + 
e.getMessage()); + } + } + + @AfterClass + public static void tearDownCluster() { + try { + cluster.stop(); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Cluster shutdown caused an exception: " + t.getMessage()); + } + } + + /** + * Ensure that that Akka configuration parameters can be set. + */ + @Test(expected=IllegalArgumentException.class) + public void testInvalidAkkaConfiguration() throws Throwable { + Configuration config = new Configuration(); + config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + cluster.hostname(), + cluster.getLeaderRPCPort(), + config + ); + env.getConfig().disableSysoutLogging(); + + DataSet<String> result = env.createInput(new TestNonRichInputFormat()); + result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>())); + try { + env.execute(); --- End diff -- Sorry, you're completely right. My bad. On Wed, Sep 2, 2015 at 6:54 PM, Andreas Kunft <notificati...@github.com> wrote: > In > flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java > <https://github.com/apache/flink/pull/1066#discussion_r38556479>: > > > + @Test(expected=IllegalArgumentException.class) > > + public void testInvalidAkkaConfiguration() throws Throwable { > > + Configuration config = new Configuration(); > > + config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); > > + > > + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( > > + cluster.hostname(), > > + cluster.getLeaderRPCPort(), > > + config > > + ); > > + env.getConfig().disableSysoutLogging(); > > + > > + DataSet<String> result = env.createInput(new TestNonRichInputFormat()); > > + result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>())); > > + try { > > + env.execute(); > > It would still fail, as no exception is thrown. 
But I can generate a nicer > error message. Will fix that. > > — > Reply to this email directly or view it on GitHub > <https://github.com/apache/flink/pull/1066/files#r38556479>. > > Add configuration parameter to createRemoteEnvironment method > ------------------------------------------------------------- > > Key: FLINK-2373 > URL: https://issues.apache.org/jira/browse/FLINK-2373 > Project: Flink > Issue Type: Bug > Components: other > Reporter: Andreas Kunft > Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > Currently there is no way to provide a custom configuration upon creation of > a remote environment (via ExecutionEnvironment.createRemoteEnvironment(...)). > This leads to errors when the submitted job exceeds the default value for the > max. payload size in Akka, as we cannot increase the configuration value > (akka.remote.OversizedPayloadException: Discarding oversized payload...) > Providing an overloaded method with a configuration parameter for the remote > environment fixes that. -- This message was sent by Atlassian JIRA (v6.3.4#6332)