I am trying to connect my Flink application to a Pulsar topic to ingest
data. The topic is active, and I am able to ingest the data via a normal
Java application.

When I try to ingest data from the same topic with the Flink application,
using the latest version of flink-connector-pulsar, i.e. 4.0.0-1.17, I
cannot find anywhere in the documentation how to pass the TLS certs.

I tried with the code below:


final StreamExecutionEnvironment envn =
        StreamExecutionEnvironment.getExecutionEnvironment();

// Pulsar client options handed to the connector.
// tlsCert, tlsKey and tlsTrustCert hold the certificate file paths.
Configuration config = new Configuration();
config.setString("pulsar.client.authentication", "tls");
config.setString("pulsar.client.tlsCertificateFilePath", tlsCert);
config.setString("pulsar.client.tlsKeyFilePath", tlsKey);
config.setString("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);

PulsarSource<String> pulsarSource = PulsarSource.builder()
        .setServiceUrl("serviceurl")
        .setAdminUrl("adminurl")
        .setStartCursor(StartCursor.earliest())
        .setTopics("topicname")
        .setDeserializationSchema(new SimpleStringSchema())
        .setSubscriptionName("test-sub")
        .setConfig(config)
        .build();

// Create the stream from the source (the source name is arbitrary).
DataStream<String> pulsarStream = envn.fromSource(
        pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");

pulsarStream.map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -999736771747691234L;

    @Override
    public String map(String value) throws Exception {
        return "Receiving from Pulsar : " + value;
    }
}).print();

envn.execute();


I did not find any built-in method on the PulsarSource class to pass the
TLS certs in the documentation, so I tried setting the PulsarClient options
in a Configuration and passing it to PulsarSource via setConfig.

This doesn't seem to work: when I deploy the app, the Flink job is
submitted and the JobManager throws the error below.

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
    at sun.security.validator.PKIXValidator.engineValidate(Unknown Source) ~[?:?]
    at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]

Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown Source) ~[?:?]
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown Source) ~[?:?]
    at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
    at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]

I have already verified the certs path and it is correct; I am also using
the same path as a volume mount for my other apps, and they work fine.
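
A path that is correct on disk can still point at a file the JVM cannot
read or parse, so a quick JDK-only check may help narrow this down. The
sketch below is only an illustration: the class name TrustCertCheck and the
method describe are made up for this example and are not part of Flink or
Pulsar. It assumes the trust cert is a PEM- or DER-encoded X.509 file and
reports whether the JVM can read and parse it.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Collection;

// Hypothetical helper, not part of Flink or Pulsar.
class TrustCertCheck {

    /** Returns a short report on whether the file parses as X.509 certificate(s). */
    static String describe(Path certFile) {
        // Fail early if the file is missing or not readable by this process.
        if (!Files.isReadable(certFile)) {
            return "NOT READABLE: " + certFile;
        }
        try (InputStream in = Files.newInputStream(certFile)) {
            CertificateFactory factory = CertificateFactory.getInstance("X.509");
            Collection<? extends Certificate> certs = factory.generateCertificates(in);
            if (certs.isEmpty()) {
                return "NO CERTIFICATES: " + certFile;
            }
            StringBuilder report =
                    new StringBuilder("OK: " + certs.size() + " certificate(s)");
            for (Certificate cert : certs) {
                // The X.509 factory yields X509Certificate instances.
                X509Certificate x509 = (X509Certificate) cert;
                report.append(System.lineSeparator())
                        .append("  subject=")
                        .append(x509.getSubjectX500Principal().getName())
                        .append(" issuer=")
                        .append(x509.getIssuerX500Principal().getName());
            }
            return report.toString();
        } catch (IOException | CertificateException e) {
            return "PARSE ERROR: " + certFile + " (" + e.getMessage() + ")";
        }
    }

    public static void main(String[] args) {
        // Each argument is a path to a certificate file to inspect.
        for (String arg : args) {
            System.out.println(describe(Paths.get(arg)));
        }
    }
}
```

With Java 11 or later this can be run directly inside the JobManager or
TaskManager container, e.g. java TrustCertCheck.java /path/to/ca.pem (the
path is a placeholder). It only shows whether the mounted file is visible
and parseable there; it does not show whether the connector actually picks
the file up.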

My question is:

How can I pass the certs to the latest version of
*flink-connector-pulsar*, i.e. *4.0.0-1.17*?
