Hi all,

I would like to get the IP of the JobManager, not of the Task Executors.
Let me explain why.

I have an operator (my own operator that extends
AbstractUdfStreamOperator) that sends messages to and receives messages
from a global controller. So, regardless of which TaskManager these
operator instances are deployed on, they need to exchange messages with
my controller. Currently, I am doing this through an MQTT broker (this is
my first approach, and I don't know if there is a better way to do it;
maybe there is...).

The first thing that I do is start my controller using
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe
it to the JobManager host. I get the IP of the JobManager by
adding this method to the
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
class:

    // Expose the address under which the JobManager's RpcService is reachable.
    public String getRpcServiceAddress() {
        return this.rpcService.getAddress();
    }

That works, although I am not sure it is the best approach.
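
As a possible alternative to patching runtime classes, the configured address could be read from the configuration file itself. The following is only a minimal sketch, assuming that flink-conf.yaml consists of flat "key: value" lines (as in the jobmanager.rpc.address entry quoted in the original message below) and that the file content is available as a string. FlatConfReader and its methods are hypothetical names and are not part of Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not a Flink class) that reads flat "key: value" lines. */
final class FlatConfReader {

    private FlatConfReader() {}

    /** Parses the given file content into a key/value map, ignoring comments. */
    static Map<String, String> parse(String content) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String rawLine : content.split("\\R")) {
            // Drop everything after a '#' comment marker, then trim whitespace.
            int hash = rawLine.indexOf('#');
            String line = (hash >= 0 ? rawLine.substring(0, hash) : rawLine).trim();
            if (line.isEmpty()) {
                continue;
            }
            // Assumption: key and value are separated by a colon followed by a space.
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // not a "key: value" pair
            }
            String key = line.substring(0, sep).trim();
            String value = line.substring(sep + 2).trim();
            if (!value.isEmpty()) {
                result.put(key, value);
            }
        }
        return result;
    }

    /** Returns the value of jobmanager.rpc.address, or null if it is not set. */
    static String jobManagerAddress(String content) {
        return parse(content).get("jobmanager.rpc.address");
    }
}
```

This only returns what is written in the file; whether that value matches the address the JobManager actually binds to depends on the deployment.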

The second thing that I do is make each operator instance
publish and subscribe to this controller. To do this, they need the
JobManager IP. I can get the TaskManager IPs from the
AbstractUdfStreamOperator, but not the JobManager IP. So, at the moment I
am passing the JobManager IP as a parameter to the operator. I
suppose that it is easy to get the JobManager IP inside the
AbstractUdfStreamOperator, or to simply add a method somewhere that
returns this value. However, I don't know where.
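
The parameter-passing workaround described above can be sketched roughly as follows. This is only an illustration: ControllerEndpoint is a hypothetical class, not part of Flink, and the "tcp://host:port" URI form and the default port 1883 are assumptions about the MQTT client and broker in use. The holder is Serializable so that it could be kept as a field of an operator that is shipped to the TaskManagers.

```java
import java.io.Serializable;
import java.util.Objects;

/**
 * Hypothetical holder for the controller (JobManager-side) address that is
 * handed to each operator instance through its constructor.
 */
final class ControllerEndpoint implements Serializable {
    private static final long serialVersionUID = 1L;

    // Assumption: 1883 is the conventional port for unencrypted MQTT.
    static final int DEFAULT_MQTT_PORT = 1883;

    private final String host;
    private final int port;

    ControllerEndpoint(String host, int port) {
        this.host = Objects.requireNonNull(host, "host");
        if (host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        this.port = port;
    }

    /** Creates an endpoint on the assumed default MQTT port. */
    static ControllerEndpoint of(String host) {
        return new ControllerEndpoint(host, DEFAULT_MQTT_PORT);
    }

    /** Builds a broker URI in the "tcp://host:port" form (an assumption). */
    String brokerUri() {
        return "tcp://" + host + ":" + port;
    }
}
```

An operator could then take a ControllerEndpoint in its constructor, for example ControllerEndpoint.of("192.168.56.1"), and call brokerUri() when it opens its MQTT connection.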

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Thu, May 21, 2020 at 7:13 AM Yangze Guo <karma...@gmail.com> wrote:
>
> Hi, Felipe
>
> Do you mean to get the host and port of the task executor on which your
> operator is actually running?
>
> If that is the case, IIUC, the two components that might contain this
> information are RuntimeContext and the Configuration param of
> RichFunction#open. After reading the relevant code path, it seems you
> cannot get it from either of them at the moment.
>
> Best,
> Yangze Guo
>
>
> On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
> <alexan...@ververica.com> wrote:
> >
> > Hi Felipe,
> >
> > Could you clarify in more detail what you are trying to achieve?
> >
> > Best regards,
> >
> > --
> >
> > Alexander Fedulov | Solutions Architect
> >
> > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez 
> > <felipe.o.gutier...@gmail.com> wrote:
> >>
> >> Hi all,
> >>
> >> I have my own operator that extends the AbstractUdfStreamOperator
> >> class, and I want to send some messages to it. Sometimes the operator
> >> instances are deployed on different TaskManagers, and I would like to
> >> set some attributes on them, such as the master and slave IPs.
> >>
> >> I am trying to use the values below, but they only return localhost,
> >> not the IP configured in the flink-conf.yaml file
> >> (jobmanager.rpc.address: 192.168.56.1).
> >>
> >> ConfigOption<String> restAddressOption = ConfigOptions
> >>    .key("rest.address")
> >>    .stringType()
> >>    .noDefaultValue();
> >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
> >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
> >> System.out.println("rpcService: " + rpcService.getAddress());
> >>
> >>
> >> Thanks,
> >> Felipe
