[ https://issues.apache.org/jira/browse/FLINK-7943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16234191#comment-16234191 ]
ASF GitHub Bot commented on FLINK-7943:
---------------------------------------

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4921#discussion_r148286896

{code}
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java ---
@@ -83,82 +82,100 @@ public void add(Option option) throws RequiredParametersException {
 	 * <p>If any check fails, a RequiredParametersException is thrown
 	 *
 	 * @param parameterTool - parameters supplied by the user.
+	 * @return New ParameterTool instance with default values set
 	 * @throws RequiredParametersException if any of the specified checks fail
 	 */
-	public void applyTo(ParameterTool parameterTool) throws RequiredParametersException {
+	public ParameterTool applyTo(ParameterTool parameterTool) throws RequiredParametersException {
 		List<String> missingArguments = new LinkedList<>();
+
+		ParameterTool resultParameterTool = parameterTool;
 		for (Option o : data.values()) {
-			if (parameterTool.data.containsKey(o.getName())) {
-				if (Objects.equals(parameterTool.data.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
+			if (resultParameterTool.has(o.getName())) {
+				if (Objects.equals(resultParameterTool.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
 					// the parameter has been passed, but no value, check if there is a default value
-					checkAndApplyDefaultValue(o, parameterTool.data);
+					resultParameterTool = checkAndApplyDefaultValue(o, resultParameterTool);
 				} else {
 					// a value has been passed in the parameterTool, now check if it adheres to all constraints
-					checkAmbiguousValues(o, parameterTool.data);
-					checkIsCastableToDefinedType(o, parameterTool.data);
-					checkChoices(o, parameterTool.data);
+					checkAmbiguousValues(o, resultParameterTool);
+					checkIsCastableToDefinedType(o, resultParameterTool);
+					checkChoices(o, resultParameterTool);
 				}
 			} else {
 				// check if there is a default name or a value passed for a possibly defined alternative name.
-				if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, parameterTool.data)) {
+				resultParameterTool = synchronizeAlternativeName(o, resultParameterTool);
+
+				if (!resultParameterTool.has(o.getName()) || Objects.equals(resultParameterTool.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
 					missingArguments.add(o.getName());
 				}
 			}
 		}
 		if (!missingArguments.isEmpty()) {
 			throw new RequiredParametersException(this.missingArgumentsText(missingArguments), missingArguments);
 		}
+
+		return resultParameterTool;
 	}

 	// check if the given parameter has a default value and add it to the passed map if that is the case
 	// else throw an exception
-	private void checkAndApplyDefaultValue(Option o, Map<String, String> data) throws RequiredParametersException {
-		if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, data)) {
+	private ParameterTool checkAndApplyDefaultValue(Option o, ParameterTool parameterTool) throws RequiredParametersException {
+		final ParameterTool resultParameterTool = synchronizeAlternativeName(o, parameterTool);
+
+		if (!resultParameterTool.has(o.getName()) || Objects.equals(resultParameterTool.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
 			throw new RequiredParametersException("No default value for undefined parameter " + o.getName());
+		} else {
+			return resultParameterTool;
 		}
 	}

 	// check if the value in the given map which corresponds to the name of the given option
 	// is castable to the type of the option (if any is defined)
-	private void checkIsCastableToDefinedType(Option o, Map<String, String> data) throws RequiredParametersException {
-		if (o.hasType() && !o.isCastableToDefinedType(data.get(o.getName()))) {
+	private void checkIsCastableToDefinedType(Option o, ParameterTool parameterTool) throws RequiredParametersException {
+		if (o.hasType() && !o.isCastableToDefinedType(parameterTool.get(o.getName()))) {
 			throw new RequiredParametersException("Value for parameter " + o.getName() +
 				" cannot be cast to type " + o.getType());
 		}
 	}

 	// check if the value in the given map which corresponds to the name of the given option
 	// adheres to the list of given choices for the param in the options (if any are defined)
-	private void checkChoices(Option o, Map<String, String> data) throws RequiredParametersException {
-		if (o.getChoices().size() > 0 && !o.getChoices().contains(data.get(o.getName()))) {
-			throw new RequiredParametersException("Value " + data.get(o.getName()) +
+	private void checkChoices(Option o, ParameterTool parameterTool) throws RequiredParametersException {
+		if (o.getChoices().size() > 0 && !o.getChoices().contains(parameterTool.get(o.getName()))) {
+			throw new RequiredParametersException("Value " + parameterTool.get(o.getName()) +
 				" is not in the list of valid choices for key " + o.getName());
 		}
 	}

 	// move value passed on alternative name to standard name or apply default value if any defined
-	// else return true to indicate parameter is 'really' missing
-	private boolean hasNoDefaultValueAndNoValuePassedOnAlternativeName(Option o, Map<String, String> data)
-		throws RequiredParametersException {
-		if (o.hasAlt() && data.containsKey(o.getAlt())) {
-			data.put(o.getName(), data.get(o.getAlt()));
+	// if any change was applied, then this method returns a new ParameterTool instance with these
+	// changes. If not, then the passed ParameterTool instance will be returned.
+	private ParameterTool synchronizeAlternativeName(Option o, ParameterTool parameterTool) {
+		// TODO: Throw this all away!!!
+		if (o.hasAlt() && parameterTool.has(o.getAlt())) {
+			HashMap<String, String> newData = new HashMap<>(parameterTool.toMap());
+			newData.put(o.getName(), parameterTool.get(o.getAlt()));
+
+			return ParameterTool.fromMap(newData);
		} else {
			if (o.hasDefaultValue()) {
-				data.put(o.getName(), o.getDefaultValue());
+				HashMap<String, String> newData = new HashMap<>(parameterTool.toMap());
--- End diff --
{code}

Could we not instead create a copy at the start of applyTo and modify that?
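For illustration, here is a minimal sketch of that suggestion, not the code that was actually merged: copy the backing map once at the top of applyTo, let the checks mutate only that copy, and wrap it in a single new ParameterTool at the end. It assumes the helper methods (checkAndApplyDefaultValue, checkAmbiguousValues, checkIsCastableToDefinedType, checkChoices, synchronizeAlternativeName) are refactored to take the mutable Map<String, String> instead of a ParameterTool — hypothetical signatures introduced only for this sketch.

{code}
// Sketch of the "copy once, mutate the copy" alternative.
// Uses java.util.{HashMap, LinkedList, List, Map, Objects}, as the class already does.
public ParameterTool applyTo(ParameterTool parameterTool) throws RequiredParametersException {
	List<String> missingArguments = new LinkedList<>();
	// one defensive copy up front; every check below mutates only this copy
	Map<String, String> newData = new HashMap<>(parameterTool.toMap());

	for (Option o : data.values()) {
		if (newData.containsKey(o.getName())) {
			if (Objects.equals(newData.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
				checkAndApplyDefaultValue(o, newData);   // hypothetical Map-based overload
			} else {
				checkAmbiguousValues(o, newData);
				checkIsCastableToDefinedType(o, newData);
				checkChoices(o, newData);
			}
		} else {
			synchronizeAlternativeName(o, newData);      // mutates the copy in place
			if (!newData.containsKey(o.getName())
					|| Objects.equals(newData.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
				missingArguments.add(o.getName());
			}
		}
	}
	if (!missingArguments.isEmpty()) {
		throw new RequiredParametersException(this.missingArgumentsText(missingArguments), missingArguments);
	}
	// a single new instance instead of one per modified option
	return ParameterTool.fromMap(newData);
}
{code}

Compared with the diff above, this allocates one HashMap copy and one ParameterTool per applyTo call, rather than a new pair for every option that picks up a default value or an alternative-name value.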
OptionalDataException when launching Flink jobs concurrently
------------------------------------------------------------

                Key: FLINK-7943
                URL: https://issues.apache.org/jira/browse/FLINK-7943
            Project: Flink
         Issue Type: Bug
         Components: Client
   Affects Versions: 1.4.0
           Reporter: Till Rohrmann
           Assignee: Till Rohrmann
           Priority: Major

A user reported that he is getting an {{OptionalDataException}} when he launches multiple Flink jobs from the same program concurrently. The problem seems to appear if one sets the {{GlobalJobParameters}}. The stack trace can be found below:

{code}
Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
java.io.OptionalDataException
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
	at java.util.HashMap.readObject(HashMap.java:1407)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

The user code causing the problem is:

{code}
@SuppressWarnings("serial")
public class UnionThreaded {

    static int ThreadPoolSize = 3;
    static int JobsPerThread = 2;
    static ParameterTool params;

    public static class RunSubset implements Runnable {
        private int start = 0;
        private int end = 0;

        RunSubset(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public void run() {
            // set up the execution environment
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // make parameters available in the web interface
            env.getConfig().setGlobalJobParameters(params);

            if (params.has("left") && params.has("right")) {
                for (int i = start; i < end; i++) {
                    DataSet<DeviceRecord> l, r;
                    DataSet<DeviceRecord1> j;
                    DataSet<Tuple2<Integer, Integer>> c1, c2;

                    r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
                        .pojoType(DeviceRecord.class, "A", "B", "C")
                        .setParallelism(1)
                        .filter(new MyFilter())
                        .setParallelism(1);

                    // read the text file from given input path
                    j = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
                        .pojoType(DeviceRecord.class, "A", "B", "C")
                        .setParallelism(1)
                        .leftOuterJoin(r)
                        .where("B")
                        .equalTo("B")
                        .with(new MyFlatJoinFunction()).setParallelism(1);

                    j.flatMap(new Mapper(false))
                        .groupBy(0)
                        .sum(1).setParallelism(1)
                        .writeAsCsv(params.get("output") + "/" + Integer.toString(i), "\n", ",");

                    j.flatMap(new Mapper2(true))
                        .groupBy(0)
                        .sum(1).setParallelism(1)
                        .writeAsCsv(params.get("output2") + "/" + Integer.toString(i), "\n", ",");
                }
            }

            try {
                System.out.println("calling env.execute()"); // + Calendar.getInstance().getTime();
                env.execute("Union4a" + ":" + Integer.toString(start) + ":" + Integer.toString(end));
            } catch (Exception e) {
                System.err.println("env.execute exception: " + e.getMessage());
            }
        }
    }

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        int total_to_do = Integer.decode(params.get("filecount"));

        // number of threads should be <= number of slots
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(ThreadPoolSize);

        // assumes an even number of jobs
        for (int i = 0; i < total_to_do; i += JobsPerThread) {
            int end = i + JobsPerThread;
            if (end > total_to_do) {
                end = total_to_do;
            }
            executor.execute(new RunSubset(i, end));
        }
        executor.shutdown();

        // Many ways of waiting.
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            System.out.println("Execution interrupted");
            System.exit(-1);
        }

        // get input data
        DataSet<Tuple2<Integer, Integer>> counts;
        DataSet<Tuple2<Integer, Integer>> counts2;

        counts = env.readCsvFile(params.get("output"))
            .types(Integer.class, Integer.class);
        counts2 = env.readCsvFile(params.get("output2"))
            .types(Integer.class, Integer.class);

        // Count by C
        counts = counts
            .groupBy(0)
            .sum(1);

        // Count by device
        counts2 = counts2
            .groupBy(0)
            .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output3"), "\n", ", ");
        }
        // emit result
        if (params.has("output2")) {
            counts2.writeAsCsv(params.get("output4"), "\n", ", ");
        }

        // execute program
        env.execute("Union4b");
    }
}
{code}

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Execute-multiple-jobs-in-parallel-threading-java-io-OptionalDataException-td16441.html
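The stack trace fails inside {{HashMap.readObject}} while deserializing the {{SerializedValue}} of the execution config, so a plausible reading — an assumption here, not a confirmed diagnosis — is that the shared static {{params}} instance, registered as {{GlobalJobParameters}} by every thread, is serialized by one job submission while another thread still interacts with its backing map. Under that assumption, one hedged workaround sketch is to hand each {{Runnable}} its own copy (using only {{toMap()}} and {{fromMap()}}, which appear in the diff above):

{code}
// Workaround sketch (assumes the race is on the shared 'params' map).
@Override
public void run() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // defensive per-thread copy instead of the shared static 'params';
    // the explicit HashMap copy guarantees no structural sharing across threads
    final ParameterTool localParams = ParameterTool.fromMap(new HashMap<>(params.toMap()));
    env.getConfig().setGlobalJobParameters(localParams);
    // ... build the per-file plan and call env.execute(...) exactly as above ...
}
{code}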
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)