Nil Madhab created FLINK-37443:
----------------------------------

Summary: Add returns() method to DataStream V2 API for specifying output types with lambda expressions
Key: FLINK-37443
URL: https://issues.apache.org/jira/browse/FLINK-37443
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Affects Versions: 1.20.1
Environment:
* Apache Flink version: 1.20.1
* Java version: OpenJDK 21
* API: DataStream V2 API
Reporter: Nil Madhab
While following the official [tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/] for the DataStream V2 API, I ran into an issue. When a lambda expression is used as a {{OneInputStreamProcessFunction}}, Java's type erasure prevents Flink from determining the output type automatically, and the following exception is thrown:
{code:java}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be determined automatically, due to type erasure.
{code}
The error message suggests two workarounds: implementing the {{ResultTypeQueryable}} interface or using an anonymous class. In the traditional DataStream API, users could simply call {{.returns(TypeInformation)}} to specify the output type explicitly.

h3. Example that fails:
{code:java}
// 'input' is a NonKeyedPartitionStream<String>; this throws the InvalidTypesException shown above
NonKeyedPartitionStream<Integer> parsed = input.process(
    (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) ->
        output.collect(Integer.parseInt(record))
);
{code}

h3. Example that works (but is more verbose):
{code:java}
// The anonymous class keeps <String, Integer> in its compiled signature, so the output type can be extracted
NonKeyedPartitionStream<Integer> parsed = input.process(
    new OneInputStreamProcessFunction<String, Integer>() {
        @Override
        public void processRecord(String record, Collector<Integer> output, PartitionedContext ctx) throws Exception {
            output.collect(Integer.parseInt(record));
        }
    }
);
{code}

h3. Requested Enhancement
Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to the {{NonKeyedPartitionStream}} class in the DataStream V2 API, so that the output type can be specified explicitly when a lambda expression is used as a process function.

Until this is fixed, the documentation could use the anonymous class form to avoid confusing people who are new to Flink.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
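To illustrate why the anonymous class in this issue works while the lambda fails, here is a minimal, stdlib-only Java sketch. It assumes that Flink's type extraction depends on generic type arguments being recorded in the compiled function class, and checks with plain reflection whether {{<String, Integer>}} survives. The {{RecordFunction}} interface and the {{ErasureDemo}} class are hypothetical stand-ins for {{OneInputStreamProcessFunction}}; this is a simplified illustration of the underlying Java behavior, not Flink's actual type-extraction code.

{code:java}
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for a generic function interface such as
    // OneInputStreamProcessFunction<IN, OUT>; this is NOT a Flink type.
    public interface RecordFunction<IN, OUT> {
        OUT apply(IN record);
    }

    // Returns the type arguments that the runtime class of `fn` declares for
    // RecordFunction, or an empty list if no generic signature is available.
    public static List<String> declaredTypeArguments(Object fn) {
        List<String> names = new ArrayList<>();
        for (Type iface : fn.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType
                    && ((ParameterizedType) iface).getRawType() == RecordFunction.class) {
                for (Type arg : ((ParameterizedType) iface).getActualTypeArguments()) {
                    names.add(arg.getTypeName());
                }
            }
        }
        return names;
    }

    // Anonymous class: javac records RecordFunction<String, Integer> in the class signature.
    public static RecordFunction<String, Integer> anonymousParser() {
        return new RecordFunction<String, Integer>() {
            @Override
            public Integer apply(String record) {
                return Integer.parseInt(record);
            }
        };
    }

    // Lambda: the class generated at runtime implements only the raw RecordFunction interface.
    public static RecordFunction<String, Integer> lambdaParser() {
        return record -> Integer.parseInt(record);
    }

    public static void main(String[] args) {
        // prints "anonymous class: [java.lang.String, java.lang.Integer]"
        System.out.println("anonymous class: " + declaredTypeArguments(anonymousParser()));
        // prints "lambda: []"
        System.out.println("lambda: " + declaredTypeArguments(lambdaParser()));
    }
}
{code}

Both functions parse records identically; only the anonymous class carries the type arguments at runtime. For the lambda, the output type has to come from somewhere else, which is the gap an explicit hint such as the requested {{.returns(...)}} would fill.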