Thanks for your response. Yes, the service URL, admin URL, topic, etc. are correct and have the correct prefix as well.
It works with a normal Java application. I am not sure how I can pass the TLS certs.

On Tue, 16 May, 2023, 14:52 Weihua Hu, <huweihua....@gmail.com> wrote:

> Hi,
>
> Did you try setting 'serviceurl' to start with "pulsar+ssl://"? [1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#pulsar-client-serviceurl
>
> Best,
> Weihua
>
>
> On Mon, May 15, 2023 at 7:22 PM Bauddhik Anand <bdkan...@gmail.com> wrote:
>
> > Can someone please help with this?
> >
> > On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <bdkan...@gmail.com> wrote:
> >
> > > I am trying to connect my Flink application to a Pulsar topic for
> > > ingesting data. The topic is active and I am able to ingest the data
> > > via a normal Java application.
> > >
> > > When I try to use the Flink application to ingest the data from the
> > > same topic, using the latest version of flink-connector-pulsar, i.e.
> > > 4.0.0-1.17, I cannot find anywhere in the documentation how to pass
> > > the TLS certs.
> > >
> > > I tried with the below code:
> > >
> > >
> > > final StreamExecutionEnvironment envn =
> > >         StreamExecutionEnvironment.getExecutionEnvironment();
> > >
> > > Configuration config = new Configuration();
> > >
> > > config.setString("pulsar.client.authentication", "tls");
> > > config.setString("pulsar.client.tlsCertificateFilePath", tlsCert);
> > > config.setString("pulsar.client.tlsKeyFilePath", tlsKey);
> > > config.setString("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);
> > >
> > > PulsarSource<String> pulsarSource = PulsarSource.builder()
> > >         .setServiceUrl("serviceurl")
> > >         .setAdminUrl("adminurl")
> > >         .setStartCursor(StartCursor.earliest())
> > >         .setTopics("topicname")
> > >         .setDeserializationSchema(new SimpleStringSchema())
> > >         .setSubscriptionName("test-sub")
> > >         .setConfig(config)
> > >         .build();
> > >
> > >
> > > pulsarStream.map(new MapFunction<String, String>() {
> > >     private static final long serialVersionUID = -999736771747691234L;
> > >
> > >     public String map(String value) throws Exception {
> > >         return "Receiving from Pulsar : " + value;
> > >     }
> > > }).print();
> > >
> > >
> > > envn.execute();
> > >
> > >
> > > As per the documentation, I did not find any inbuilt method in the
> > > PulsarSource class to pass the TLS certs, so I tried setting the
> > > PulsarClient options in a config and passing it to PulsarSource.
> > >
> > > This doesn't seem to work: when I try to deploy the app, the Flink job
> > > is submitted and the JobManager throws the below error.
> > >
> > > Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
> > >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
> > >     at sun.security.validator.PKIXValidator.engineValidate(Unknown Source) ~[?:?]
> > >     at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
> > >     at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]
> > >
> > >
> > > Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
> > >     at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown Source) ~[?:?]
> > >     at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown Source) ~[?:?]
> > >     at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
> > >     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
> > >
> > > I have already verified the certs path and it is correct. I am also
> > > using the same path as a volume mount for my other apps, and they work
> > > fine.
> > >
> > > My question is:
> > >
> > > How can I pass the certs to the latest version of
> > > *flink-connector-pulsar*, i.e. *4.0.0-1.17*?
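
A "PKIX path building failed" error generally means the client could not build a chain from the broker's certificate to a CA it trusts. One thing that may be worth ruling out is whether the cert files are actually readable from inside the JVM that runs the job (JobManager and TaskManagers), not only from the machine that submits it. Below is a minimal sketch of such a check using only the JDK. The class name TlsPathCheck and the method unreadableEntries are hypothetical helpers, not part of Flink or Pulsar. The option keys are copied from the code in the quoted message and the paths are placeholders. The sketch does not verify that the connector honours these keys.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Flink or Pulsar class): checks that every
// configured TLS file path points to a readable regular file.
final class TlsPathCheck {

    // Returns one "key=path" string per entry whose path is missing,
    // not a regular file, or not readable by the current JVM user.
    static List<String> unreadableEntries(Map<String, String> pathOptions) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> entry : pathOptions.entrySet()) {
            Path path = Paths.get(entry.getValue());
            if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
                problems.add(entry.getKey() + "=" + entry.getValue());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Keys as used in the quoted message; the paths are placeholders
        // and must be replaced with the mounted cert locations.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("pulsar.client.tlsCertificateFilePath", "/path/to/tls.crt");
        options.put("pulsar.client.tlsKeyFilePath", "/path/to/tls.key");
        options.put("pulsar.client.tlsTrustCertsFilePath", "/path/to/ca.crt");

        List<String> problems = unreadableEntries(options);
        if (problems.isEmpty()) {
            System.out.println("All TLS files are readable");
        } else {
            for (String problem : problems) {
                System.out.println("Not readable: " + problem);
            }
        }
    }
}
```

Running the same check from inside the JobManager and TaskManager containers would show whether the volume mount is visible there. If all files are readable, the cause is more likely in how the options are passed to the connector.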