Thanks for your response.

Yes, the service URL, admin URL, topic, etc. are correct, and the URLs have
the correct prefix as well.

The same setup works with a plain Java application.

I am not sure how I can pass the TLS certs.
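
One thing that may be worth trying, sketched below under assumptions: Pulsar's
TLS authentication is normally configured through an authentication plugin
class plus a parameter string, and the trust cert through a separate option.
The three option keys used here are the ones the connector documentation lists
for the Pulsar client as far as I can tell, so please verify them against the
docs page for your connector version. The class PulsarTlsOptions, its build
method, and the file paths are hypothetical names for illustration, not part
of Flink or Pulsar.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: collects TLS-related Pulsar client options as plain strings. */
final class PulsarTlsOptions {

    // Assumed option keys; check them against the connector docs for your version.
    static final String AUTH_PLUGIN_KEY = "pulsar.client.authPluginClassName";
    static final String AUTH_PARAMS_KEY = "pulsar.client.authParams";
    static final String TRUST_CERTS_KEY = "pulsar.client.tlsTrustCertsFilePath";

    // Pulsar's built-in TLS authentication plugin class.
    static final String TLS_AUTH_PLUGIN =
            "org.apache.pulsar.client.impl.auth.AuthenticationTls";

    /** Builds the option map from the client cert, client key, and CA cert paths. */
    static Map<String, String> build(String tlsCert, String tlsKey, String tlsTrustCert) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put(AUTH_PLUGIN_KEY, TLS_AUTH_PLUGIN);
        // The TLS plugin takes its parameters as "tlsCertFile:<path>,tlsKeyFile:<path>".
        options.put(AUTH_PARAMS_KEY, "tlsCertFile:" + tlsCert + ",tlsKeyFile:" + tlsKey);
        // CA certificate used to validate the broker's certificate.
        options.put(TRUST_CERTS_KEY, tlsTrustCert);
        return options;
    }

    public static void main(String[] args) {
        // Placeholder paths; substitute the mounted cert locations.
        build("/certs/client.cert.pem", "/certs/client.key-pk8.pem", "/certs/ca.cert.pem")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Each entry could then be copied into the Flink Configuration with
config.setString(key, value) before calling setConfig(config) on the source
builder. The PKIX error suggests the client is not picking up the trust cert,
so the tlsTrustCertsFilePath entry is probably the one to check first.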




On Tue, 16 May, 2023, 14:52 Weihua Hu, <huweihua....@gmail.com> wrote:

> Hi,
>
> Did you try setting 'serviceurl' to a value that starts with "pulsar+ssl://"? [1]
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#pulsar-client-serviceurl
>
> Best,
> Weihua
>
>
> On Mon, May 15, 2023 at 7:22 PM Bauddhik Anand <bdkan...@gmail.com> wrote:
>
> > Can someone please help with this?
> >
> > On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <bdkan...@gmail.com> wrote:
> >
> > > I am trying to connect my Flink application to a Pulsar topic for
> > > ingesting data. The topic is active and I am able to ingest the data
> > > via a normal Java application.
> > >
> > > When I try to use the Flink application to ingest the data from the
> > > same topic, using the latest version of flink-connector-pulsar, i.e.
> > > 4.0.0-1.17, I cannot find anywhere in the documentation how to pass
> > > the TLS certs.
> > >
> > > I tried the code below:
> > >
> > > final StreamExecutionEnvironment envn =
> > >         StreamExecutionEnvironment.getExecutionEnvironment();
> > >
> > > // TLS settings passed as Pulsar client options.
> > > Configuration config = new Configuration();
> > > config.setString("pulsar.client.authentication", "tls");
> > > config.setString("pulsar.client.tlsCertificateFilePath", tlsCert);
> > > config.setString("pulsar.client.tlsKeyFilePath", tlsKey);
> > > config.setString("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);
> > >
> > > PulsarSource<String> pulsarSource = PulsarSource.builder()
> > >         .setServiceUrl("serviceurl")
> > >         .setAdminUrl("adminurl")
> > >         .setStartCursor(StartCursor.earliest())
> > >         .setTopics("topicname")
> > >         .setDeserializationSchema(new SimpleStringSchema())
> > >         .setSubscriptionName("test-sub")
> > >         .setConfig(config)
> > >         .build();
> > >
> > > // Create a stream from the Pulsar source.
> > > DataStream<String> pulsarStream = envn.fromSource(
> > >         pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");
> > >
> > > pulsarStream.map(new MapFunction<String, String>() {
> > >     private static final long serialVersionUID = -999736771747691234L;
> > >
> > >     @Override
> > >     public String map(String value) throws Exception {
> > >         return "Receiving from Pulsar : " + value;
> > >     }
> > > }).print();
> > >
> > > envn.execute();
> > >
> > >
> > > As per the documentation, I did not find any built-in method in the
> > > PulsarSource class to pass the TLS certs, so I tried setting the
> > > PulsarClient options in a config and passing it to PulsarSource.
> > >
> > > This doesn't seem to work: when I deploy the app, the Flink job is
> > > submitted and the JobManager throws the error below.
> > >
> > > Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
> > >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
> > >     at sun.security.validator.PKIXValidator.engineValidate(Unknown Source) ~[?:?]
> > >     at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
> > >     at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]
> > >
> > >
> > > Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
> > >     at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown Source) ~[?:?]
> > >     at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown Source) ~[?:?]
> > >     at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
> > >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
> > >
> > > I have already verified the certs path and it is correct; I also use
> > > the same path as a volume mount for my other apps and they work fine.
> > >
> > > My question is:
> > >
> > > How can I pass the certs to the latest version of
> > > *flink-connector-pulsar*, i.e. *4.0.0-1.17*?
> > >
> >
>
