[ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935576#comment-15935576 ]

Luke Hutchison edited comment on FLINK-6114 at 3/22/17 12:25 AM:
-----------------------------------------------------------------

[~greghogan] I have partially reconstructed what I was doing before I triggered
the exception I originally reported. I was wrong about not joining on the
generic type: I was actually using it as the join key. This works in the
smaller tests I have tried, but I cannot get the more extensive example below
to work.

Given the following code:

{code}
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class MainTest {

    public static <K> DataSet<Tuple2<K, Float>> convertToFractionalRank(
            DataSet<Tuple2<K, Float>> key_score) {
        // Sum within each key
        // Result: ("", key, totScore)
        DataSet<Tuple3<String, K, Float>> blank_key_totScore =
                key_score
                        .groupBy(0).sum(1)
                        // Prepend with "" to prepare for the join
                        .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1))
                        // .returns(TypeInformation.of(new TypeHint<Tuple3<String, K, Float>>(){}))  // (2)
                        ;

        // Count unique keys. Result: ("", numKeys)
        DataSet<Tuple2<String, Integer>> blank_numKeys =
                blank_key_totScore
                        .distinct(1)  // (1) distinct on the key field; field 0 is "" in every tuple
                        .map(t -> new Tuple2<String, Integer>("", 1))
                        .groupBy(0).sum(1);

        // Sort scores into order, then return the fractional rank in the range [0, 1]
        return blank_key_totScore
                .coGroup(blank_numKeys)
                .where(0).equalTo(0)
                .with((Iterable<Tuple3<String, K, Float>> ai, Iterable<Tuple2<String, Integer>> bi,
                        Collector<Tuple4<String, K, Float, Integer>> out) -> {
                    int numKeys = bi.iterator().next().f1;
                    for (Tuple3<String, K, Float> a : ai) {
                        out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
                    }
                })
                // Group by "" (i.e. make into one group, so all the scores can be sorted together)
                .groupBy(0)
                // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
                .sortGroup(2, Order.DESCENDING)
                // Convert sorted rank from [0, numKeys-1] -> [0, 1]
                .reduceGroup(
                        (Iterable<Tuple4<String, K, Float, Integer>> iter, Collector<Tuple2<K, Float>> out) -> {
                            int rank = 0;
                            for (Tuple4<String, K, Float, Integer> t : iter) {
                                int numKeys = t.f3; // Same for all tuples
                                float fracRank = rank / (float) (numKeys - 1);
                                out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
                                rank++;
                            }
                        })
                .name("convert problem severity scores into building scores");
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Tuple2<String, Integer>, Float>> ds = env.fromElements(
                new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new Tuple2<>("x", 2), 1.0f),
                new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new Tuple2<>("y", 3), 1.0f));
        DataSet<Tuple2<Tuple2<String, Integer>, Float>> ds2 = convertToFractionalRank(ds);
        System.out.println(ds2.collect());
    }
}
{code}

This exception is thrown at the line marked {{// (1)}}, although the
{{Caused by}} entries point at the preceding {{map}} call:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
        at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
        at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
        at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
        at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
        at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
        at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
        at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
        ... 1 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
        at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
        at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
        at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
        ... 6 more
{noformat}
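The "Type is null" cause can be illustrated without Flink. The plain-Java sketch below (the names {{ErasureDemo}} and {{interfaceTypeArgs}} are hypothetical, not Flink APIs) reads a function object's generic interface via reflection: an anonymous class keeps its type arguments in its class file, while a lambda's runtime class exposes only the raw interface. This is a simplified illustration of why the parameter and return types of a lambda such as the {{map}} at line 21 are hard to recover at runtime; Flink's own extraction logic is more involved.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.function.Function;

class ErasureDemo {
    // Returns the type arguments of the object's first generic interface,
    // or an empty array if only the raw interface is visible via reflection.
    static Type[] interfaceTypeArgs(Object fn) {
        Type iface = fn.getClass().getGenericInterfaces()[0];
        if (iface instanceof ParameterizedType) {
            return ((ParameterizedType) iface).getActualTypeArguments();
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        // Anonymous class: the compiler records Function<String, Integer> in the class file.
        Function<String, Integer> anon = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        // Lambda: the generated runtime class implements the raw Function interface.
        Function<String, Integer> lambda = s -> s.length();

        System.out.println(Arrays.toString(interfaceTypeArgs(anon)));   // [class java.lang.String, class java.lang.Integer]
        System.out.println(Arrays.toString(interfaceTypeArgs(lambda))); // []
    }
}
```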

If you uncomment the line marked {{// (2)}}, you get this exception instead:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(org.apache.flink.api.java.DataSet)' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
        at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
        at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
        at com.rentlogic.buildingscores.flink.MainTest$1.<init>(MainTest.java:24)
        at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:24)
        at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:71)
{noformat}
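The second exception originates in the {{TypeHint}} anonymous subclass ({{MainTest$1}} in the trace). The plain-Java sketch below mimics that anonymous-subclass idea with a hypothetical {{TypeCapture}} class (not Flink's implementation): written with a concrete type, the captured argument resolves to {{String}}, but inside a generic method the subclass can only record the type variable {{K}} itself, not whatever the caller binds {{K}} to. That is consistent with the message that {{K}} "could not be determined".

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;

// Hypothetical stand-in for the anonymous-subclass trick: the subclass's generic
// superclass records the type argument written at the creation site.
abstract class TypeCapture<T> {
    final Type type;

    protected TypeCapture() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }
}

class TypeVariableDemo {
    // Inside a generic method the anonymous subclass records "List<K>" with K as an
    // unresolved type variable, regardless of how the caller binds K.
    static <K> Type captureInsideGenericMethod() {
        return new TypeCapture<List<K>>() {}.type;
    }

    // First type argument of a captured parameterized type such as List<X>.
    static Type firstTypeArg(Type captured) {
        return ((ParameterizedType) captured).getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        Type concrete = new TypeCapture<List<String>>() {}.type;
        Type fromGeneric = TypeVariableDemo.<Integer>captureInsideGenericMethod();

        System.out.println(firstTypeArg(concrete));    // class java.lang.String
        Type k = firstTypeArg(fromGeneric);
        System.out.println(k instanceof TypeVariable); // true: Integer was never recorded
        System.out.println(k);                         // K
    }
}
```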

I then tried some other options to get the type K to work with the Flink
operators. This is where I got stuck: I can't remember exactly what I tweaked
to trigger the exception. But the code above is a much more complete
representation of what I was trying to do.
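For reference, the computation the Flink job above is meant to perform can be sketched in plain Java (Java 9+ for {{List.of}}/{{Map.entry}}), with no Flink operators or type extraction involved. {{FractionalRankSketch}} and {{fractionalRank}} are hypothetical names; keys are flattened to strings such as {{"x:1"}} in place of {{Tuple2<String, Integer>}}, and ties are broken by first-seen order, whereas the Flink job sorts on the score field only and does not specify an order among equal scores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FractionalRankSketch {
    // Sum scores per key, sort keys by descending total, then map rank r in
    // [0, numKeys - 1] to r / (numKeys - 1), mirroring the steps of the Flink job.
    static <K> Map<K, Float> fractionalRank(List<Map.Entry<K, Float>> keyScores) {
        // Step 1: sum within each key (LinkedHashMap keeps first-seen key order).
        Map<K, Float> totals = new LinkedHashMap<>();
        for (Map.Entry<K, Float> e : keyScores) {
            totals.merge(e.getKey(), e.getValue(), Float::sum);
        }

        // Step 2: sort by descending total; List.sort is stable, so ties keep first-seen order.
        List<Map.Entry<K, Float>> sorted = new ArrayList<>(totals.entrySet());
        sorted.sort((a, b) -> Float.compare(b.getValue(), a.getValue()));

        // Step 3: convert rank to a fraction; this yields NaN when there is only one key (0 / 0f).
        int numKeys = sorted.size();
        Map<K, Float> result = new LinkedHashMap<>();
        for (int rank = 0; rank < numKeys; rank++) {
            result.put(sorted.get(rank).getKey(), rank / (float) (numKeys - 1));
        }
        return result;
    }

    public static void main(String[] args) {
        // Same sample data as main() in the Flink job, with each Tuple2<String, Integer> key flattened.
        List<Map.Entry<String, Float>> data = List.of(
                Map.entry("x:1", 1.0f), Map.entry("x:2", 1.0f),
                Map.entry("x:3", 1.0f), Map.entry("x:3", 1.0f),
                Map.entry("y:1", 1.0f), Map.entry("y:1", 1.0f),
                Map.entry("y:2", 1.0f), Map.entry("y:3", 1.0f));
        System.out.println(fractionalRank(data));
        // prints {x:3=0.0, y:1=0.2, x:1=0.4, x:2=0.6, y:2=0.8, y:3=1.0}
    }
}
```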

What's the right way to get generics like this working with Flink? Why does the
above not work currently? (Everything I try throws some sort of exception.)


> Type checking fails with generics, even when concrete type of field is not 
> needed
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-6114
>                 URL: https://issues.apache.org/jira/browse/FLINK-6114
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> The Flink type checker does not allow generic types to be used in any field 
> of a tuple when a join is being executed, even if the generic is not in a 
> field that is involved in the join.
> I have a type Tuple3<String, K, Float>, which contains a generic type 
> parameter K. I am joining using .where(0).equalTo(0). The type of field 0 is 
> well-defined as String. However, this gives me the following error:
> {noformat}
> Exception in thread "main" 
> org.apache.flink.api.common.functions.InvalidTypesException: Type of 
> TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet 
> mypkg.MyClass.method(params)' could not be determined. This is most likely a 
> type erasure problem. The type extraction currently supports types with 
> generic variables only in cases where all variables in the return type can be 
> deduced from the input type(s).
>       at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
> {noformat}
> The code compiles fine, however -- the static type system is able to 
> correctly resolve the types in the surrounding code.
> Really only the fields that are affected by joins (or groupBy, aggregation 
> etc.) should be checked for concrete types in this way.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
