Hi Le, You can attach many different functions in a single StateFun builder, and let them message each other. In your example, you can make the "Greet" function message Greet2 directly (in addition to emitting a message as an egress). Embedding multiple copies of StateFun within a Datastream application is currently not supported.
Thanks, Igal. On Sat, Dec 26, 2020 at 6:22 AM Le Xu <[email protected]> wrote: > Hello! > > I'm trying to modify the DataStream API example > <https://github.com/apache/flink-statefun/blob/4fe04ea351145f989e144f160425424987050f68/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java> > provided by Flink Stateful Function by attaching another function and > creating a function chain. However, I'm getting the following error > > > Exception in thread "main" java.lang.IllegalArgumentException: Hash > collision on user-specified ID "feedback_union_uid1". Most likely cause is > a non-unique ID. Please check that all IDs specified via uid(String) are > unique. at > org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:178) > at > org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:109) > at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:165) > at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113) > at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850) > at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52) > at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43) > at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55) > at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:98) > at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:79) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682) > at > org.apache.flink.statefun.examples.datastream.Example.main(Example.java:161) > > > Here > <https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a> is > my modified example: > > https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a > > > I realize that I can only modify uid of the stage through DataStream API > but not StateFun API -- what is the best practice to avoid such error (or > there is a better way to chain stateful function in Flink)? > > > Thanks! > > > Le > > > > >
