[ https://issues.apache.org/jira/browse/FLINK-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546532#comment-16546532 ]
ASF GitHub Bot commented on FLINK-7251: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/6120#discussion_r203003680 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -237,19 +248,81 @@ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) { } /** - * Gets the type of the key by which the stream is partitioned. - * @return The type of the key by which the stream is partitioned. + * Tries to fill in the type information. Type information can be filled in + * later when the program uses a type hint. This method checks whether the + * type information has ever been accessed before and does not allow + * modifications if the type was accessed already. This ensures consistency + * by making sure different parts of the operation do not assume different + * type information. + * + * @param keyType The type information to fill in. + * + * @throws IllegalStateException Thrown, if the type information has been accessed before. + */ + private void setKeyType(TypeInformation<KEY> keyType) { + if (typeUsed) { + throw new IllegalStateException( + "TypeInformation cannot be filled in for the type after it has been used. " + + "Please make sure that the type info hints are the first call after " + + "the keyBy() function before any other access."); + } + this.keyType = keyType; + } + + /** + * Returns the key type of this {@code KeyedStream} as a {@link TypeInformation}. Once + * this is used once the key type cannot be changed anymore using {@link #returns(TypeInformation)}. + * + * @return The output type of this {@code KeyedStream} */ @Internal public TypeInformation<KEY> getKeyType() { - return keyType; + if (keyType instanceof MissingTypeInfo) { + MissingTypeInfo typeInfo = (MissingTypeInfo) this.keyType; + throw new InvalidTypesException( + "The key type of key selector '" + + typeInfo.getFunctionName() + + "' could not be determined automatically, due to type erasure. " + + "You can give type information hints by using the returns(...) " + + "method on the result of the transformation call, or by letting " + + "your selector implement the 'ResultTypeQueryable' " + + "interface.", typeInfo.getTypeException()); + } + + // perform the validation when the type is used for the first time + if (!typeUsed) { + typeUsed = true; + validateKeyType(keyType); + } + + return this.keyType; } @Override protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) { throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream."); } + // ------------------------------------------------------------------------ + // Type hinting + // ------------------------------------------------------------------------ + + /** + * Adds a type information hint about the key type of a key selector. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a key selector is. That can be the case if the selector uses generic type variables + * in the return type that cannot be inferred from the input type. + * + * @param typeInfo type information as a key type hint + * @return This operator with a given key type hint. + */ + public KeyedStream<T, KEY> returns(TypeInformation<KEY> typeInfo) { --- End diff -- Should probably make this `@PublicEvolving` for now. > Merge the flink-java8 project into flink-core > --------------------------------------------- > > Key: FLINK-7251 > URL: https://issues.apache.org/jira/browse/FLINK-7251 > Project: Flink > Issue Type: Improvement > Components: Build System > Reporter: Stephan Ewen > Assignee: Timo Walther > Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)