Re: Kafka Streams Handling uncaught exceptions REPLACE_THREAD
Hi Yoda,

for certain cases, Kafka Streams allows you to specify handlers that skip the problematic record. Those handlers are:

1. the deserialization exception handler, configured in default.deserialization.exception.handler
2. the timestamp extractor, set in default.timestamp.extractor and in the Consumed object
3. the production exception handler, configured in default.production.exception.handler

Kafka Streams provides implementations for handlers 1 and 2 that skip the problematic records, namely LogAndContinueExceptionHandler and LogAndSkipOnInvalidTimestamp, respectively. For more details, have a look at https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling

If problematic records cause an exception in user code, the user code needs to provide functionality to skip the problematic record.

Best,
Bruno

On 10.08.21 13:26, Yoda Jedi Master wrote:

Hi Bruno, thank you for your answer. I mean that the message that caused the exception was consumed and the replacement thread will continue from the next message. How then does it handle uncaught exceptions, if it will fail again?

On Tue, Aug 10, 2021 at 12:33 PM Bruno Cadonna wrote:

Hi Yoda,

What exactly do you mean by "skipping that failed message"? Do you mean a record consumed from a topic that caused an exception that killed the stream thread?

If the record killed the stream thread due to an exception, for example a deserialization exception, it will probably also kill the next stream thread that reads that record. Replacing a stream thread does not skip records, but it can result in duplicate records depending on the application's processing mode, determined by the PROCESSING_GUARANTEE_CONFIG value, as stated in the docs you cited.

Best,
Bruno

On 10.08.21 11:15, Luke Chen wrote:

Hi Yoda,

For your question: If an application gets an uncaught exception, then the failed thread will be replaced with another thread and it will continue processing messages, skipping that failed message?

--> Yes, if everything goes well after `replace thread`, you can ignore this failed message. Just one reminder: you should check the failed message to keep this `uncaught exception` from being thrown again, because if it happens frequently, it will impact application performance.

Thank you.
Luke

On Tue, Aug 10, 2021 at 4:25 PM Yoda Jedi Master wrote:

"REPLACE_THREAD - Replaces the thread receiving the exception and processing continues with the same number of configured threads. (Note: this can result in duplicate records depending on the application's processing mode determined by the PROCESSING_GUARANTEE_CONFIG value)"

If an application gets an uncaught exception, then the failed thread will be replaced with another thread and it will continue processing messages, skipping that failed message?
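
As an illustration of handlers 1 and 2 from Bruno's list, here is a minimal sketch of the corresponding Streams configuration. It uses plain string keys with java.util.Properties so it compiles without kafka-streams on the classpath. The class name SkipBadRecordsConfig and the application.id and bootstrap.servers values are placeholders invented for this example, not something from the thread.

```java
import java.util.Properties;

// Sketch only: builds the configuration that would be passed to a KafkaStreams instance.
final class SkipBadRecordsConfig {

    static Properties build() {
        Properties props = new Properties();
        // Placeholder values (assumptions), replace with your own.
        props.setProperty("application.id", "example-app");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Handler 1: log and skip records that cannot be deserialized.
        props.setProperty("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        // Handler 2: log and skip records whose timestamp is invalid.
        props.setProperty("default.timestamp.extractor",
                "org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting settings (iteration order is not guaranteed).
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With these settings the bad record is dropped before it reaches the topology, so no stream thread dies and REPLACE_THREAD never comes into play for that record. Exceptions thrown inside user code are not covered by either handler and still need their own handling, as Bruno notes.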
RE: Kafka checks the validity of SSL certificates keystore or trust store?
Hello,

Can anyone help me with the information below: when checking for expiry, which SSL certificate does Kafka validate, the one in the keystore or the one in the truststore?

Thanks in advance!

Best regards,
Deepak

From: Deepak Jain
Sent: 12 August 2021 15:01
To: users@kafka.apache.org
Cc: Alap Patwardhan; Prashant Ahire
Subject: Kafka checks the validity of SSL certificates keystore or trust store?

Hello,

We are using Kafka for data uploading via SSL. While doing the SSL certificate expiry test, we found that Kafka checks the expiry of the keystore and does not start when the current date exceeds the validity end date of the keystore. It dumps the following exception in server.log:

START-OF-EXCEPTION-
[2021-10-08 20:01:39,731] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: PKIX path validation failed: java.security.cert.CertPathValidatorException: validity check failed for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings.
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
    at kafka.network.Processor.<init>(SocketServer.scala:853)
    at kafka.network.SocketServer.newProcessor(SocketServer.scala:442)
    at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:299)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
    at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:297)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:262)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:259)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
    at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:259)
    at kafka.network.SocketServer.startup(SocketServer.scala:131)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:285)
    at kafka.Kafka$.main(Kafka.scala:109)
    at kafka.Kafka.main(Kafka.scala)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: PKIX path validation failed: java.security.cert.CertPathValidatorException: validity check failed for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings.
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:100)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:180)
    ... 17 more
END-OF-EXCEPTION-

Please verify whether our assumption is correct or not. If yes, let us know whether the truststore expiry is taken into account or not. If no, then let us know the correct behavior of the Kafka SSL certificate expiry checks.

Also, let us know whether excluding the host (server) certificate received from the CA from the certificate chain while generating the truststore has any impact on the expiry date of the resulting truststore.

Regards,
Deepak
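
A keystore or truststore file has no expiry date of its own; only the certificates stored in it do. The sketch below uses only the JDK's java.security.KeyStore API to list the notAfter date of the certificate under each alias, which can help show which certificate in which store has actually expired. It does not by itself settle which store Kafka validates at startup. The class name StoreExpiryReport and its command-line arguments are hypothetical.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: reports certificate expiry dates for a keystore or truststore file.
final class StoreExpiryReport {

    // Maps each alias to the notAfter date of its certificate.
    // For a private-key entry this is the first (leaf) certificate of the chain.
    static Map<String, Date> expiryByAlias(KeyStore store) throws Exception {
        Map<String, Date> result = new TreeMap<>();
        for (String alias : Collections.list(store.aliases())) {
            Certificate cert = store.getCertificate(alias);
            if (cert instanceof X509Certificate) {
                result.put(alias, ((X509Certificate) cert).getNotAfter());
            }
        }
        return result;
    }

    // Loads a store of the given type (for example "JKS" or "PKCS12") from disk.
    static KeyStore load(String path, String type, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            store.load(in, password);
        }
        return store;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("usage: StoreExpiryReport <path> <type> <password>");
            return;
        }
        Date now = new Date();
        expiryByAlias(load(args[0], args[1], args[2].toCharArray()))
                .forEach((alias, notAfter) -> System.out.println(
                        alias + " valid until " + notAfter
                                + (notAfter.before(now) ? " (EXPIRED)" : "")));
    }
}
```

Running it once against the keystore and once against the truststore shows the dates side by side, so the expired entry named in the PKIX "validity check failed" error can be located.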
Secure connection using cert and keyfile
Hi,

I am running this command to test my ZooKeeper SSL connection:

openssl s_client -showcerts -connect 55.55.55.55:2280 -CAfile /certs/ca-chain.cert.pem -cert /root/ca/intermediate/certs/intermediate.cert.pem -key /root/ca/intermediate/private/intermediate.key.pem

It works just fine, so openssl s_client can connect to ZooKeeper.

How can I connect my Kafka server using the equivalent of the -cert and -key options in the command above? I need this to avoid SSL errors, because I cannot use ssl.clientAuth in my ZooKeeper config: I only have version 3.5.5, which does not support ssl.clientAuth.

Any ideas on how I can connect my Kafka server using the -cert and -key options?

Best regards,
John Mark Causing
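
One possible direction, sketched under assumptions: Kafka brokers from version 2.5 onward accept zookeeper.ssl.* settings for a TLS connection to ZooKeeper, and these point at a keystore and a truststore, not at separate PEM certificate and key files. The sketch assumes the certificate and key from the openssl command have already been packaged into a keystore, and the CA chain into a truststore. All file paths and passwords below are hypothetical, as is the class name ZkTlsBrokerConfig.

```java
import java.util.Properties;

// Sketch only: broker settings for a TLS connection to ZooKeeper (Kafka 2.5 or later).
final class ZkTlsBrokerConfig {

    static Properties build() {
        Properties props = new Properties();
        // Secure ZooKeeper port taken from the openssl command in the question.
        props.setProperty("zookeeper.connect", "55.55.55.55:2280");
        props.setProperty("zookeeper.ssl.client.enable", "true");
        // ZooKeeper's TLS support needs the Netty client socket implementation.
        props.setProperty("zookeeper.clientCnxnSocket",
                "org.apache.zookeeper.ClientCnxnSocketNetty");
        // Hypothetical paths and passwords: the keystore holds the client
        // certificate and key, the truststore holds the CA chain.
        props.setProperty("zookeeper.ssl.keystore.location", "/certs/kafka-zk.keystore.p12");
        props.setProperty("zookeeper.ssl.keystore.password", "changeit");
        props.setProperty("zookeeper.ssl.truststore.location", "/certs/kafka-zk.truststore.p12");
        props.setProperty("zookeeper.ssl.truststore.password", "changeit");
        return props;
    }

    public static void main(String[] args) {
        // Print in key=value form, suitable for copying into server.properties.
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The printed key=value pairs would go into the broker's server.properties. Whether this works against a ZooKeeper 3.5.5 server depends on the Kafka version in use, which the question does not state.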