[ https://issues.apache.org/jira/browse/FLINK-37443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nil Madhab updated FLINK-37443:
-------------------------------
Description:

While following the official [tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/] for the DataStream V2 API, I encountered an issue. When using the DataStream V2 API with lambda expressions for {{OneInputStreamProcessFunction}}, Java's type erasure prevents Flink from automatically determining the output type, resulting in the following exception:

{code:java}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be determined automatically, due to type erasure.
{code}
{code:java}
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.datastream.api.function.OneInputStreamProcessFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
{code}

The error message suggests implementing the {{ResultTypeQueryable}} interface or using an anonymous class as a workaround. In the traditional DataStream API, users could simply call {{.returns(TypeInformation)}} to specify the output type explicitly.

h3. Example that fails:

{code:java}
NonKeyedPartitionStream<Integer> parsed = input.process(
    (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) ->
        output.collect(Integer.parseInt(record))
);
{code}

h3. Example that works (but is more verbose):

{code:java}
NonKeyedPartitionStream<Integer> parsed = input.process(
    new OneInputStreamProcessFunction<String, Integer>() {
        @Override
        public void processRecord(String record, Collector<Integer> output, PartitionedContext ctx) throws Exception {
            output.collect(Integer.parseInt(record));
        }
    }
);
{code}

h3. Requested Enhancement

Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to the {{NonKeyedPartitionStream}} class in the DataStream V2 API so that the output type can be specified when using lambda expressions with process functions. Until the issue is fixed, the documentation should use the anonymous-class form to avoid confusing people who are new to Flink.


> Add returns() method to DataStream V2 API for specifying output types with
> lambda expressions
> ----------------------------------------------------------------------------------------------
>
>     Key: FLINK-37443
>     URL: https://issues.apache.org/jira/browse/FLINK-37443
>     Project: Flink
>     Issue Type: Improvement
>     Components: API / DataStream
>     Affects Versions: 1.20.1
>     Environment: * Apache Flink version: 1.20.1
>     * Java version: OpenJDK 21
>     * API: DataStream V2 API
>     Reporter: Nil Madhab
>     Priority: Major
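The erasure failure reported above can be reproduced in plain Java, without Flink on the classpath. The sketch below uses made-up names ({{Collector2}} is a stand-in for a generic SAM interface, not Flink's {{Collector}}; {{TypeToken}} is a hypothetical mini type hint): a lambda's synthetic class implements only the raw interface, while an anonymous class records its type arguments in the class file, which is why the verbose workaround succeeds and why a type-hint-style {{.returns(...)}} overload could work even for lambdas.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class ErasureDemo {

    // Stand-in generic functional interface; illustrative only, not Flink's Collector.
    public interface Collector2<T> { void collect(T value); }

    // Hypothetical mini "type token": an anonymous subclass records its type
    // argument in the class file, so reflection can read it back. This is the
    // same anonymous-subclass mechanism the workaround in the report relies on.
    public abstract static class TypeToken<T> {
        public Type captured() {
            ParameterizedType sup = (ParameterizedType) getClass().getGenericSuperclass();
            return sup.getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        // 1) Lambda: the synthetic class implements only the *raw* interface,
        //    so the Integer type argument is erased. This is what defeats
        //    automatic type extraction for lambda process functions.
        Collector2<Integer> lambda = value -> {};
        Type lambdaItf = lambda.getClass().getGenericInterfaces()[0];
        System.out.println(lambdaItf instanceof ParameterizedType);  // false

        // 2) Anonymous class: "implements Collector2<Integer>" is recorded in
        //    the class file, so the output type is recoverable by reflection.
        Collector2<Integer> anon = new Collector2<Integer>() {
            @Override public void collect(Integer value) {}
        };
        Type anonItf = anon.getClass().getGenericInterfaces()[0];
        System.out.println(((ParameterizedType) anonItf).getActualTypeArguments()[0]);
        // class java.lang.Integer

        // 3) A type token recovers even nested generics, which a plain
        //    Class<T> argument could not express.
        Type t = new TypeToken<List<Integer>>() {}.captured();
        System.out.println(t);
    }
}
```

This is only a demonstration of the underlying Java behavior, not a proposal for the exact shape of the requested {{.returns()}} API.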
--
This message was sent by Atlassian Jira
(v8.20.10#820010)