[ 
https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960349#comment-15960349
 ] 

Luke Hutchison commented on FLINK-6276:
---------------------------------------

Looking at the Flink code, the issue here is that the type of {{join}} is not
available when {{datasetIdx == 1}}.

Adding a type hint as follows:

{code}
            if (datasetIdx == 0) {
                join = datasets[datasetIdx] //
                        .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
                        .name("start join")
                        .returns(TypeInformation.of(new TypeHint<Tuple2<K, List<V>>>(){}));
            } else { /* ... */ }
{code}

causes the following exception to be thrown:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet com.rentlogic.buildingscores.flink.experimental.TestMain.join(java.lang.Object,org.apache.flink.api.java.DataSet[])' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
        at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
        at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
        at com.rentlogic.buildingscores.flink.experimental.TestMain$1.<init>(TestMain.java:27)
        at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
        at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:77)
{noformat}
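For context: the trace fails in {{TestMain$1.<init>}}, the constructor of the anonymous {{TypeHint}} subclass. {{TypeHint}} appears to work as a "super type token": it reads its type argument from the anonymous subclass's generic superclass via reflection. Inside the generic method {{join}}, that superclass is literally {{TypeHint<Tuple2<K, List<V>>>}}, so reflection can only see the type variables {{K}} and {{V}}, never the {{String}}/{{Integer}} chosen in {{main}}. The plain-Java sketch below illustrates this under that assumption; {{TypeToken}}, {{hintInsideGenericMethod}} and {{containsTypeVariable}} are hypothetical names, and the code is a standalone model rather than Flink's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;
import java.util.Map;

public class TypeErasureDemo {

    // Hypothetical "super type token": an anonymous subclass remembers its
    // generic superclass, so the type argument can be read back via reflection.
    // Standalone model of the idea behind Flink's TypeHint, not Flink code.
    abstract static class TypeToken<T> {
        final Type type;

        protected TypeToken() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }
    }

    // Like the TypeHint inside join(...): created in a generic method, the
    // anonymous subclass can only record the type variables K and V.
    static <K, V> Type hintInsideGenericMethod() {
        return new TypeToken<Map.Entry<K, List<V>>>() {}.type;
    }

    // True if the type, or any nested type argument, is an unresolved type variable.
    static boolean containsTypeVariable(Type t) {
        if (t instanceof TypeVariable) {
            return true;
        }
        if (t instanceof ParameterizedType) {
            for (Type arg : ((ParameterizedType) t).getActualTypeArguments()) {
                if (containsTypeVariable(arg)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The caller picks concrete types, but they never reach the token.
        Type inside = TypeErasureDemo.<String, Integer>hintInsideGenericMethod();
        // The same token written where the types are concrete.
        Type atCallSite = new TypeToken<Map.Entry<String, List<Integer>>>() {}.type;

        System.out.println(containsTypeVariable(inside));     // true
        System.out.println(containsTypeVariable(atCallSite)); // false
    }
}
```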

(Every time I have hit this problem in the past, at least five or six separate
times, I have tried numerous alternatives and hit a brick wall, so I had to go
back and redesign the pipeline so that it didn't use generics. I have no idea
what the right solution to this is.)
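One workaround that may be worth trying (untested here) is to build the type hint at the call site, where {{K}} and {{V}} are concrete, and pass it into the generic method. In Flink terms that would mean adding a hypothetical parameter such as {{TypeInformation<Tuple2<K, List<V>>> resultType}} to {{join}}, creating it in {{main}} with {{TypeInformation.of(new TypeHint<Tuple2<String, List<Integer>>>(){})}}, and passing it to {{.returns(...)}} on both the {{map}} and the {{coGroup}} results. The following plain-Java sketch shows only the reflection side of that idea; {{TypeToken}} and {{resultTypeOf}} are hypothetical names, not Flink API.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

public class CallerSuppliedTypeDemo {

    // Same super-type-token idea as the previous sketch (standalone, not Flink code).
    abstract static class TypeToken<T> {
        final Type type;

        protected TypeToken() {
            this.type = ((ParameterizedType) getClass().getGenericSuperclass())
                    .getActualTypeArguments()[0];
        }
    }

    // Hypothetical stand-in for a generic pipeline step such as join(...):
    // instead of building the hint itself, it accepts one from the caller.
    static <K, V> Type resultTypeOf(TypeToken<Map.Entry<K, List<V>>> resultHint) {
        return resultHint.type;
    }

    public static void main(String[] args) {
        // The token is created here, where K = String and V = Integer are concrete.
        Type resultType = resultTypeOf(new TypeToken<Map.Entry<String, List<Integer>>>() {});
        Type[] typeArgs = ((ParameterizedType) resultType).getActualTypeArguments();
        System.out.println(typeArgs[0]); // class java.lang.String
    }
}
```

Because the anonymous subclass is instantiated in {{main}}, its generic superclass carries {{String}} and {{List<Integer>}} rather than {{K}} and {{List<V>}}, which is exactly the information the in-method hint lacked.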

> InvalidTypesException: Unknown Error. Type is null.
> ---------------------------------------------------
>
>                 Key: FLINK-6276
>                 URL: https://issues.apache.org/jira/browse/FLINK-6276
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataSet API
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> Quite frequently when writing Flink code, I get the exception 
> {{InvalidTypesException: Unknown Error. Type is null.}} 
> A small example that triggers it is:
> {code}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> public class TestMain {
>     @SafeVarargs
>     public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
>             DataSet<Tuple2<K, V>>... datasets) {
>         DataSet<Tuple2<K, List<V>>> join = null;
>         for (int i = 0; i < datasets.length; i++) {
>             final int datasetIdx = i;
>             if (datasetIdx == 0) {
>                 join = datasets[datasetIdx] //
>                         .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
>                         .name("start join");
>             } else {
>                 join = join.coGroup(datasets[datasetIdx]) //
>                         .where(0).equalTo(0) //
>                         .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
>                                 Collector<Tuple2<K, List<V>>> out) -> {
>                             K key = null;
>                             List<V> vals = new ArrayList<>(datasetIdx + 1);
>                             Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
>                             if (!lIter.hasNext()) {
>                                 for (int j = 0; j < datasetIdx; j++) {
>                                     vals.add(missingValuePlaceholder);
>                                 }
>                             } else {
>                                 Tuple2<K, List<V>> lt = lIter.next();
>                                 key = lt.f0;
>                                 vals.addAll(lt.f1);
>                                 if (lIter.hasNext()) {
>                                     throw new RuntimeException("Got non-unique key: " + key);
>                                 }
>                             }
>                             Iterator<Tuple2<K, V>> rIter = ri.iterator();
>                             if (!rIter.hasNext()) {
>                                 vals.add(missingValuePlaceholder);
>                             } else {
>                                 Tuple2<K, V> rt = rIter.next();
>                                 key = rt.f0;
>                                 vals.add(rt.f1);
>                                 if (rIter.hasNext()) {
>                                     throw new RuntimeException("Got non-unique key: " + key);
>                                 }
>                             }
>                             out.collect(new Tuple2<K, List<V>>(key, vals));
>                         }) //
>                         .name("join #" + datasetIdx);
>             }
>         }
>         return join;
>     }
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<Tuple2<String, Integer>> x = //
>                 env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
>         DataSet<Tuple2<String, Integer>> y = //
>                 env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
>         DataSet<Tuple2<String, Integer>> z = //
>                 env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));
>         System.out.println(join(-1, x, y, z).collect());
>     }
> }
> {code}
> The stacktrace that is triggered is:
> {noformat}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'join(TestMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
>       at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
>       at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
>       at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
>       at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
>       at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
>       at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:23)
>       ... 1 more
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
>       ... 6 more
> {noformat}
> The code compiles and type-checks. Maybe something is wrong with the code,
> but either way, Flink should report a better error message.
> A separate issue is that the error is reported against the wrong function:
> the problem is not the return type of {{join(TestMain.java:23)}}, but some
> internal type within that function (probably a lambda's). (It is the
> {{where}} clause that throws the exception.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
