Can someone please help with this?

On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <bdkan...@gmail.com> wrote:

> I am trying to connect my Flink application to a Pulsar topic for
> ingesting data. The topic is active and i am able to ingest the data via a
> normal Java application.
>
> When i try to use the Flink application to ingest the data from the same
> topic, using the latest version of flink-connector-pulsar i.e 4.0.0-1.17, i
> do not find in the documenation anywhere how to pass to pass the TLS certs.
>
> I tried with below code:
>
>
> final StreamExecutionEnvironment envn = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> Configuration config = new Configuration();
>
>             config.setString("pulsar.client.authentication","tls");
>             config.setString("pulsar.client.tlsCertificateFilePath",tlsCert);
>             config.setString("pulsar.client.tlsKeyFilePath",tlsKey);
>             
> config.setString("pulsar.client.tlsTrustCertsFilePath",tlsTrustCert);
>
>  PulsarSource<String> pulsarSource = PulsarSource.builder()
>                     .setServiceUrl("serviceurl")
>                     .setAdminUrl("adminurl")
>                     .setStartCursor(StartCursor.earliest())
>                     .setTopics("topicname")
>                     .setDeserializationSchema(new SimpleStringSchema())
>                     .setSubscriptionName("test-sub")
>                     .setConfig(config)
>                     .build();
>
>
> pulsarStream.map(new MapFunction<String, String>() {
>                 private static final long serialVersionUID = 
> -999736771747691234L;
>
>                 public String map(String value) throws Exception {
>                   return "Receiving from Pulsar : " + value;
>                 }
>               }).print();
>
>
>             envn.execute();
>
>
> As per documentation i did not find any inbuilt method in the PulsarSource
> class to pass the TLS certs, i tried using the PulsarClient options as
> config and pass it to PulsarSource as option.
>
> This doesn't seem to work, as when i try to deploy the app, the Flink job
> is submitted and JobManager throws the below error.
>
> Caused by: sun.security.validator.ValidatorException: PKIX path building 
> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to 
> find valid certification path to requested target
>     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
>     at sun.security.validator.PKIXValidator.engineValidate(Unknown Source) 
> ~[?:?]
>     at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
>     at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]
>
>
> Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable 
> to find valid certification path to requested target
>     at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown 
> Source) ~[?:?]
>     at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown 
> Source) ~[?:?]
>     at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
>     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
>
> I have already verified the certs path and it is correct, also i am using
> the same path as a volume mount for my other apps and they work fine.
>
> My question is :
>
> How i can pass the certs to the latest version of the
> *flink-connector-pulsar* i.e *4.0.0-1.17*
>

Reply via email to