I am trying to connect my Flink application to a Pulsar topic for ingesting
data. The topic is active and i am able to ingest the data via a normal
Java application.
When i try to use the Flink application to ingest the data from the same
topic, using the latest version of flink-connector-pulsar i.e 4.0.0-1.17, i
do not find in the documenation anywhere how to pass to pass the TLS certs.
I tried with below code:
final StreamExecutionEnvironment envn =
StreamExecutionEnvironment.getExecutionEnvironment();
Configuration config = new Configuration();
config.setString("pulsar.client.authentication","tls");
config.setString("pulsar.client.tlsCertificateFilePath",tlsCert);
config.setString("pulsar.client.tlsKeyFilePath",tlsKey);
config.setString("pulsar.client.tlsTrustCertsFilePath",tlsTrustCert);
PulsarSource<String> pulsarSource = PulsarSource.builder()
.setServiceUrl("serviceurl")
.setAdminUrl("adminurl")
.setStartCursor(StartCursor.earliest())
.setTopics("topicname")
.setDeserializationSchema(new SimpleStringSchema())
.setSubscriptionName("test-sub")
.setConfig(config)
.build();
pulsarStream.map(new MapFunction<String, String>() {
private static final long serialVersionUID =
-999736771747691234L;
public String map(String value) throws Exception {
return "Receiving from Pulsar : " + value;
}
}).print();
envn.execute();
As per documentation i did not find any inbuilt method in the PulsarSource
class to pass the TLS certs, i tried using the PulsarClient options as
config and pass it to PulsarSource as option.
This doesn't seem to work, as when i try to deploy the app, the Flink job
is submitted and JobManager throws the below error.
Caused by: sun.security.validator.ValidatorException: PKIX path
building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to
find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
at sun.security.validator.PKIXValidator.engineValidate(Unknown
Source) ~[?:?]
at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]
Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
unable to find valid certification path to requested target
at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown
Source) ~[?:?]
at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown
Source) ~[?:?]
at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
I have already verified the certs path and it is correct, also i am using
the same path as a volume mount for my other apps and they work fine.
My question is :
How i can pass the certs to the latest version of the
*flink-connector-pulsar* i.e *4.0.0-1.17*