Hi,

Did you try set 'serviceurl' starts with "pulsar+ssl://"? [1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#pulsar-client-serviceurl

Best,
Weihua


On Mon, May 15, 2023 at 7:22 PM Bauddhik Anand <bdkan...@gmail.com> wrote:

> Can someone please help with this?
>
> On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <bdkan...@gmail.com> wrote:
>
> > I am trying to connect my Flink application to a Pulsar topic for
> > ingesting data. The topic is active and i am able to ingest the data via
> a
> > normal Java application.
> >
> > When i try to use the Flink application to ingest the data from the same
> > topic, using the latest version of flink-connector-pulsar i.e
> 4.0.0-1.17, i
> > do not find in the documenation anywhere how to pass to pass the TLS
> certs.
> >
> > I tried with below code:
> >
> >
> > final StreamExecutionEnvironment envn =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >
> > Configuration config = new Configuration();
> >
> >             config.setString("pulsar.client.authentication","tls");
> >
>  config.setString("pulsar.client.tlsCertificateFilePath",tlsCert);
> >             config.setString("pulsar.client.tlsKeyFilePath",tlsKey);
> >
>  config.setString("pulsar.client.tlsTrustCertsFilePath",tlsTrustCert);
> >
> >  PulsarSource<String> pulsarSource = PulsarSource.builder()
> >                     .setServiceUrl("serviceurl")
> >                     .setAdminUrl("adminurl")
> >                     .setStartCursor(StartCursor.earliest())
> >                     .setTopics("topicname")
> >                     .setDeserializationSchema(new SimpleStringSchema())
> >                     .setSubscriptionName("test-sub")
> >                     .setConfig(config)
> >                     .build();
> >
> >
> > pulsarStream.map(new MapFunction<String, String>() {
> >                 private static final long serialVersionUID =
> -999736771747691234L;
> >
> >                 public String map(String value) throws Exception {
> >                   return "Receiving from Pulsar : " + value;
> >                 }
> >               }).print();
> >
> >
> >             envn.execute();
> >
> >
> > As per documentation i did not find any inbuilt method in the
> PulsarSource
> > class to pass the TLS certs, i tried using the PulsarClient options as
> > config and pass it to PulsarSource as option.
> >
> > This doesn't seem to work, as when i try to deploy the app, the Flink job
> > is submitted and JobManager throws the below error.
> >
> > Caused by: sun.security.validator.ValidatorException: PKIX path building
> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable
> to find valid certification path to requested target
> >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source)
> ~[?:?]
> >     at sun.security.validator.PKIXValidator.engineValidate(Unknown
> Source) ~[?:?]
> >     at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
> >     at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source)
> ~[?:?]
> >
> >
> > Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
> unable to find valid certification path to requested target
> >     at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown
> Source) ~[?:?]
> >     at
> sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown
> Source) ~[?:?]
> >     at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
> >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source)
> ~[?:?]
> >
> > I have already verified the certs path and it is correct, also i am using
> > the same path as a volume mount for my other apps and they work fine.
> >
> > My question is :
> >
> > How i can pass the certs to the latest version of the
> > *flink-connector-pulsar* i.e *4.0.0-1.17*
> >
>

Reply via email to