Hi, Did you try set 'serviceurl' starts with "pulsar+ssl://"? [1]
[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#pulsar-client-serviceurl Best, Weihua On Mon, May 15, 2023 at 7:22 PM Bauddhik Anand <bdkan...@gmail.com> wrote: > Can someone please help with this? > > On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <bdkan...@gmail.com> wrote: > > > I am trying to connect my Flink application to a Pulsar topic for > > ingesting data. The topic is active and i am able to ingest the data via > a > > normal Java application. > > > > When i try to use the Flink application to ingest the data from the same > > topic, using the latest version of flink-connector-pulsar i.e > 4.0.0-1.17, i > > do not find in the documenation anywhere how to pass to pass the TLS > certs. > > > > I tried with below code: > > > > > > final StreamExecutionEnvironment envn = > StreamExecutionEnvironment.getExecutionEnvironment(); > > > > Configuration config = new Configuration(); > > > > config.setString("pulsar.client.authentication","tls"); > > > config.setString("pulsar.client.tlsCertificateFilePath",tlsCert); > > config.setString("pulsar.client.tlsKeyFilePath",tlsKey); > > > config.setString("pulsar.client.tlsTrustCertsFilePath",tlsTrustCert); > > > > PulsarSource<String> pulsarSource = PulsarSource.builder() > > .setServiceUrl("serviceurl") > > .setAdminUrl("adminurl") > > .setStartCursor(StartCursor.earliest()) > > .setTopics("topicname") > > .setDeserializationSchema(new SimpleStringSchema()) > > .setSubscriptionName("test-sub") > > .setConfig(config) > > .build(); > > > > > > pulsarStream.map(new MapFunction<String, String>() { > > private static final long serialVersionUID = > -999736771747691234L; > > > > public String map(String value) throws Exception { > > return "Receiving from Pulsar : " + value; > > } > > }).print(); > > > > > > envn.execute(); > > > > > > As per documentation i did not find any inbuilt method in the > PulsarSource > > class to pass the TLS certs, i tried using the PulsarClient options as > > config and pass it to PulsarSource as option. > > > > This doesn't seem to work, as when i try to deploy the app, the Flink job > > is submitted and JobManager throws the below error. > > > > Caused by: sun.security.validator.ValidatorException: PKIX path building > failed: sun.security.provider.certpath.SunCertPathBuilderException: unable > to find valid certification path to requested target > > at sun.security.validator.PKIXValidator.doBuild(Unknown Source) > ~[?:?] > > at sun.security.validator.PKIXValidator.engineValidate(Unknown > Source) ~[?:?] > > at sun.security.validator.Validator.validate(Unknown Source) ~[?:?] > > at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) > ~[?:?] > > > > > > Caused by: sun.security.provider.certpath.SunCertPathBuilderException: > unable to find valid certification path to requested target > > at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown > Source) ~[?:?] > > at > sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown > Source) ~[?:?] > > at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?] > > at sun.security.validator.PKIXValidator.doBuild(Unknown Source) > ~[?:?] > > > > I have already verified the certs path and it is correct, also i am using > > the same path as a volume mount for my other apps and they work fine. > > > > My question is : > > > > How i can pass the certs to the latest version of the > > *flink-connector-pulsar* i.e *4.0.0-1.17* > > >