Can someone please help with this?
On Mon, 15 May, 2023, 09:34 Bauddhik Anand, <[email protected]> wrote:
> I am trying to connect my Flink application to a Pulsar topic to
> ingest data. The topic is active, and I am able to ingest the data with a
> normal Java application.
>
> When I try to ingest data from the same topic with the Flink application,
> using the latest version of flink-connector-pulsar, i.e. 4.0.0-1.17, I
> cannot find anywhere in the documentation how to pass the TLS certs.
>
> I tried the code below:
>
>
> final StreamExecutionEnvironment envn =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
> // Pulsar client options, handed to the source through setConfig()
> Configuration config = new Configuration();
> config.setString("pulsar.client.authentication", "tls");
> config.setString("pulsar.client.tlsCertificateFilePath", tlsCert);
> config.setString("pulsar.client.tlsKeyFilePath", tlsKey);
> config.setString("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);
>
> PulsarSource<String> pulsarSource = PulsarSource.builder()
>         .setServiceUrl("serviceurl")
>         .setAdminUrl("adminurl")
>         .setStartCursor(StartCursor.earliest())
>         .setTopics("topicname")
>         .setDeserializationSchema(new SimpleStringSchema())
>         .setSubscriptionName("test-sub")
>         .setConfig(config)
>         .build();
>
> // Attach the source to the environment to obtain the stream
> DataStream<String> pulsarStream = envn.fromSource(
>         pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");
>
> pulsarStream.map(new MapFunction<String, String>() {
>     private static final long serialVersionUID = -999736771747691234L;
>
>     @Override
>     public String map(String value) throws Exception {
>         return "Receiving from Pulsar : " + value;
>     }
> }).print();
>
> envn.execute();
>
>
> As per the documentation, I did not find any built-in method in the
> PulsarSource class to pass the TLS certs, so I tried setting the
> PulsarClient options in a Configuration and passing it to PulsarSource.
>
> This doesn't seem to work: when I deploy the app, the Flink job
> is submitted and the JobManager throws the error below.
>
> Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
>     at sun.security.validator.PKIXValidator.engineValidate(Unknown Source) ~[?:?]
>     at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
>     at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]
>
>
> Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown Source) ~[?:?]
>     at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown Source) ~[?:?]
>     at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
>     at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
>
> I have already verified that the certs path is correct; I am also using
> the same path as a volume mount for my other apps, and they work fine.
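
For anyone who wants to repeat that check from inside a JVM rather than from the shell, here is a minimal, JDK-only sketch. It only confirms that the trust cert file is readable and parses as X.509; it does not involve Flink or Pulsar and does not prove that the connector actually picks the file up. The class and method names (TrustCertCheck, loadCertificates) are made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or Pulsar.
class TrustCertCheck {

    /** Parses every X.509 certificate found in the given PEM or DER file. */
    static List<X509Certificate> loadCertificates(Path file)
            throws IOException, CertificateException {
        CertificateFactory factory = CertificateFactory.getInstance("X.509");
        List<X509Certificate> result = new ArrayList<>();
        // Fails with an IOException if the file is missing or unreadable.
        try (InputStream in = Files.newInputStream(file)) {
            for (Certificate cert : factory.generateCertificates(in)) {
                result.add((X509Certificate) cert);
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: java TrustCertCheck <path-to-trust-cert>");
            return;
        }
        // Print subject, issuer and expiry of each certificate in the file.
        for (X509Certificate cert : loadCertificates(Paths.get(args[0]))) {
            System.out.println("subject: " + cert.getSubjectX500Principal());
            System.out.println("issuer:  " + cert.getIssuerX500Principal());
            System.out.println("expires: " + cert.getNotAfter());
        }
    }
}
```

Running this in the same container as the JobManager, with the same path that is passed as tlsTrustCert, would show whether the file is visible there and which CA it contains.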
>
> My question is:
>
> How can I pass the certs to the latest version of
> *flink-connector-pulsar*, i.e. *4.0.0-1.17*?
>
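
A possible direction, offered as a sketch rather than a confirmed fix: as far as I know, the Pulsar client normally enables TLS authentication through an auth plugin class plus an auth params string, and the connector exposes these as pulsar.client.authPluginClassName and pulsar.client.authParams, next to pulsar.client.tlsTrustCertsFilePath. I have not verified these key names against 4.0.0-1.17, so please check them against the connector's option list. The helper below (PulsarTlsOptions and tlsOptions are made-up names) only builds the key/value pairs with plain JDK classes; the paths in main are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; it only assembles option strings and has no Flink dependency.
class PulsarTlsOptions {

    // Pulsar's TLS authentication plugin class.
    static final String AUTH_TLS_PLUGIN =
            "org.apache.pulsar.client.impl.auth.AuthenticationTls";

    /**
     * Builds the client options for TLS authentication.
     * The option keys are assumptions to be checked against the connector docs.
     */
    static Map<String, String> tlsOptions(String tlsCert, String tlsKey, String tlsTrustCert) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("pulsar.client.authPluginClassName", AUTH_TLS_PLUGIN);
        // AuthenticationTls takes the client cert and key as a comma-separated params string.
        options.put("pulsar.client.authParams",
                "tlsCertFile:" + tlsCert + ",tlsKeyFile:" + tlsKey);
        // CA certificate used to validate the broker's certificate.
        options.put("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);
        return options;
    }

    public static void main(String[] args) {
        // Placeholder paths, for illustration only.
        tlsOptions("/certs/tls.crt", "/certs/tls.key", "/certs/ca.crt")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In the job itself, the map could be copied into the existing Configuration with options.forEach(config::setString) before calling setConfig(config). It may also be worth checking that the service URL uses the pulsar+ssl:// scheme and the admin URL uses https://, since TLS is only negotiated on those.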