[ https://issues.apache.org/jira/browse/FLINK-37443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933781#comment-17933781 ]
Nil Madhab edited comment on FLINK-37443 at 3/10/25 9:09 AM:
-------------------------------------------------------------

There is definitely some issue with lambda expressions. I was following FLIP-500: Support Join Extension in DataStream V2 API and tried to create a project with 2.1-SNAPSHOT. It works fine with an anonymous class, but not with a lambda. Here is the [file|https://pastes.dev/WXmhIqfh66#L193].

If I replace the anonymous
{code:java}
new JoinFunction<Order, ExpressEvent, EnrichedOrder>()
{code}
with a lambda, it breaks without a clear error message. The anonymous-class version works fine:
{code:java}
Caused by: java.lang.ClassCastException: class com.java.flinktutorials.datastreamv2.EnrichOrderExample$EnrichedOrder cannot be cast to class java.lang.Void (com.java.flinktutorials.datastreamv2.EnrichOrderExample$EnrichedOrder is in unnamed module of loader 'app'; java.lang.Void is in module java.base of loader 'bootstrap')
	at org.apache.flink.api.common.typeutils.base.VoidSerializer.copy(VoidSerializer.java:30)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.datastream.impl.common.OutputCollector.collect(OutputCollector.java:35)
	at com.java.flinktutorials.datastreamv2.EnrichOrderExample.lambda$main$5b039ba0$1(EnrichOrderExample.java:203)
	at org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessOperator.processElement1(TwoInputNonBroadcastJoinProcessOperator.java:90)
	at org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing.lambda$makeRecordProcessor$0(AsyncStateProcessing.java:68)
	at org.apache.flink.runtime.asyncprocessing.AsyncExecutionController.lambda$syncPointRequestWithCallback$6(AsyncExecutionController.java:472)
	at org.apache.flink.util.function.ThrowingConsumer.lambda$unchecked$0(ThrowingConsumer.java:54)
	at org.apache.flink.core.state.StateFutureImpl.thenAccept(StateFutureImpl.java:112)
	at org.apache.flink.runtime.asyncprocessing.AsyncExecutionController.syncPointRequestWithCallback(AsyncExecutionController.java:472)
	at org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator.preserveRecordOrderAndProcess(AbstractAsyncStateStreamOperator.java:198)
	at org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing.lambda$makeRecordProcessor$1(AsyncStateProcessing.java:68)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:257)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:206)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:163)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:637)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:972)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:916)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:942)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
{code}

> Add returns() method to DataStream V2 API for specifying output types with lambda expressions
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-37443
> URL: https://issues.apache.org/jira/browse/FLINK-37443
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.20.1
> Environment: * Apache Flink version: 1.20.1
> * Java version: OpenJDK 21
> * API: DataStream V2 API
> Reporter: Nil Madhab
> Priority: Major
>
> While following the official [tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/] for the DataStream V2 API, I encountered an issue.
> When lambda expressions are used for {{OneInputStreamProcessFunction}} in the DataStream V2 API, Java's type erasure prevents Flink from automatically determining the output type. This results in the following exception:
>
> {code:java}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be determined automatically, due to type erasure.
> {code}
> {code:java}
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.datastream.api.function.OneInputStreamProcessFunction' interface. Otherwise the type has to be specified explicitly using type information.
> 	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
> {code}
>
> The error suggests two workarounds: implementing the {{ResultTypeQueryable}} interface or using an anonymous class. In the traditional DataStream API, users could simply call {{.returns(TypeInformation)}} to specify the output type explicitly.
> h3. Example that fails:
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) ->
>         output.collect(Integer.parseInt(record))
> );
> {code}
> h3. Example that works (but is more verbose):
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     new OneInputStreamProcessFunction<String, Integer>() {
>         @Override
>         public void processRecord(String record, Collector<Integer> output, PartitionedContext ctx) throws Exception {
>             output.collect(Integer.parseInt(record));
>         }
>     }
> );
> {code}
> h3. Requested Enhancement
> Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to the {{NonKeyedPartitionStream}} class in the DataStream V2 API. This would let users specify the output type when they use lambda expressions with process functions.
> Until this is fixed, the documentation could use anonymous classes so that people new to Flink are not confused.
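This is a plain-Java sketch of the mechanism behind these errors. It is not Flink's actual TypeExtractor code. An anonymous class records its parameterized interface, for example ProcessFn<String, Integer>, in its class metadata. The class the JVM generates for a lambda implements only the raw interface, so reflection cannot recover OUT from it. The names ErasureDemo, ProcessFn, declaredTypeArgs and outputType are hypothetical. The hint parameter of outputType only mimics the idea behind the requested .returns(Class) and is not Flink API. Flink's real extractor does more work on lambdas: the 'Collector' message suggests it also inspects the lambda's method signature. Treat this as an illustration, not a reproduction.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;

public class ErasureDemo {

    // Hypothetical stand-in for a process function with IN and OUT type parameters;
    // this is NOT Flink's OneInputStreamProcessFunction, just a two-parameter generic interface.
    interface ProcessFn<IN, OUT> {
        void process(IN record, Consumer<OUT> output);
    }

    // Returns the type arguments declared for ProcessFn on the function's runtime class,
    // or null when that class only implements the raw interface (the usual case for lambdas).
    static Type[] declaredTypeArgs(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) t;
                if (pt.getRawType() == ProcessFn.class) {
                    return pt.getActualTypeArguments();
                }
            }
        }
        return null;
    }

    // Hypothetical analogue of the requested .returns(Class) hint: prefer the reflectively
    // recovered OUT type, otherwise fall back to the explicit hint supplied by the caller.
    static Type outputType(Object fn, Class<?> hint) {
        Type[] args = declaredTypeArgs(fn);
        if (args != null) {
            return args[1];
        }
        if (hint != null) {
            return hint;
        }
        throw new IllegalStateException("Output type could not be determined; supply a type hint");
    }

    public static void main(String[] args) {
        // Anonymous class: the compiler writes ProcessFn<String, Integer> into the class file.
        ProcessFn<String, Integer> anonymous = new ProcessFn<String, Integer>() {
            @Override
            public void process(String record, Consumer<Integer> output) {
                output.accept(Integer.parseInt(record));
            }
        };

        // Lambda: the runtime-generated class implements only the raw ProcessFn interface.
        ProcessFn<String, Integer> lambda = (record, output) -> output.accept(Integer.parseInt(record));

        System.out.println("anonymous: " + outputType(anonymous, null).getTypeName());
        System.out.println("lambda, no hint: " + (declaredTypeArgs(lambda) == null ? "erased" : "recovered"));
        System.out.println("lambda, with hint: " + outputType(lambda, Integer.class).getTypeName());
    }
}
```

When compiled and run on its own (Java 8 or later), this should print three lines: `anonymous: java.lang.Integer`, `lambda, no hint: erased` and `lambda, with hint: java.lang.Integer`. The explicit hint follows the same design as `.returns(...)` in the old API: when erasure removes the type information, the caller supplies it.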