Can someone please help with this?

On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <bdkan...@gmail.com> wrote:
> I am trying to connect my Flink application to a Pulsar topic to ingest
> data. The topic is active, and I am able to ingest data from it with a
> plain Java application.
>
> When I try to ingest data from the same topic with the Flink application,
> using the latest version of flink-connector-pulsar, i.e. 4.0.0-1.17, I
> cannot find anywhere in the documentation how to pass the TLS certs.
>
> I tried the code below:
>
> final StreamExecutionEnvironment envn =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
> Configuration config = new Configuration();
> config.setString("pulsar.client.authentication", "tls");
> config.setString("pulsar.client.tlsCertificateFilePath", tlsCert);
> config.setString("pulsar.client.tlsKeyFilePath", tlsKey);
> config.setString("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);
>
> PulsarSource<String> pulsarSource = PulsarSource.builder()
>         .setServiceUrl("serviceurl")
>         .setAdminUrl("adminurl")
>         .setStartCursor(StartCursor.earliest())
>         .setTopics("topicname")
>         .setDeserializationSchema(new SimpleStringSchema())
>         .setSubscriptionName("test-sub")
>         .setConfig(config)
>         .build();
>
> // Assumed: the stream is created from the source; this line was missing
> // from the snippet as posted.
> DataStream<String> pulsarStream = envn.fromSource(
>         pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");
>
> pulsarStream.map(new MapFunction<String, String>() {
>     private static final long serialVersionUID = -999736771747691234L;
>
>     public String map(String value) throws Exception {
>         return "Receiving from Pulsar : " + value;
>     }
> }).print();
>
> envn.execute();
>
> I did not find any built-in method on the PulsarSource class in the
> documentation for passing the TLS certs, so I tried setting the
> PulsarClient options in a Configuration and passing that to PulsarSource.
>
> This doesn't seem to work: when I deploy the app, the Flink job is
> submitted and the JobManager throws the error below.
>
> Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
>     at sun.security.validator.PKIXValidator.engineValidate(Unknown Source) ~[?:?]
>     at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
>     at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]
>
> Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown Source) ~[?:?]
>     at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown Source) ~[?:?]
>     at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
>     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
>
> I have already verified that the certs path is correct. I also use the
> same path as a volume mount for my other apps, and they work fine.
>
> My question is:
>
> How can I pass the certs to the latest version of the
> *flink-connector-pulsar*, i.e. *4.0.0-1.17*?
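
For anyone picking this up: not a confirmed fix, but here is a sketch of the option set that may be worth trying in place of the "pulsar.client.authentication" key. It assumes the connector forwards the Pulsar client's usual authentication settings through the keys "pulsar.client.authPluginClassName", "pulsar.client.authParams" and "pulsar.client.tlsTrustCertsFilePath"; please check these against the PulsarOptions class for your connector version. The class name PulsarTlsOptions, its helper methods and the file paths are hypothetical and only illustrate how the values are put together.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PulsarTlsOptions {

    // Pulsar's client-side TLS authentication plugin.
    static final String TLS_AUTH_PLUGIN =
            "org.apache.pulsar.client.impl.auth.AuthenticationTls";

    // AuthenticationTls takes its parameters as a "key:value,key:value" string.
    static String tlsAuthParams(String certPath, String keyPath) {
        return "tlsCertFile:" + certPath + ",tlsKeyFile:" + keyPath;
    }

    // Collects the options under the keys the connector is ASSUMED to read.
    static Map<String, String> tlsOptions(
            String certPath, String keyPath, String trustCertPath) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("pulsar.client.authPluginClassName", TLS_AUTH_PLUGIN);
        options.put("pulsar.client.authParams", tlsAuthParams(certPath, keyPath));
        options.put("pulsar.client.tlsTrustCertsFilePath", trustCertPath);
        return options;
    }

    public static void main(String[] args) {
        // Hypothetical mount paths; substitute the real volume-mounted files.
        tlsOptions("/certs/tls.crt", "/certs/tls.key", "/certs/ca.crt")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Each entry could then be copied into the Flink Configuration with config.setString(key, value) before calling .setConfig(config) on the builder. Since a PKIX error generally means the JVM's default trust store was consulted, it may also be worth confirming that the trust cert file is mounted in the JobManager and TaskManager containers, not only on the machine that submits the job.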