Hi Chris,

Thank you very much for your help! I had been going in circles with this for
several days. It worked after adding the following to the ClickHouse connector
creation HTTP request:

  "consumer.override.ssl.truststore.location": "/opt/kafka/kafka.ca.pem",
  "consumer.override.ssl.truststore.type": "PEM",
  "consumer.override.ssl.keystore.type": "PEM",
  "consumer.override.ssl.keystore.location": "/opt/kafka/kafka.client.p8.pem",
  "consumer.override.ssl.client.auth": "required"

/Domantas



On Wed, Jul 23, 2025 at 4:59 PM Chris Egerton <fearthecel...@gmail.com>
wrote:

> Hi Domantas,
>
> For sink connectors, you'll need to add all SSL-related properties either
> to your Connect worker file prefixed with "consumer.", or to your
> individual connector files prefixed with "consumer.override.".
>
> If you're using the DLQ feature, you'll also need to do the same but with
> "admin." (worker file) or "admin.override." (connector files).
>
> And, for source connectors, do the same but with "producer." (worker file)
> / "producer.override." (connector files).
>
> Cheers,
>
> Chris
>
> On Wed, Jul 23, 2025, 08:31 Domantas Spečiūnas
> <domantas.speciunas@aardvark.technology.invalid> wrote:
>
> > Hi,
> >
> > I have an issue connecting to Kafka with SSL. I have tried a lot of options
> > and am stuck; maybe someone can suggest what is wrong here. On the ClickHouse
> > side everything is fine:
> >
> > [root@server.tld01 config]# cat connect-distributed.properties | grep -v ^# | grep -v '^$'
> > group.id=connect-cluster
> > key.converter=org.apache.kafka.connect.json.JsonConverter
> > value.converter=org.apache.kafka.connect.json.JsonConverter
> > key.converter.schemas.enable=true
> > value.converter.schemas.enable=true
> > offset.storage.topic=connect-offsets
> > offset.storage.replication.factor=1
> > config.storage.topic=connect-configs
> > config.storage.replication.factor=1
> > status.storage.topic=connect-status
> > status.storage.replication.factor=1
> > offset.flush.interval.ms=10000
> > listeners=HTTP://127.0.0.1:8083
> > plugin.path=/opt/kafka/connectors
> > ssl.keystore.location=/opt/kafka.client.p8.pem
> > ssl.keystore.type=PEM
> > bootstrap.servers=server.tld01:9091
> > security.protocol=SSL
> > ssl.truststore.type=PEM
> > ssl.truststore.location=/opt/kafka/kafka.ca.pem
> > ssl.client.auth=required
> >
> >
> >
> > [root@server.tld01 config]# cat server.properties | grep -v ^# | grep -v '^$'
> > process.roles=broker,controller
> > node.id=281724871
> > controller.quorum.voters=131171308@server.tld01:9093,281724871@server.tld02:9093,8884189@server.tld03:9093
> > listeners=BROKER://:9091,BROKERSASL://:9092,CONTROLLER://:9093
> > inter.broker.listener.name=BROKER
> > sasl.enabled.mechanisms=SCRAM-SHA-512
> > controller.listener.names=CONTROLLER
> > listener.security.protocol.map=BROKER:SSL,BROKERSASL:SASL_SSL,CONTROLLER:SSL
> > authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
> > super.users=User:CN=server;User:CN=client
> > listener.name.broker.ssl.keystore.type=PEM
> > listener.name.broker.ssl.keystore.location=/opt/kafka/kafka.server.p8.pem
> > listener.name.broker.ssl.truststore.type=PEM
> > listener.name.broker.ssl.truststore.location=/opt/kafka/kafka.ca.pem
> > listener.name.broker.ssl.client.auth=required
> > listener.name.brokersasl.ssl.keystore.type=PEM
> > listener.name.brokersasl.ssl.keystore.location=/opt/kafka/kafka.server.p8.pem
> > listener.name.brokersasl.ssl.client.auth=none
> > listener.name.brokersasl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;
> > listener.name.controller.ssl.keystore.type=PEM
> > listener.name.controller.ssl.keystore.location=/opt/kafka/kafka.server.p8.pem
> > listener.name.controller.ssl.truststore.type=PEM
> > listener.name.controller.ssl.truststore.location=/opt/kafka/kafka.ca.pem
> > listener.name.controller.ssl.client.auth=required
> > num.network.threads=3
> > num.io.threads=8
> > socket.send.buffer.bytes=102400
> > socket.receive.buffer.bytes=102400
> > socket.request.max.bytes=104857600
> > log.dirs=/var/lib/kafka-logs
> > num.partitions=3
> > default.replication.factor=3
> > auto.create.topics.enable=true
> > min.insync.replicas=2
> > num.recovery.threads.per.data.dir=1
> > offsets.topic.replication.factor=3
> > share.coordinator.state.topic.replication.factor=3
> > share.coordinator.state.topic.min.isr=2
> > transaction.state.log.replication.factor=3
> > transaction.state.log.min.isr=2
> > log.retention.hours=168
> > log.segment.bytes=1073741824
> > log.retention.check.interval.ms=300000
> >
> >
> >
> >   curl -X GET http://localhost:8083/connectors
> >         curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
> >             "name": "clickhouse-sink-connector",
> >             "config": {
> >               "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
> >               "tasks.max": "1",
> >               "consumer.override.max.poll.records": "5000",
> >               "consumer.override.max.partition.fetch.bytes": "5242880",
> >               "errors.retry.timeout": "60",
> >               "exactlyOnce": "false",
> >               "hostname": "clickhouse-db01.tld",
> >               "security.protocol": "SSL",
> >               "ssl": true,
> >               "ssl.truststore.location": "/opt/kafka/clickhouse.ca.pem",
> >               "ssl.truststore.type": "PEM",
> >               "port": "8443",
> >               "topics": "ticketTransactions.aggregator_dev.ticketTransactions",
> >               "username": "clickhouse-kafka",
> >               "password": "PASSS",
> >               "database": "DB",
> >               "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> >               "value.converter.schemas.enable": "false",
> >               "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> >               "key.converter.schemas.enable": "false",
> >               "errors.log.enable": "true",
> >               "errors.log.include.messages": "true"
> >             }
> >           }'
> >
> >
> > server.log multiple lines:
> > [2025-07-23 11:06:26,535] INFO [SocketServer listenerType=BROKER, nodeId=281724871] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9091-127.0.0.1:52730-1-16786) (SSL handshake failed) (org.apache.kafka.common.network.Selector)
> >
> >
> > connect.log multiple lines:
> >
> > [2025-07-23 11:12:49,230] WARN [clickhouse-sink-connector|task-0] [Consumer clientId=connector-consumer-clickhouse-sink-connector-0, groupId=connect-clickhouse-sink-connector] Bootstrap broker 127.0.0.1:9091 (id: -1 rack: null isFenced: false) disconnected (org.apache.kafka.clients.NetworkClient:1255)
> > [2025-07-23 11:12:50,184] INFO [clickhouse-sink-connector|task-0] [Consumer clientId=connector-consumer-clickhouse-sink-connector-0, groupId=connect-clickhouse-sink-connector] Rebootstrapping with [/127.0.0.1:9091] (org.apache.kafka.clients.Metadata:314)
> > [2025-07-23 11:12:51,032] INFO [clickhouse-sink-connector|task-0] [Consumer clientId=connector-consumer-clickhouse-sink-connector-0, groupId=connect-clickhouse-sink-connector] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1072)
> > [2025-07-23 11:12:51,032] INFO [clickhouse-sink-connector|task-0] [Consumer clientId=connector-consumer-clickhouse-sink-connector-0, groupId=connect-clickhouse-sink-connector] Cancelled in-flight API_VERSIONS request with correlation id 4490 due to node -1 being disconnected (elapsed time since creation: 30ms, elapsed time since send: 30ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:411)
> >
> >
> > Kafka DEBUG logging shows that kafka-connect is not presenting a client certificate during the SSL handshake:
> >
> > Jul 22 14:49:27 dkurmis kafka[3579826]: )
> > Jul 22 14:49:27 dkurmis kafka[3579826]: javax.net.ssl|DEBUG|73|data-plane-kafka-network-thread-281724871-ListenerName(BROKER)-SSL-1|2025-07-22 14:49:27.530 UTC|CertificateMessage.java:1172|Consuming client Certificate handshake message (
> > Jul 22 14:49:27 dkurmis kafka[3579826]: "Certificate": {
> > Jul 22 14:49:27 dkurmis kafka[3579826]:   "certificate_request_context": "",
> > Jul 22 14:49:27 dkurmis kafka[3579826]:   "certificate_list": [
> > Jul 22 14:49:27 dkurmis kafka[3579826]: ]
> > Jul 22 14:49:27 dkurmis kafka[3579826]: }
> > Jul 22 14:49:27 dkurmis kafka[3579826]: )
> > Jul 22 14:49:27 dkurmis kafka[3579826]: javax.net.ssl|ERROR|73|data-plane-kafka-network-thread-281724871-ListenerName(BROKER)-SSL-1|2025-07-22 14:49:27.530 UTC|TransportContext.java:375|Fatal (BAD_CERTIFICATE): Empty client certificate chain (
> > Jul 22 14:49:27 dkurmis kafka[3579826]: "throwable" : {
> > Jul 22 14:49:27 dkurmis kafka[3579826]: javax.net.ssl.SSLHandshakeException: Empty client certificate chain
> >
>
