[ https://issues.apache.org/jira/browse/FLINK-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16052793#comment-16052793 ]
ASF GitHub Bot commented on FLINK-6783:
---------------------------------------

Github user dawidwys closed the pull request at:

    https://github.com/apache/flink/pull/4089

> Wrongly extracted TypeInformations for WindowedStream::aggregate
> ----------------------------------------------------------------
>
>                 Key: FLINK-6783
>                 URL: https://issues.apache.org/jira/browse/FLINK-6783
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataStream API
>    Affects Versions: 1.3.0, 1.3.1
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Blocker
>              Labels: flink-rel-1.3.1-blockers
>             Fix For: 1.3.1, 1.4.0
>
>
> The following test fails because the output type of {{AggregateFunction}} is extracted incorrectly:
> {code}
> @Test
> public void testAggregateWithWindowFunctionDifferentResultTypes() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Tuple2<String, Integer>> source =
>         env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
>     DataStream<Tuple3<String, String, Integer>> window = source
>         .keyBy(new TupleKeySelector())
>         .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
>         .aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
>             @Override
>             public Tuple2<String, Integer> createAccumulator() {
>                 return Tuple2.of("", 0);
>             }
>             @Override
>             public void add(
>                 Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
>             }
>             @Override
>             public String getResult(Tuple2<String, Integer> accumulator) {
>                 return accumulator.f0;
>             }
>             @Override
>             public Tuple2<String, Integer> merge(
>                 Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
>                 return Tuple2.of("", 0);
>             }
>         }, new WindowFunction<String, Tuple3<String, String, Integer>, String, TimeWindow>() {
>             @Override
>             public void apply(
>                 String s,
>                 TimeWindow window,
>                 Iterable<String> input,
>                 Collector<Tuple3<String, String, Integer>> out) throws Exception {
>                 out.collect(Tuple3.of("", "", 0));
>             }
>         });
>     OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
>         (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
>     OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
>     Assert.assertTrue(operator instanceof WindowOperator);
>     WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
>         (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
>     Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
>     Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
>     Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
>     processElementAndEnsureOutput(
>         operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
> }
> {code}
> The test results in
> {code}
> org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple type expected.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1157)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:451)
>     at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:855)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowTranslationTest.testAggregateWithWindowFunctionDifferentResultTypes(WindowTranslationTest.java:702)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple type expected.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1204)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1154)
>     ... 25 more
> {code}
> I tracked down the issue; the cause is the incorrect handling of
> {{outputTypeArgumentIndex}} in {{TypeExtractor::getUnaryOperatorReturnType}}.
> My proposal is to remove or deprecate the version of
> {{TypeExtractor::getUnaryOperatorReturnType}} that accepts {{hasIterable}}
> and {{hasCollector}} as parameters, and to move all invocations to explicitly
> passing the index of the output type (after fixing the {{outputTypeArgumentIndex}}
> handling in line {{TypeExtractor:455}}).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
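
The idea behind the proposal in the issue, reading the output type from an explicitly passed type-argument index, can be illustrated with plain Java reflection. The following is only a minimal sketch under stated assumptions: it is not Flink's `TypeExtractor` and does not reproduce the bug. The names `TypeArgumentIndexSketch`, `Aggregator`, `NameAggregator`, and `typeArgumentAt` are hypothetical and exist only for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical illustration only; none of these names come from Flink.
final class TypeArgumentIndexSketch {

    /** Stand-in for a function interface with input, accumulator, and output type parameters. */
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();

        OUT getResult(ACC accumulator);
    }

    /** Implementation whose output type (String) differs from its input type (Integer). */
    static class NameAggregator implements Aggregator<Integer, Long, String> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public String getResult(Long accumulator) {
            return String.valueOf(accumulator);
        }
    }

    /**
     * Returns the actual type argument at the given index of baseInterface,
     * as declared directly on impl. The caller states the index explicitly
     * instead of having it inferred from other flags.
     */
    static Type typeArgumentAt(Class<?> impl, Class<?> baseInterface, int index) {
        for (Type candidate : impl.getGenericInterfaces()) {
            if (candidate instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) candidate;
                if (parameterized.getRawType() == baseInterface) {
                    return parameterized.getActualTypeArguments()[index];
                }
            }
        }
        throw new IllegalArgumentException(
            impl.getName() + " does not directly implement " + baseInterface.getName());
    }

    public static void main(String[] args) {
        // Index 0 is the input type, index 2 is the output type of Aggregator.
        Type input = typeArgumentAt(NameAggregator.class, Aggregator.class, 0);
        Type output = typeArgumentAt(NameAggregator.class, Aggregator.class, 2);
        System.out.println("input type:  " + input.getTypeName());
        System.out.println("output type: " + output.getTypeName());
    }
}
```

In this sketch, picking the wrong index would hand back the input or accumulator type where the output type is expected, which is the general kind of mix-up an explicit, correctly handled index is meant to rule out. The helper only inspects interfaces declared directly on the class; a fuller version would also need to walk superclasses and resolve type variables.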