Hi all,

I would like to get the IP of the JobManager, not of the Task Executors. Let me explain why.

I have an operator (my own operator that extends AbstractUdfStreamOperator)
that sends messages to and receives messages from a global controller. So,
regardless of which TaskManager these operator instances are deployed on,
they need to exchange messages with my controller. Currently I am doing this
through an MQTT broker (this is my first approach and I don't know whether
there is a better way to do it; maybe there is).

The first thing I do is start my controller from
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe it to
the JobManager host. I get the IP of the JobManager by adding this method to
the org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
class:

    public String getRpcServiceAddress() {
        return this.rpcService.getAddress();
    }

That works, although I am not sure it is the best approach.

The second thing I do is make each operator instance publish and subscribe
to this controller. To do this they need the JobManager IP. I can get the
TaskManager IPs from the AbstractUdfStreamOperator, but not the JobManager
IP, so at the moment I am passing the JobManager IP to the operator as a
parameter.

I suppose it is easy to get the JobManager IP inside the
AbstractUdfStreamOperator, or to simply add a method somewhere that exposes
this value. However, I don't know where.

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Thu, May 21, 2020 at 7:13 AM Yangze Guo <karma...@gmail.com> wrote:
>
> Hi, Felipe
>
> Do you mean to get the Host and Port of the task executor where your
> operator is actually running?
>
> If that is the case, IIUC, two possible components that contain this
> information are RuntimeContext and the Configuration param of
> RichFunction#open. After reading the relevant code path, it seems you
> could not get it at the moment.
>
> Best,
> Yangze Guo
>
> On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
> <alexan...@ververica.com> wrote:
> >
> > Hi Felipe,
> >
> > could you clarify in some more detail what you are trying to achieve?
> >
> > Best regards,
> >
> > --
> > Alexander Fedulov | Solutions Architect
> >
> > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez
> > <felipe.o.gutier...@gmail.com> wrote:
> >>
> >> Hi all,
> >>
> >> I have my own operator that extends the AbstractUdfStreamOperator
> >> class and I want to issue some messages to it. Sometimes the operator
> >> instances are deployed on different TaskManagers and I would like to
> >> set some attributes like the master and slave IPs on it.
> >>
> >> I am trying to use these values but they only return localhost, not
> >> the IP configured in the flink-conf.yaml file (jobmanager.rpc.address:
> >> 192.168.56.1).
> >>
> >>     ConfigOption<String> restAddressOption = ConfigOptions
> >>         .key("rest.address")
> >>         .stringType()
> >>         .noDefaultValue();
> >>     System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
> >>         jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
> >>     System.out.println("rpcService: " + rpcService.getAddress());
> >>
> >> Thanks,
> >> Felipe
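
For reference, the "pass the JobManager IP as a parameter" workaround from the top of this thread could look roughly like the sketch below. It is plain Java with no Flink dependency and is not an existing Flink API: the class name, both helper methods, and the MQTT port 1883 are assumptions made for illustration. It only shows reading jobmanager.rpc.address from flink-conf.yaml-style "key: value" lines and turning it into a broker URI that could be handed to the operator's constructor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class JobManagerAddressSketch {

    // Key used in flink-conf.yaml for the JobManager RPC host (from the thread).
    static final String JM_ADDRESS_KEY = "jobmanager.rpc.address";

    /** Hypothetical helper: finds the JobManager host in flat "key: value" lines. */
    static Optional<String> findJobManagerAddress(List<String> confLines) {
        for (String raw : confLines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int sep = line.indexOf(':');
            if (sep < 0) {
                continue; // not a "key: value" line
            }
            String key = line.substring(0, sep).trim();
            String value = line.substring(sep + 1).trim();
            if (key.equals(JM_ADDRESS_KEY) && !value.isEmpty()) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    /** Hypothetical helper: builds the URI an MQTT client would connect to. */
    static String brokerUri(String host, int port) {
        return "tcp://" + host + ":" + port;
    }

    public static void main(String[] args) {
        List<String> conf = Arrays.asList(
            "# sample flink-conf.yaml content",
            "jobmanager.rpc.address: 192.168.56.1",
            "jobmanager.rpc.port: 6123");
        // Fall back to localhost when the key is missing (an assumption).
        String host = findJobManagerAddress(conf).orElse("localhost");
        // 1883 is assumed here as the broker port; adjust to the real setup.
        System.out.println(brokerUri(host, 1883));
    }
}
```

The resulting string would be resolved once on the client side and passed into the operator as a constructor argument, so every operator instance connects to the same controller regardless of the TaskManager it is deployed on.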