[ https://issues.apache.org/jira/browse/FLINK-37443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17934276#comment-17934276 ]
Nil Madhab commented on FLINK-37443:
------------------------------------

[~guoweijie] can you please have a look at my PR? Please skip the test files. I have tested the jar in my Spring Boot app and it is working.

!image-2025-03-11-17-50-15-242.png!

> Add returns() method to DataStream V2 API for specifying output types with lambda expressions
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37443
>                 URL: https://issues.apache.org/jira/browse/FLINK-37443
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.20.1
>         Environment: * Apache Flink version: 1.20.1
> * Java version: OpenJDK 21
> * API: DataStream V2 API
>            Reporter: Nil Madhab
>            Assignee: Nil Madhab
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2025-03-11-17-50-15-242.png
>
>
> While following the official
> [tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/]
> of the DataStream V2 API, I encountered an issue.
> When using the DataStream V2 API with a lambda expression for
> {{OneInputStreamProcessFunction}}, Java's type erasure prevents Flink
> from automatically determining the output type, resulting in the following
> exception:
>
> {code:java}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be determined automatically, due to type erasure.{code}
> {code:java}
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.datastream.api.function.OneInputStreamProcessFunction' interface. Otherwise the type has to be specified explicitly using type information.
>     at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371){code}
>
> The error suggests implementing the {{ResultTypeQueryable}} interface or using
> an anonymous class as workarounds. In the traditional DataStream API, users
> could simply call {{.returns(TypeInformation)}} to explicitly specify the
> output type.
>
> h3. Example that fails:
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) ->
>         output.collect(Integer.parseInt(record))
> ); {code}
>
> h3. Example that works (but is more verbose):
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     new OneInputStreamProcessFunction<String, Integer>() {
>         @Override
>         public void processRecord(String record, Collector<Integer> output,
>                 PartitionedContext ctx) throws Exception {
>             output.collect(Integer.parseInt(record));
>         }
>     }
> ); {code}
>
> h3. Requested Enhancement
> Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to the
> {{NonKeyedPartitionStream}} class in the DataStream V2 API so that the output
> type can be specified when a lambda expression is used as the process
> function.
> Until the issue is fixed, the documentation could use the anonymous class
> form to avoid confusing people who are new to Flink.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
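
For readers wondering why the anonymous class works while the lambda does not, the difference can be reproduced without Flink. The sketch below is only an illustration under stated assumptions: `Mapper` is a hypothetical stand-in for a two-parameter function interface such as `OneInputStreamProcessFunction`, and `outputTypeOf` is a made-up helper, not Flink's actual type extraction logic. It asks each implementation class for its generic interfaces via reflection. The anonymous class carries `Mapper<String, Integer>` in its class signature, whereas the class generated for the lambda exposes only the raw `Mapper` interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Optional;

public class ErasureDemo {

    /** Hypothetical stand-in for a generic function interface (not a Flink type). */
    @FunctionalInterface
    interface Mapper<I, O> {
        O map(I in);
    }

    /**
     * Tries to recover the output type argument O from the implementing class.
     * Returns empty when the class only implements the raw interface.
     */
    static Optional<Type> outputTypeOf(Mapper<?, ?> fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == Mapper.class) {
                    // Index 1 is the second type parameter, O.
                    return Optional.of(p.getActualTypeArguments()[1]);
                }
            }
        }
        return Optional.empty();
    }

    // Anonymous class: the compiler records Mapper<String, Integer> in the class file.
    static final Mapper<String, Integer> ANONYMOUS = new Mapper<String, Integer>() {
        @Override
        public Integer map(String in) {
            return Integer.parseInt(in);
        }
    };

    // Lambda: the runtime-generated class implements the raw Mapper interface only.
    static final Mapper<String, Integer> LAMBDA = in -> Integer.parseInt(in);

    public static void main(String[] args) {
        System.out.println("anonymous class: " + outputTypeOf(ANONYMOUS));
        System.out.println("lambda:          " + outputTypeOf(LAMBDA));
    }
}
```

Running `main` should report `Integer` for the anonymous class and an empty result for the lambda, which is the gap a `returns(...)` style hint would let the user fill in by hand.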