Hi Igal,

Apologies, you are correct. I had my wires crossed. I had been trying to get everything working through my local IDE before deploying to our Ververica cluster, and I was only able to get the code running through IntelliJ by following the steps below. Once I reverted the hack and changed the config on our cluster, everything worked perfectly.

Sorry for the confusion, and thanks for all your help.
Barry
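P.S. For anyone else who hits this: the cluster-side change is a single line in the flink-conf.yaml that ships with the Flink runtime (e.g. /opt/flink/conf/flink-conf.yaml, as Igal notes below; the exact path depends on your installation):

# flink-conf.yaml on the Flink runtime, e.g. /opt/flink/conf/flink-conf.yaml.
# StateFun requires these packages to be loaded parent-first rather than
# child-first from the job jar:
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf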
On 2021/09/23 10:57:36, Igal Shilman <i...@apache.org> wrote:
> Hi Barry!
> Glad to hear that it works for you!
>
> I just didn't understand:
> a) What is "flink.yaml"? Perhaps you are referring to "flink-conf.yaml"?
> b) Why is it bundled with the distribution jar? I couldn't find it there
> (nor should it be there). I've looked manually with:
>
> jar tf statefun-flink-distribution-3.1.0.jar | grep "\.yaml"
>
> and couldn't find it there.
>
> Generally flink-conf.yaml should be part of your Flink runtime, for
> example a file at /opt/flink/conf/flink-conf.yaml.
>
> Thanks,
> Igal.
>
>
> On Thu, Sep 23, 2021 at 11:22 AM Barry Higgins
> <barry.p.higgi...@gmail.com> wrote:
>
> > Hi Igal,
> > I just wanted to give you an update on my deployment of stateful
> > functions to an existing Flink cluster.
> > The good news is that it works now when I submit my config with the
> > statefun-flink-distribution. Thank you very much for your help.
> > There was one gotcha, and that was down to the requirement to update
> > the flink.yaml to include:
> >
> > classloader.parent-first-patterns.additional:
> > org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
> >
> > As the flink.yaml is bundled in the newly created fat jar, there is
> > no easy way to edit it. My hacky solution was to code around that and
> > create a new entrypoint class, which replicated the code in
> > org.apache.flink.statefun.flink.core.StatefulFunctionsJob without the
> > validation code that was stopping my deployment.
> > It might be easier if the flink.yaml in the statefun distribution
> > dependency were shipped with the required value in it by default?
> > Thanks a million,
> > Barry
> >
> > On 2021/09/10 12:37:32, Igal Shilman <i...@apache.org> wrote:
> > > Hello Barry,
> > >
> > > I assume that by "we don't need another installation of Flink to
> > > manage the stateful functions" you mean that you already have a
> > > running Flink cluster and you would like to submit an additional
> > > Flink job that executes a Stateful Functions application?
> > >
> > > Then perhaps just try to submit [1] to the Flink cluster. In
> > > addition, you would have to make the module.yaml available on the
> > > class path. You can simply place it under your Flink distribution's
> > > /lib directory, or alternatively:
> > > 1. Create a jar-with-dependencies (uber jar / fat jar) that has [1]
> > > as a dependency.
> > > 2. Add the module.yaml definition under src/resources.
> > > 3. Make sure that the main class will be
> > > org.apache.flink.statefun.flink.core.StatefulFunctionsJob.
> > > You should be able to submit it either via the web interface, or by
> > > running:
> > >
> > > ./bin/flink run -c \
> > >   org.apache.flink.statefun.flink.core.StatefulFunctionsJob \
> > >   ./statefun-example.jar
> > >
> > > If this approach doesn't work for you, let me know and we will
> > > figure out the DataStream integration approach.
> > >
> > > All the best,
> > > Igal.
> > >
> > > [1]
> > > https://mvnrepository.com/artifact/org.apache.flink/statefun-flink-distribution/3.1.0
> > >
> > >
> > > On Thu, Sep 9, 2021 at 5:22 PM Barry Higgins
> > > <barry.p.higgi...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > I'm looking at using the DataStream API from a Flink application
> > > > against a remote Python stateful function deployed on another
> > > > machine.
> > > > I would like to investigate how feasible it is to have all of
> > > > the state management handled from the calling side, meaning that
> > > > we don't need another installation of Flink to manage the
> > > > stateful functions.
> > > >
> > > > Unfortunately the example referenced in the documentation:
> > > >
> > > > https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/sdk/flink-datastream/
> > > >
> > > > no longer exists:
> > > >
> > > > https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
> > > >
> > > > There is an older version available here:
> > > >
> > > > https://github.com/apache/flink-statefun/tree/release-2.2/statefun-examples/statefun-flink-datastream-example
> > > >
> > > > and I have tried to work with it without much success.
> > > >
> > > > The calling element of the code looks as follows:
> > > >
> > > > StatefulFunctionEgressStreams out =
> > > >     StatefulFunctionDataStreamBuilder.builder("example")
> > > >         .withDataStreamAsIngress(names)
> > > >         .withFunctionProvider(GREET, unused -> new MyFunction())
> > > >         .withRequestReplyRemoteFunction(
> > > >             requestReplyFunctionBuilder(
> > > >                     REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
> > > >                 .withPersistedState("seen_count")
> > > >                 .withMaxRequestDuration(Duration.ofSeconds(15))
> > > >                 .withMaxNumBatchRequests(500))
> > > >         .withEgressId(GREETINGS)
> > > >         .withConfiguration(statefunConfig)
> > > >         .build(env);
> > > >
> > > > with a reference to a FunctionProvider that exists as an inner
> > > > class in the same class. We would like this to be a remote call,
> > > > where I guess I would replace http://localhost:5000/statefun with
> > > > the remote address of the SF.
> > > > However, when I do make such a change, the code still refers to
> > > > the inner function, and any changes to the local MyFunction class
> > > > are returned regardless of what is deployed remotely.
> > > >
> > > > If anyone has a working example of how to interact via
> > > > DataStreams with a remotely deployed SF, I would be very
> > > > grateful. I would be very happy to update the documentation if I
> > > > can get this working.
> > > > Cheers,
> > > > Barry
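For what it's worth, a minimal sketch of the remote-only wiring, based on the release-2.2 statefun-flink-datastream example quoted above: the likely reason the local MyFunction keeps answering is that the ingress messages are still addressed to GREET (the embedded function type), so the remote endpoint is never invoked. Targeting REMOTE_GREET instead, and dropping the local provider, should exercise the remote function. The function types, egress id, statefunConfig and env are assumed to be defined as in Barry's snippet; nameStream and remote-host are placeholders:

import static org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;

import java.net.URI;
import java.time.Duration;
import org.apache.flink.statefun.flink.datastream.RoutableMessage;
import org.apache.flink.statefun.flink.datastream.RoutableMessageBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.streaming.api.datastream.DataStream;

// Address every ingress message to the remote function type. Targeting
// GREET here is what keeps the embedded MyFunction in the loop.
DataStream<RoutableMessage> names =
    nameStream.map(
        name ->
            RoutableMessageBuilder.builder()
                .withTargetAddress(REMOTE_GREET, name)
                .withMessageBody(name)
                .build());

StatefulFunctionEgressStreams out =
    StatefulFunctionDataStreamBuilder.builder("example")
        .withDataStreamAsIngress(names)
        // Note: no withFunctionProvider(...) registration for the remote type.
        .withRequestReplyRemoteFunction(
            requestReplyFunctionBuilder(
                    REMOTE_GREET, URI.create("http://remote-host:5000/statefun"))
                .withPersistedState("seen_count")
                .withMaxRequestDuration(Duration.ofSeconds(15))
                .withMaxNumBatchRequests(500))
        .withEgressId(GREETINGS)
        .withConfiguration(statefunConfig)
        .build(env);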
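Separately, for the statefun-flink-distribution route Igal outlines above (the approach that ultimately worked), step 2 needs a module.yaml on the class path describing the remote endpoint. A minimal sketch, assuming the StateFun 3.0-style remote module schema; the function namespace and endpoint URL below are placeholders:

# Sketch of src/main/resources/module.yaml (3.0-style schema assumed;
# namespace and URL are placeholders).
version: "3.0"
module:
  meta:
    type: remote
  spec:
    endpoints:
      - endpoint:
          meta:
            kind: io.statefun.endpoints.v2/http
          spec:
            functions: example/*
            urlPathTemplate: http://localhost:5000/statefun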