[ https://issues.apache.org/jira/browse/FLINK-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16052877#comment-16052877 ]
ASF GitHub Bot commented on FLINK-6783:
---------------------------------------

Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4039#discussion_r122570865

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java ---
    @@ -161,6 +164,77 @@ public static LambdaExecutable checkAndExtractLambda(Function function) throws T
         }

         /**
    +     * Extracts type from given index from lambda. It supports nested types.
    +     *
    +     * @param exec lambda function to extract the type from
    +     * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy
    +     * @param paramLen count of total parameters of the lambda (including closure parameters)
    +     * @param baseParametersLen count of lambda interface parameters (without closure parameters)
    +     * @return extracted type
    +     */
    +    public static Type extractTypeFromLambda(
    +        LambdaExecutable exec,
    +        int[] lambdaTypeArgumentIndices,
    +        int paramLen,
    +        int baseParametersLen) {
    +        Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
    +        for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
    +            output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
    +        }
    +        return output;
    +    }
    +
    +    /**
    +     * * This method extracts the n-th type argument from the given type. An InvalidTypesException
    +     * is thrown if the type does not have any type arguments or if the index exceeds the number
    +     * of type arguments.
    +     *
    +     * @param t Type to extract the type arguments from
    +     * @param index Index of the type argument to extract
    +     * @return The extracted type argument
    +     * @throws InvalidTypesException if the given type does not have any type arguments or if the
    +     * index exceeds the number of type arguments.
    +     */
    +    public static Type extractTypeArgument(Type t, int index) throws InvalidTypesException {
    +        if (t instanceof ParameterizedType) {
    +            Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
    +
    +            if (index < 0 || index >= actualTypeArguments.length) {
    +                throw new InvalidTypesException("Cannot extract the type argument with index " +
    +                    index + " because the type has only " + actualTypeArguments.length +
    +                    " type arguments.");
    +            } else {
    +                return actualTypeArguments[index];
    +            }
    +        } else {
    +            throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
    +        }
    +    }
    +
    +    /**
    +     * Extracts a Single Abstract Method (SAM) as defined in Java Specification (4.3.2. The Class Object,
    +     * 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given class.
    +     *
    +     * @param baseClass
    +     * @throws InvalidTypesException if the given class does not implement
    +     * @return
    +     */
    +    public static Method getSingleAbstractMethod(Class<?> baseClass) {
    +        Method sam = null;
    +        for (Method method : baseClass.getMethods()) {
    +            if (Modifier.isAbstract(method.getModifiers())) {
    +                if (sam == null) {
    +                    sam = method;
    +                } else {
    +                    throw new InvalidTypesException(
    +                        "Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM.");
    --- End diff --

    This message seems to be inexact: if there is no SAM, sam would be null upon returning from the method.
    I suggest changing the message and adding a check (for null sam) prior to returning.

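For illustration, the suggestion in the review comment could look roughly like the sketch below. It is not the code that was merged in the pull request. The class name SamLookup and the message wording are made up for this example, and IllegalArgumentException stands in for Flink's InvalidTypesException so that the snippet compiles without Flink on the classpath.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Illustrative sketch only; SamLookup is a hypothetical name, not a Flink class.
final class SamLookup {

    // Returns the single abstract method of baseClass.
    // IllegalArgumentException is a stand-in for Flink's InvalidTypesException.
    static Method getSingleAbstractMethod(Class<?> baseClass) {
        Method sam = null;
        for (Method method : baseClass.getMethods()) {
            if (Modifier.isAbstract(method.getModifiers())) {
                if (sam != null) {
                    // A second abstract method was found, so the message says exactly that.
                    throw new IllegalArgumentException(
                        "Given class: " + baseClass + " has more than one abstract method.");
                }
                sam = method;
            }
        }
        // The check proposed in the review: without it, null would be returned
        // for a class that has no abstract method at all.
        if (sam == null) {
            throw new IllegalArgumentException(
                "Given class: " + baseClass + " has no abstract method.");
        }
        return sam;
    }

    public static void main(String[] args) {
        // Runnable declares exactly one abstract method.
        System.out.println(getSingleAbstractMethod(Runnable.class).getName());
    }
}
```

With this shape, a marker interface such as java.io.Serializable fails with the "no abstract method" message, and java.util.Iterator (hasNext and next) fails with the "more than one" message. Like the loop in the diff, the sketch counts every abstract method returned by getMethods(), so an interface such as java.util.Comparator, which redeclares equals as abstract, would also be rejected.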
> Wrongly extracted TypeInformations for WindowedStream::aggregate
> ----------------------------------------------------------------
>
>                 Key: FLINK-6783
>                 URL: https://issues.apache.org/jira/browse/FLINK-6783
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataStream API
>    Affects Versions: 1.3.0, 1.3.1
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Blocker
>              Labels: flink-rel-1.3.1-blockers
>             Fix For: 1.3.1, 1.4.0
>
>
> The following test fails because of wrongly acquired output type for
> {{AggregateFunction}}:
> {code}
> @Test
> public void testAggregateWithWindowFunctionDifferentResultTypes() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Tuple2<String, Integer>> source =
>         env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
>     DataStream<Tuple3<String, String, Integer>> window = source
>         .keyBy(new TupleKeySelector())
>         .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
>         .aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
>             @Override
>             public Tuple2<String, Integer> createAccumulator() {
>                 return Tuple2.of("", 0);
>             }
>             @Override
>             public void add(
>                 Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
>             }
>             @Override
>             public String getResult(Tuple2<String, Integer> accumulator) {
>                 return accumulator.f0;
>             }
>             @Override
>             public Tuple2<String, Integer> merge(
>                 Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
>                 return Tuple2.of("", 0);
>             }
>         }, new WindowFunction<String, Tuple3<String, String, Integer>, String, TimeWindow>() {
>             @Override
>             public void apply(
>                 String s,
>                 TimeWindow window,
>                 Iterable<String> input,
>                 Collector<Tuple3<String, String, Integer>> out) throws Exception {
>                 out.collect(Tuple3.of("", "", 0));
>             }
>         });
>     OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
>         (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
>     OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
>     Assert.assertTrue(operator instanceof WindowOperator);
>     WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
>         (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
>     Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
>     Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
>     Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
>     processElementAndEnsureOutput(
>         operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
> }
> {code}
> The test results in
> {code}
> org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple type expected.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1157)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:451)
>     at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:855)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowTranslationTest.testAggregateWithWindowFunctionDifferentResultTypes(WindowTranslationTest.java:702)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple type expected.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1204)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1154)
>     ... 25 more
> {code}
> I tracked down the issue and the reason is wrongly handled
> {{outputTypeArgumentIndex}} in {{TypeExtractor::getUnaryOperatorReturnType}}.
> My proposal is to remove or deprecate the version of
> {{TypeExtractor::getUnaryOperatorReturnType}} that accepts {{hasIterable}}
> and {{hasCollector}} as parameters, and to change all invocations to pass the
> index of the output type explicitly (after fixing the
> {{outputTypeArgumentIndex}} handling in line {{TypeExtractor:455}}).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)