I am trying to connect my Flink application to a Pulsar topic to ingest data. The topic is active, and I am able to ingest data from it with a plain Java application.

When I try to ingest data from the same topic with the Flink application, using the latest version of flink-connector-pulsar (4.0.0-1.17), I cannot find anywhere in the documentation how to pass the TLS certs. I tried the code below:

    final StreamExecutionEnvironment envn = StreamExecutionEnvironment.getExecutionEnvironment();

    Configuration config = new Configuration();
    config.setString("pulsar.client.authentication", "tls");
    config.setString("pulsar.client.tlsCertificateFilePath", tlsCert);
    config.setString("pulsar.client.tlsKeyFilePath", tlsKey);
    config.setString("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);

    PulsarSource<String> pulsarSource = PulsarSource.builder()
            .setServiceUrl("serviceurl")
            .setAdminUrl("adminurl")
            .setStartCursor(StartCursor.earliest())
            .setTopics("topicname")
            .setDeserializationSchema(new SimpleStringSchema())
            .setSubscriptionName("test-sub")
            .setConfig(config)
            .build();

    // Create the stream from the Pulsar source.
    DataStream<String> pulsarStream =
            envn.fromSource(pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");

    pulsarStream.map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -999736771747691234L;

        @Override
        public String map(String value) throws Exception {
            return "Receiving from Pulsar : " + value;
        }
    }).print();

    envn.execute();

I did not find any built-in method on the PulsarSource class for passing the TLS certs, so I tried setting the PulsarClient options in a Configuration and passing that to PulsarSource. This doesn't seem to work: when I deploy the app, the Flink job is submitted and the JobManager throws the error below.

    Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
        at sun.security.validator.PKIXValidator.engineValidate(Unknown Source) ~[?:?]
        at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
        at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]
    Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown Source) ~[?:?]
        at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown Source) ~[?:?]
        at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
        at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]

I have already verified the certs path and it is correct. I am also using the same path as a volume mount for my other apps, and they work fine.

My question is: how can I pass the certs to the latest version of *flink-connector-pulsar*, i.e. *4.0.0-1.17*?
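
For comparison, here is a minimal sketch of the client options that, as far as I can tell, the connector's option table lists for authentication: an auth plugin class name plus an auth params string, rather than a `pulsar.client.authentication` key. The helper class `PulsarTlsOptions` is hypothetical and only assembles the key/value pairs. Whether these settings resolve the PKIX error above is an assumption, not something confirmed here.

```java
import java.util.Properties;

/** Hypothetical helper that collects TLS-related Pulsar client options. */
public final class PulsarTlsOptions {

    private PulsarTlsOptions() {}

    public static Properties build(String tlsCert, String tlsKey, String tlsTrustCert) {
        Properties props = new Properties();
        // Pulsar's built-in TLS authentication plugin.
        props.setProperty("pulsar.client.authPluginClassName",
                "org.apache.pulsar.client.impl.auth.AuthenticationTls");
        // Plugin parameters in "key:value,key:value" form: client cert and private key.
        props.setProperty("pulsar.client.authParams",
                "tlsCertFile:" + tlsCert + ",tlsKeyFile:" + tlsKey);
        // CA certificate used to verify the broker's certificate.
        props.setProperty("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);
        return props;
    }
}
```

Each pair could be set with `config.setString(key, value)` in place of the keys used in the snippet above. The sketch also assumes the service URL uses the `pulsar+ssl://` scheme and that the cert files are mounted on every JobManager and TaskManager, not only on the machine that submits the job.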