Hello!

It seems I had wired the wrong function, attaching the first function's
output to the remote reply function of the second function. It works
great now that this is fixed. Thanks!

Le
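
For anyone who hits the same ClassCastException: judging from the trace
below, the failure is raised in RequestReplyFunction.invoke, which appears to
expect a com.google.protobuf.Any as input, so a plain String sent to a remote
function type fails at the cast. Here is a minimal, self-contained sketch of
that mismatch. It is a simplified model and not StateFun's actual code: the
class CastMismatchSketch, the Envelope type (a hypothetical stand-in for
protobuf Any), and both method names are assumptions made for illustration.

```java
public class CastMismatchSketch {

    /** Hypothetical stand-in for com.google.protobuf.Any: a type URL plus a payload. */
    static final class Envelope {
        final String typeUrl;
        final String payload;

        Envelope(String typeUrl, String payload) {
            this.typeUrl = typeUrl;
            this.payload = payload;
        }
    }

    /** Models a remote-style function: accepts Object but only understands Envelope. */
    static String remoteStyleInvoke(Object input) {
        // This cast throws ClassCastException when the caller sends a plain String.
        Envelope envelope = (Envelope) input;
        return "remote handled " + envelope.payload;
    }

    /** Models an embedded-style function that works directly on Java Strings. */
    static String embeddedStyleInvoke(Object input) {
        return "embedded handled " + (String) input;
    }

    public static void main(String[] args) {
        // A String sent to the embedded-style function is fine.
        System.out.println(embeddedStyleInvoke("hello"));

        // A wrapped message sent to the remote-style function is fine too.
        System.out.println(remoteStyleInvoke(new Envelope("example/Greeting", "hello")));

        // A plain String sent to the remote-style function fails at the cast.
        try {
            remoteStyleInvoke("hello");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: String sent to the wrong target");
        }
    }
}
```

In this model the fix is the same as in my case: make sure the sender
addresses the function type that actually understands the message it sends.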

On Sat, Dec 26, 2020 at 11:23 PM Le Xu <sharonx...@gmail.com> wrote:

> I apologize -- I meant to point out line 152, where context.send was used.
>
> On Sat, Dec 26, 2020 at 11:21 PM Le Xu <sharonx...@gmail.com> wrote:
>
>> Hi Igal:
>>
>> Thanks for the suggestion -- I changed the implementation accordingly,
>> attaching the second function right after the first one in the same
>> builder. The only difference is that instead of sending to the egress,
>> the first function now sends to the second function, which in turn sends
>> to the egress. However, I'm getting a new error saying
>> "java.lang.String cannot be cast to com.google.protobuf.Any". The full
>> trace is at the end. The strange thing is that it tries to cast a Java
>> String to protobuf.Any, but nowhere in the program do I use protobuf (I
>> believe Kryo is the default option). So I suspect that I am not using
>> context.send correctly. My code is here
>> <https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a#file-example-java-L169>
>> (https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a,
>> line 169).
>>
>> Thanks!
>>
>> Le
>>
>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>> at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
>> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>> at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
>> at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
>> at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
>> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
>> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
>> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
>> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:590)
>> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at akka.actor.Actor.aroundReceive(Actor.scala:517)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> ... 4 more
>> Caused by: org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(example, remote-greet2).
>> at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
>> at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:73)
>> at org.apache.flink.statefun.flink.core.functions.FunctionActivation.applyNextPendingEnvelope(FunctionActivation.java:50)
>> at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:61)
>> at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:161)
>> at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:146)
>> at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:186)
>> at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)
>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to com.google.protobuf.Any
>> at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:90)
>> at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
>> ... 24 more
>>
>> On Sat, Dec 26, 2020 at 2:22 PM Igal Shilman <i...@ververica.com> wrote:
>>
>>> Hi Le,
>>>
>>> You can attach many different functions in a single StateFun builder,
>>> and let them message each other.
>>> In your example, you can make the "Greet" function message Greet2
>>> directly (in addition to emitting a message to an egress).
>>> Embedding multiple copies of StateFun within a DataStream application is
>>> currently not supported.
>>>
>>> Thanks,
>>> Igal.
>>>
>>>
>>> On Sat, Dec 26, 2020 at 6:22 AM Le Xu <sharonx...@gmail.com> wrote:
>>>
>>>> Hello!
>>>>
>>>> I'm trying to modify the DataStream API example
>>>> <https://github.com/apache/flink-statefun/blob/4fe04ea351145f989e144f160425424987050f68/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java>
>>>> provided by Flink Stateful Functions by attaching another function to
>>>> create a function chain. However, I'm getting the following error:
>>>>
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Hash collision on user-specified ID "feedback_union_uid1". Most likely cause is a non-unique ID. Please check that all IDs specified via uid(String) are unique.
>>>> at org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:178)
>>>> at org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:109)
>>>> at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:165)
>>>> at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113)
>>>> at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
>>>> at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>>>> at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>>>> at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
>>>> at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:98)
>>>> at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:79)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
>>>> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
>>>> at org.apache.flink.statefun.examples.datastream.Example.main(Example.java:161)
>>>>
>>>>
>>>> Here
>>>> <https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a>
>>>> is my modified example:
>>>>
>>>> https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a
>>>>
>>>>
>>>> I realize that I can only set the uid of a stage through the DataStream
>>>> API, not the StateFun API. What is the best practice for avoiding this
>>>> error (or is there a better way to chain stateful functions in Flink)?
>>>>
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> Le