Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2337#discussion_r77984002 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java --- @@ -792,12 +832,40 @@ else if (t instanceof Class) { return null; } - + + @SuppressWarnings({"unchecked", "rawtypes"}) private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) { TypeInformation<?> info = null; - + + // use a factory to find corresponding type information to type variable + final ArrayList<Type> factoryHierarchy = new ArrayList<>(inputTypeHierarchy); + final TypeInfoFactory<?> factory = getClosestFactory(factoryHierarchy, inType); + if (factory != null) { + // the type that defines the factory is last in factory hierarchy + final Type factoryDefiningType = factoryHierarchy.get(factoryHierarchy.size() - 1); + // defining type has generics, the factory need to be asked for a mapping of subtypes to type information + if (factoryDefiningType instanceof ParameterizedType) { --- End diff -- Yes, because we try to create type information from the input in this method. This only works if a type variable connects the input with the output, e.g. `A` in `Map<InType<A>, OutType<A>>`.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---