[ 
https://issues.apache.org/jira/browse/FLINK-37443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nil Madhab updated FLINK-37443:
-------------------------------
    Description: 
While following the official 
[tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/]
 of  DataStream V2 API, I encountered an issue.

When using the DataStream V2 API with lambda expressions for 
{{{}OneInputStreamProcessFunction{}}}, Java's type erasure prevents Flink from 
automatically determining the output type, resulting in the following exception:

 
{code:java}
Exception in thread "main" 
org.apache.flink.api.common.functions.InvalidTypesException: The return type of 
function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be determined 
automatically, due to type erasure.{code}
{code:java}
 Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The 
generic type parameters of 'Collector' are missing. In many cases lambda 
methods don't provide enough information for automatic type extraction when 
Java generics are involved. An easy workaround is to use an (anonymous) class 
instead that implements the 
'org.apache.flink.datastream.api.function.OneInputStreamProcessFunction' 
interface. Otherwise the type has to be specified explicitly using type 
information.
    at 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371){code}
 

 

The error suggests implementing {{ResultTypeQueryable}} interface or using an 
anonymous class as workarounds. In the traditional DataStream API, users could 
simply call {{.returns(TypeInformation)}} to explicitly specify the output type.
h3. Example that fails:
{code:java}
NonKeyedPartitionStream<Integer> parsed = input.process(
    (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) -> 
        output.collect(Integer.parseInt(record))
); {code}
h3.  

Example that works (but is more verbose):
{code:java}
NonKeyedPartitionStream<Integer> parsed = input.process(
    new OneInputStreamProcessFunction<String, Integer>() {
      @Override
      public void processRecord(String record, Collector<Integer> output,
          PartitionedContext ctx) throws Exception {
        output.collect(Integer.parseInt(record));
      }
    }
); {code}
h3.  

Requested Enhancement

Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to the 
{{NonKeyedPartitionStream}} class in the DataStream V2 API to allow for the 
specification of output types when using lambda expressions with process 
functions.

The documentation can use the anonymous class, until the issue is fixed, to 
prevent confusion for people new to flink. 

  was:
While following the official 
[tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/]
 of  DataStream V2 API, I encountered an issue.

When using the DataStream V2 API with lambda expressions for 
{{{}OneInputStreamProcessFunction{}}}, Java's type erasure prevents Flink from 
automatically determining the output type, resulting in the following exception:

 
{code:java}
Exception in thread "main" 
org.apache.flink.api.common.functions.InvalidTypesException: The return type of 
function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be determined 
automatically, due to type erasure.{code}
The error suggests implementing {{ResultTypeQueryable}} interface or using an 
anonymous class as workarounds. In the traditional DataStream API, users could 
simply call {{.returns(TypeInformation)}} to explicitly specify the output type.
h3. Example that fails:
{code:java}
NonKeyedPartitionStream<Integer> parsed = input.process(
    (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) -> 
        output.collect(Integer.parseInt(record))
); {code}
h3. 
Example that works (but is more verbose):
{code:java}
NonKeyedPartitionStream<Integer> parsed = input.process(
    new OneInputStreamProcessFunction<String, Integer>() {
      @Override
      public void processRecord(String record, Collector<Integer> output,
          PartitionedContext ctx) throws Exception {
        output.collect(Integer.parseInt(record));
      }
    }
); {code}
h3. 

Requested Enhancement

Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to the 
{{NonKeyedPartitionStream}} class in the DataStream V2 API to allow for the 
specification of output types when using lambda expressions with process 
functions.

The documentation can use the anonymous class, until the issue is fixed, to 
prevent confusion for people new to flink. 


>  Add returns() method to DataStream V2 API for specifying output types with 
> lambda expressions
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37443
>                 URL: https://issues.apache.org/jira/browse/FLINK-37443
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.20.1
>         Environment: * Apache Flink version: 1.20.1
>  * Java version: OpenJDK 21
>  * API: DataStream V2 API
>            Reporter: Nil Madhab
>            Priority: Major
>
> While following the official 
> [tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/]
>  of  DataStream V2 API, I encountered an issue.
> When using the DataStream V2 API with lambda expressions for 
> {{{}OneInputStreamProcessFunction{}}}, Java's type erasure prevents Flink 
> from automatically determining the output type, resulting in the following 
> exception:
>  
> {code:java}
> Exception in thread "main" 
> org.apache.flink.api.common.functions.InvalidTypesException: The return type 
> of function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be 
> determined automatically, due to type erasure.{code}
> {code:java}
>  Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The 
> generic type parameters of 'Collector' are missing. In many cases lambda 
> methods don't provide enough information for automatic type extraction when 
> Java generics are involved. An easy workaround is to use an (anonymous) class 
> instead that implements the 
> 'org.apache.flink.datastream.api.function.OneInputStreamProcessFunction' 
> interface. Otherwise the type has to be specified explicitly using type 
> information.
>     at 
> org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371){code}
>  
>  
> The error suggests implementing {{ResultTypeQueryable}} interface or using an 
> anonymous class as workarounds. In the traditional DataStream API, users 
> could simply call {{.returns(TypeInformation)}} to explicitly specify the 
> output type.
> h3. Example that fails:
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) -> 
>         output.collect(Integer.parseInt(record))
> ); {code}
> h3.  
> Example that works (but is more verbose):
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     new OneInputStreamProcessFunction<String, Integer>() {
>       @Override
>       public void processRecord(String record, Collector<Integer> output,
>           PartitionedContext ctx) throws Exception {
>         output.collect(Integer.parseInt(record));
>       }
>     }
> ); {code}
> h3.  
> Requested Enhancement
> Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to the 
> {{NonKeyedPartitionStream}} class in the DataStream V2 API to allow for the 
> specification of output types when using lambda expressions with process 
> functions.
> The documentation can use the anonymous class, until the issue is fixed, to 
> prevent confusion for people new to flink. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to