zhipeng93 commented on code in PR #215:
URL: https://github.com/apache/flink-ml/pull/215#discussion_r1168127178
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java:
##########
@@ -152,41 +155,52 @@ private static <OUT> DataStream<OUT> cacheBroadcastVariables(
     }
 
     /**
-     * uses {@link DraftExecutionEnvironment} to execute the userDefinedFunction and returns the
+     * Uses {@link DraftExecutionEnvironment} to execute the userDefinedFunction and returns the
      * resultStream.
      *
-     * @param env execution environment.
-     * @param inputList non-broadcast input list.
-     * @param broadcastStreamNames names of the broadcast data streams.
-     * @param graphBuilder user-defined logic.
-     * @param <OUT> output type of the result stream.
-     * @return the result stream by applying user-defined logic on the input list.
+     * @param env Execution environment.
+     * @param inputList Non-broadcast input list.
+     * @param broadcastStreamNames Names of the broadcast data streams.
+     * @param graphBuilder User-defined logic.
+     * @param <OUT> Output type of the result stream.
+     * @return The result stream by applying user-defined logic on the input list.
      */
     private static <OUT> DataStream<OUT> getResultStream(
             StreamExecutionEnvironment env,
             List<DataStream<?>> inputList,
             String[] broadcastStreamNames,
             Function<List<DataStream<?>>, DataStream<OUT>> graphBuilder) {
-        TypeInformation<?>[] inTypes = new TypeInformation[inputList.size()];
-        for (int i = 0; i < inputList.size(); i++) {
-            inTypes[i] = inputList.get(i).getType();
-        }
-        // do not block all non-broadcast input edges by default.
-        boolean[] isBlocked = new boolean[inputList.size()];
-        Arrays.fill(isBlocked, false);
+
+        // Executes the graph builder and gets real non-broadcast inputs.
         DraftExecutionEnvironment draftEnv =
-                new DraftExecutionEnvironment(
-                        env, new BroadcastWrapper<>(broadcastStreamNames, inTypes, isBlocked));
+                new DraftExecutionEnvironment(env, new DefaultWrapper<>());
         List<DataStream<?>> draftSources = new ArrayList<>();
         for (DataStream<?> dataStream : inputList) {
             draftSources.add(draftEnv.addDraftSource(dataStream, dataStream.getType()));
         }
         DataStream<OUT> draftOutStream = graphBuilder.apply(draftSources);
-        Preconditions.checkState(
-                draftEnv.getStreamGraph(false).getStreamNodes().size() == 1 + inputList.size(),
-                "cannot add more than one operator in withBroadcastStream's lambda function.");
-        draftEnv.copyToActualEnvironment();
-        return draftEnv.getActualStream(draftOutStream.getId());
+
+        List<Transformation<?>> realNonBroadcastInputs =
+                draftOutStream.getTransformation().getInputs();

Review Comment:
`realNonBroadcastInputs` is not equivalent to `inputList` here. For example, when `withBroadcastStream()` is called with a join as the user-defined function (see [1]), `inputList` contains `source1` and `source2`, but the real non-broadcast input is a single keyed stream, created by unioning the tagged inputs and applying keyBy as in [2].

I have also updated the Javadoc of `withBroadcastStream` to explain that only the result stream can access the broadcast variables.

[1] https://github.com/apache/flink-ml/pull/215/files#diff-92d87f6ba02ee90f721294e1d720d7a56b6b911ee8b5c2e00e0f0ce39fb21359R134
[2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L388
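To make the point concrete, here is a minimal, stdlib-only Java sketch. It does not use Flink at all: the `Node` class and the `buildJoin`, `directInputNames`, and `upstreamSourceNames` helpers are hypothetical stand-ins that only model a graph whose nodes know their direct inputs (loosely analogous to `Transformation#getInputs()`). `buildJoin` roughly imitates the co-group translation referenced in [2] (tag each input with a map, union the tagged streams, key the union, then apply the window operator); the exact operator chain in Flink may differ between versions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of a dataflow graph. Node is NOT Flink's Transformation class; it only
 * mimics the idea that each node knows its direct inputs.
 */
public class TransformationInputsSketch {

    public static final class Node {
        final String name;
        final List<Node> inputs;

        public Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /**
     * Roughly mirrors a co-group-based windowed join: tag each input with a map, union the tagged
     * streams, key the union, then apply a window operator. Names are illustrative labels only.
     */
    public static Node buildJoin(Node source1, Node source2) {
        Node tagged1 = new Node("map(tag1)", source1);
        Node tagged2 = new Node("map(tag2)", source2);
        Node union = new Node("union", tagged1, tagged2);
        Node keyed = new Node("keyBy", union);
        return new Node("window(apply)", keyed);
    }

    /** Names of the nodes that feed {@code out} directly. */
    public static List<String> directInputNames(Node out) {
        List<String> names = new ArrayList<>();
        for (Node input : out.inputs) {
            names.add(input.name);
        }
        return names;
    }

    /** Names of all source nodes (nodes without inputs) reachable upstream of {@code out}. */
    public static Set<String> upstreamSourceNames(Node out) {
        Set<String> sources = new LinkedHashSet<>();
        collectSources(out, sources);
        return sources;
    }

    private static void collectSources(Node node, Set<String> sources) {
        if (node.inputs.isEmpty()) {
            sources.add(node.name);
            return;
        }
        for (Node input : node.inputs) {
            collectSources(input, sources);
        }
    }

    public static void main(String[] args) {
        Node out = buildJoin(new Node("source1"), new Node("source2"));
        System.out.println("direct inputs:    " + directInputNames(out));
        System.out.println("upstream sources: " + upstreamSourceNames(out));
    }
}
```

Running `main` prints `[keyBy]` for the direct inputs and `[source1, source2]` for the upstream sources: the output node has a single direct input even though the graph was built from two sources, so the output's inputs cannot simply be matched one-to-one with `inputList`.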