ok, I see. Do you suggest a better approach to send messages from the JobManager to the TaskManagers and my specific operator?
Thanks, Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Mon, May 25, 2020 at 4:23 AM Yangze Guo <karma...@gmail.com> wrote: > > Glad to see that! > > However, I was told that it is not the right approach to directly > extend `AbstractUdfStreamOperator` in DataStream API. This would be > fixed at some point (maybe Flink 2.0). The JIRA link is [1]. > > [1] https://issues.apache.org/jira/browse/FLINK-17862 > > Best, > Yangze Guo > > On Fri, May 22, 2020 at 9:56 PM Felipe Gutierrez > <felipe.o.gutier...@gmail.com> wrote: > > > > thanks. it worked! > > > > I add the following method at the > > org.apache.flink.streaming.api.operators.StreamingRuntimeContext > > class: > > > > public Environment getTaskEnvironment() { return this.taskEnvironment; } > > > > Then I am getting the IP using: > > > > ConfigOption<String> restAddressOption = ConfigOptions > > .key("rest.address") > > .stringType() > > .noDefaultValue(); > > String restAddress = > > this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption); > > > > Thanks! > > > > -- > > -- Felipe Gutierrez > > -- skype: felipe.o.gutierrez > > -- https://felipeogutierrez.blogspot.com > > > > On Fri, May 22, 2020 at 3:54 AM Yangze Guo <karma...@gmail.com> wrote: > > > > > > Hi, Felipe > > > > > > I see your problem. IIUC, if you use AbstractUdfStreamOperator, you > > > could indeed get all the configurations(including what you defined in > > > flink-conf.yaml) through > > > "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()". > > > However, I guess it is not the right behavior and might be fixed in > > > future versions. > > > > > > Best, > > > Yangze Guo > > > > > > > > > > > > On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez > > > <felipe.o.gutier...@gmail.com> wrote: > > > > > > > > Hi all, > > > > > > > > I would like to have the IP of the JobManager, not the Task Executors. > > > > I explain why. > > > > > > > > I have an operator (my own operator that extends > > > > AbstractUdfStreamOperator) that sends and receives messages from a > > > > global controller. So, regardless of which TaskManager these operator > > > > instances are deployed, they need to send and receive messages from my > > > > controller. Currently, I am doing this using MQTT broker (this is my > > > > first approach and I don't know if there is a better way to do it, > > > > maybe there is...) > > > > > > > > The first thing that I do is to start my controller using the > > > > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe > > > > it to the JobManager host. I am getting the IP of the JobManager by > > > > adding this method on the > > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory > > > > class: > > > > public String getRpcServiceAddress() { > > > > return this.rpcService.getAddress(); > > > > } > > > > That is working. Although I am not sure if it is the best approach. > > > > > > > > The second thing that I am doing is to make each operator instance > > > > publish and subscribe to this controller. To do this they need the > > > > JobManager IP. I could get the TaskManager IPs from the > > > > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing > > > > the JobManager IP as a parameter to the operator at the moment. I > > > > suppose that it is easy to get the JobManager IP inside the > > > > AbstractUdfStreamOperator or simply add some method somewhere to get > > > > this value. However, I don't know where. > > > > > > > > Thanks, > > > > Felipe > > > > > > > > -- > > > > -- Felipe Gutierrez > > > > -- skype: felipe.o.gutierrez > > > > -- https://felipeogutierrez.blogspot.com > > > > > > > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo <karma...@gmail.com> wrote: > > > > > > > > > > Hi, Felipe > > > > > > > > > > Do you mean to get the Host and Port of the task executor where your > > > > > operator is indeed running on? > > > > > > > > > > If that is the case, IIUC, two possible components that contain this > > > > > information are RuntimeContext and the Configuration param of > > > > > RichFunction#open. After reading the relevant code path, it seems you > > > > > could not get it at the moment. > > > > > > > > > > Best, > > > > > Yangze Guo > > > > > > > > > > Best, > > > > > Yangze Guo > > > > > > > > > > > > > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov > > > > > <alexan...@ververica.com> wrote: > > > > > > > > > > > > Hi Felippe, > > > > > > > > > > > > could you clarify in some more details what you are trying to > > > > > > achieve? > > > > > > > > > > > > Best regards, > > > > > > > > > > > > -- > > > > > > > > > > > > Alexander Fedulov | Solutions Architect > > > > > > > > > > > > +49 1514 6265796 > > > > > > > > > > > > > > > > > > > > > > > > Follow us @VervericaData > > > > > > > > > > > > -- > > > > > > > > > > > > Join Flink Forward - The Apache Flink Conference > > > > > > > > > > > > Stream Processing | Event Driven | Real Time > > > > > > > > > > > > -- > > > > > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > > > > > > > > > > > -- > > > > > > > > > > > > Ververica GmbH > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung > > > > > > Jason, Ji (Tony) Cheng > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez > > > > > > <felipe.o.gutier...@gmail.com> wrote: > > > > > >> > > > > > >> Hi all, > > > > > >> > > > > > >> I have my own operator that extends the AbstractUdfStreamOperator > > > > > >> class and I want to issue some messages to it. Sometimes the > > > > > >> operator > > > > > >> instances are deployed on different TaskManagers and I would like > > > > > >> to > > > > > >> set some attributes like the master and slave IPs on it. > > > > > >> > > > > > >> I am trying to use these values but they only return localhost, not > > > > > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address: > > > > > >> 192.168.56.1). > > > > > >> > > > > > >> ConfigOption<String> restAddressOption = ConfigOptions > > > > > >> .key("rest.address") > > > > > >> .stringType() > > > > > >> .noDefaultValue(); > > > > > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " > > > > > >> + > > > > > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption)); > > > > > >> System.out.println("rpcService: " + rpcService.getAddress()); > > > > > >> > > > > > >> > > > > > >> Thanks, > > > > > >> Felipe > > > > > >> > > > > > >> -- > > > > > >> -- Felipe Gutierrez > > > > > >> -- skype: felipe.o.gutierrez > > > > > >> -- https://felipeogutierrez.blogspot.com