[ https://issues.apache.org/jira/browse/FLINK-37443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933781#comment-17933781 ]

Nil Madhab edited comment on FLINK-37443 at 3/10/25 9:09 AM:
-------------------------------------------------------------

There is definitely an issue with lambda expressions. I was following FLIP-500: Support Join Extension in DataStream V2 API and created a project against 2.1-SNAPSHOT. The join works fine with an anonymous class but fails with a lambda.

Here is the [file|https://pastes.dev/WXmhIqfh66#L193].

If I replace the anonymous class
{code:java}
new JoinFunction<Order, ExpressEvent, EnrichedOrder>() {code}
with a lambda, the job fails at runtime with an error that does not point to the cause. The anonymous-class version works fine.
{code:java}
Caused by: java.lang.ClassCastException: class com.java.flinktutorials.datastreamv2.EnrichOrderExample$EnrichedOrder cannot be cast to class java.lang.Void (com.java.flinktutorials.datastreamv2.EnrichOrderExample$EnrichedOrder is in unnamed module of loader 'app'; java.lang.Void is in module java.base of loader 'bootstrap')
    at org.apache.flink.api.common.typeutils.base.VoidSerializer.copy(VoidSerializer.java:30)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.datastream.impl.common.OutputCollector.collect(OutputCollector.java:35)
    at com.java.flinktutorials.datastreamv2.EnrichOrderExample.lambda$main$5b039ba0$1(EnrichOrderExample.java:203)
    at org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessOperator.processElement1(TwoInputNonBroadcastJoinProcessOperator.java:90)
    at org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing.lambda$makeRecordProcessor$0(AsyncStateProcessing.java:68)
    at org.apache.flink.runtime.asyncprocessing.AsyncExecutionController.lambda$syncPointRequestWithCallback$6(AsyncExecutionController.java:472)
    at org.apache.flink.util.function.ThrowingConsumer.lambda$unchecked$0(ThrowingConsumer.java:54)
    at org.apache.flink.core.state.StateFutureImpl.thenAccept(StateFutureImpl.java:112)
    at org.apache.flink.runtime.asyncprocessing.AsyncExecutionController.syncPointRequestWithCallback(AsyncExecutionController.java:472)
    at org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator.preserveRecordOrderAndProcess(AbstractAsyncStateStreamOperator.java:198)
    at org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing.lambda$makeRecordProcessor$1(AsyncStateProcessing.java:68)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:257)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:206)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:163)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:637)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:972)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:916)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:942)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) {code}
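
The trace above fails in {{VoidSerializer.copy}}, which suggests the lambda's output type was resolved to {{Void}} instead of {{EnrichedOrder}}, so the chained operator copied each record with the wrong serializer. The root cause is most likely the same type-erasure problem described in the issue below: the class generated for a lambda does not record the generic type arguments of the interface it implements, while an anonymous class does. The following standalone sketch (no Flink dependency; {{MyFunction}} is a hypothetical stand-in for {{JoinFunction}} or {{OneInputStreamProcessFunction}}) shows what plain reflection can see in each case. Flink's type extractor has additional lambda-specific code paths, so this illustrates the mechanism rather than Flink's actual logic.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class ErasureDemo {

    // Hypothetical stand-in for a Flink function interface with input and output type parameters.
    @FunctionalInterface
    interface MyFunction<IN, OUT> {
        OUT apply(IN value);
    }

    // Returns the type arguments that fn's class declares for MyFunction, or an empty array if none are recorded.
    static Type[] declaredTypeArguments(Object fn) {
        for (Type iface : fn.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) iface;
                if (p.getRawType() == MyFunction.class) {
                    return p.getActualTypeArguments();
                }
            }
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        // Anonymous class: the compiler writes MyFunction<String, Integer> into the class file's signature.
        MyFunction<String, Integer> anonymous = new MyFunction<String, Integer>() {
            @Override
            public Integer apply(String value) {
                return Integer.parseInt(value);
            }
        };
        // Lambda: the runtime-generated class implements only the raw MyFunction interface.
        MyFunction<String, Integer> lambda = value -> Integer.parseInt(value);

        System.out.println(Arrays.toString(declaredTypeArguments(anonymous))); // [class java.lang.String, class java.lang.Integer]
        System.out.println(Arrays.toString(declaredTypeArguments(lambda)));    // []
    }
}
```

Both variables have the same static type, but only the anonymous class carries {{String}} and {{Integer}} into the running program, which is why the anonymous-class version of the join works.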



> Add returns() method to DataStream V2 API for specifying output types with lambda expressions
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37443
>                 URL: https://issues.apache.org/jira/browse/FLINK-37443
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.20.1
>         Environment: * Apache Flink version: 1.20.1
>  * Java version: OpenJDK 21
>  * API: DataStream V2 API
>            Reporter: Nil Madhab
>            Priority: Major
>
> While following the official [tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/] for the DataStream V2 API, I ran into the following issue.
> When a lambda expression is used as a {{OneInputStreamProcessFunction}} in the DataStream V2 API, Java's type erasure prevents Flink from automatically determining the output type, which results in the following exception:
>  
> {code:java}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be determined automatically, due to type erasure.{code}
> {code:java}
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.datastream.api.function.OneInputStreamProcessFunction' interface. Otherwise the type has to be specified explicitly using type information.
>     at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371){code}
>  
>  
> The error message suggests two workarounds: implementing the {{ResultTypeQueryable}} interface or using an anonymous class. In the traditional DataStream API, users could simply call {{.returns(TypeInformation)}} to specify the output type explicitly.
> h3. Example that fails:
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) -> 
>         output.collect(Integer.parseInt(record))
> ); {code}
> h3. Example that works (but is more verbose):
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     new OneInputStreamProcessFunction<String, Integer>() {
>       @Override
>       public void processRecord(String record, Collector<Integer> output,
>           PartitionedContext ctx) throws Exception {
>         output.collect(Integer.parseInt(record));
>       }
>     }
> ); {code}
> h3. Requested Enhancement
> Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to the 
> {{NonKeyedPartitionStream}} class in the DataStream V2 API to allow for the 
> specification of output types when using lambda expressions with process 
> functions.
> Until this is fixed, the documentation could use anonymous classes instead of lambdas to avoid confusing people who are new to Flink.
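
For comparison, {{returns(...)}} in the traditional DataStream API also accepts a {{TypeHint}}, which is passed as an anonymous subclass. That works because of the "super type token" technique: an anonymous subclass records the concrete type arguments of its generic superclass in its class file, so they survive erasure. The sketch below reproduces the technique in plain Java under a hypothetical class name ({{TypeToken}}); it is not Flink's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

// Hypothetical super type token; it must be subclassed (usually anonymously) to capture T.
abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // getClass() is the anonymous subclass, whose class file records TypeToken<...> with concrete arguments.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("TypeToken must be created with concrete type arguments");
        }
        this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

public class TypeTokenDemo {
    public static void main(String[] args) {
        // The trailing {} creates an anonymous subclass, which is what preserves the full generic type.
        Type captured = new TypeToken<Map<String, List<Integer>>>() {}.getType();
        System.out.println(captured.getTypeName()); // java.util.Map<java.lang.String, java.util.List<java.lang.Integer>>
    }
}
```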

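The requested enhancement can be modelled with a small, Flink-free sketch. {{ToyStream}}, {{ProcessFn}} and this {{returns(Class)}} method are hypothetical names, not DataStream V2 API. {{process}} accepts a lambda but cannot learn its output type at runtime, so the output type stays unknown until the caller declares it with {{returns}}, which is the role {{.returns(...)}} plays in the traditional DataStream API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ReturnsSketch {

    // Hypothetical stand-in for OneInputStreamProcessFunction: emits zero or more outputs per record.
    @FunctionalInterface
    interface ProcessFn<IN, OUT> {
        void processRecord(IN record, Consumer<OUT> output);
    }

    // Hypothetical toy stream; outputType is null while the element type is unknown at runtime.
    static final class ToyStream<T> {
        final List<T> elements;
        final Class<T> outputType;

        ToyStream(List<T> elements, Class<T> outputType) {
            this.elements = elements;
            this.outputType = outputType;
        }

        <R> ToyStream<R> process(ProcessFn<T, R> fn) {
            List<R> out = new ArrayList<>();
            for (T record : elements) {
                fn.processRecord(record, out::add);
            }
            // A lambda fn carries no runtime record of R, so the output type starts out unknown.
            return new ToyStream<>(out, null);
        }

        // Hypothetical returns(Class): attach the output type that the caller knows statically.
        ToyStream<T> returns(Class<T> type) {
            return new ToyStream<>(elements, type);
        }
    }

    public static void main(String[] args) {
        ToyStream<String> input = new ToyStream<>(List.of("1", "2", "3"), String.class);
        ToyStream<Integer> parsed = input
                .<Integer>process((record, output) -> output.accept(Integer.parseInt(record)))
                .returns(Integer.class);
        System.out.println(parsed.outputType.getSimpleName() + " " + parsed.elements); // Integer [1, 2, 3]
    }
}
```

The {{<Integer>}} type witness is needed because the lambda only calls {{output.accept(...)}}, so javac has nothing to infer {{R}} from; the issue's failing example gives the compiler the same information with a cast. Either way, that information exists only at compile time, which is why an explicit runtime declaration such as {{returns}} is useful.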


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
