Re: Kafka Streams Handling uncaught exceptions REPLACE_THREAD

2021-08-16 Thread Bruno Cadonna

Hi Yoda,

For certain cases, Kafka Streams allows you to specify handlers that 
skip the problematic record. Those handlers are:


1. deserialization exception handler configured in 
default.deserialization.exception.handler
2. timestamp extractor set in default.timestamp.extractor and in the 
Consumed object
3. production exception handler configured in 
default.production.exception.handler


Kafka Streams provides implementations of handlers 1 and 2 that skip the 
problematic records, namely LogAndContinueExceptionHandler and 
LogAndSkipOnInvalidTimestamp, respectively.
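
For illustration, they could be configured like this (a minimal sketch; 
the application id and bootstrap servers are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
    import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

    Properties props = new Properties();
    // placeholder application id and bootstrap servers
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // handler 1: log and continue on deserialization errors
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class);
    // handler 2: log and skip records with an invalid timestamp
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            LogAndSkipOnInvalidTimestamp.class);
    // ... then build the topology and pass props to new KafkaStreams(...)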


For some more details, have a look at
https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling

If a problematic record causes an exception in user code, the user code 
itself needs to handle the exception and skip the record.
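
For example, a sketch of skipping inside user code, assuming `input` is 
a KStream<String, String> from your topology and parse() is a 
hypothetical method of yours that may throw:

    import java.util.Collections;
    import org.apache.kafka.streams.kstream.KStream;

    KStream<String, String> parsed = input.flatMapValues(value -> {
        try {
            return Collections.singletonList(parse(value));
        } catch (RuntimeException e) {
            // log and skip the problematic record instead of letting
            // the exception kill the stream thread
            return Collections.emptyList();
        }
    });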


Best,
Bruno

On 10.08.21 13:26, Yoda Jedi Master wrote:

Hi Bruno, thank you for your answer.
I mean that the message that caused the exception was consumed, and the
replacement thread will continue from the next message. How then does it
handle uncaught exceptions, if it will fail again?


On Tue, Aug 10, 2021 at 12:33 PM Bruno Cadonna  wrote:


Hi Yoda,

What do you mean exactly with "skipping that failed message"?

Do you mean a record consumed from a topic that caused an exception that
killed the stream thread?

If the record killed the stream thread due to an exception, for example
a deserialization exception, it will probably also kill the next stream
thread that reads that record. Replacing a stream thread does not skip
records, but it can result in duplicate records depending on the
application's processing mode determined by the
PROCESSING_GUARANTEE_CONFIG value, as stated in the docs you cited.
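
For reference, this is roughly how such a handler is registered (a 
sketch, assuming `streams` is your already-built KafkaStreams instance; 
this handler API exists since Kafka Streams 2.8):

    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    streams.setUncaughtExceptionHandler(exception -> {
        // REPLACE_THREAD starts a new thread in place of the failed one;
        // the failing record is NOT skipped and may be read again, so a
        // deterministically failing record will kill the replacement too
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    });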

Best,
Bruno



On 10.08.21 11:15, Luke Chen wrote:

Hi Yoda,
For your question:

If an application gets an uncaught exception, then the failed thread will
be replaced with another thread and it will continue processing messages,
skipping that failed message?

--> Yes, if everything goes well after `replace thread`, you can ignore
this failed message. Just one reminder: you should check the failed
message to avoid this `uncaught exception` being thrown again, because if
this happens frequently, it'll impact application performance.

Thank you.
Luke

On Tue, Aug 10, 2021 at 4:25 PM Yoda Jedi Master wrote:



"REPLACE_THREAD - Replaces the thread receiving the exception and
processing continues with the same number of configured threads. (Note:
this can result in duplicate records depending on the application’s
processing mode determined by the PROCESSING_GUARANTEE_CONFIG value)"

If an application gets an uncaught exception, then the failed thread will
be replaced with another thread and it will continue processing messages,
skipping that failed message?


RE: Kafka checks the validity of SSL certificates keystore or trust store?

2021-08-16 Thread Deepak Jain
Hello,

Can anyone help me with the information below:

When checking the expiry condition, does Kafka SSL check the validity of 
the keystore certificate or the truststore certificate?


Thanks in advance!



Best regards,
Deepak

From: Deepak Jain
Sent: 12 August 2021 15:01
To: users@kafka.apache.org
Cc: Alap Patwardhan ; Prashant Ahire 

Subject: Kafka checks the validity of SSL certificates keystore or trust store?

Hello,

We are using Kafka for data uploading via SSL. While doing the SSL certificate 
expiry test, we found that Kafka checks the expiry of the keystore: it does not 
start when the current date exceeds the validity end date of the keystore, and 
it dumps the following exception in server.log:

START-OF-EXCEPTION-
[2021-10-08 20:01:39,731] ERROR [KafkaServer id=0] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: 
org.apache.kafka.common.config.ConfigException: Invalid value 
javax.net.ssl.SSLHandshakeException: PKIX path validation failed: 
java.security.cert.CertPathValidatorException: validity check failed for 
configuration A client SSLEngine created with the provided settings can't 
connect to a server SSLEngine created with those settings.
at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
at kafka.network.Processor.<init>(SocketServer.scala:853)
at kafka.network.SocketServer.newProcessor(SocketServer.scala:442)
at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:299)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:297)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:262)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:259)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:259)
at kafka.network.SocketServer.startup(SocketServer.scala:131)
at kafka.server.KafkaServer.startup(KafkaServer.scala:285)
at kafka.Kafka$.main(Kafka.scala:109)
at kafka.Kafka.main(Kafka.scala)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
javax.net.ssl.SSLHandshakeException: PKIX path validation failed: 
java.security.cert.CertPathValidatorException: validity check failed for 
configuration A client SSLEngine created with the provided settings can't 
connect to a server SSLEngine created with those settings.
at 
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:100)
at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:180)
... 17 more
END-OF-EXCEPTION-

Please verify whether our assumption is correct or not.
If yes, let us know whether the truststore expiry is also taken into account.
If no, then let us know the correct behavior of Kafka's SSL certificate expiry 
checks.

Also, let us know whether excluding the host (server) certificate received 
from the CA from the certificate chain while generating the truststore has any 
impact on the expiry date of the resulting truststore.
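
In case it is useful for your test, here is a small sketch that prints the 
validity end date of every certificate in a store, so the keystore and 
truststore dates can be compared against the exception date (store path and 
password are passed as arguments; works for both store types):

    import java.io.FileInputStream;
    import java.security.KeyStore;
    import java.security.cert.Certificate;
    import java.security.cert.X509Certificate;
    import java.util.Enumeration;

    public class PrintCertExpiry {
        public static void main(String[] args) throws Exception {
            // args[0] = path to keystore/truststore, args[1] = store password
            KeyStore ks = KeyStore.getInstance("JKS");
            try (FileInputStream in = new FileInputStream(args[0])) {
                ks.load(in, args[1].toCharArray());
            }
            Enumeration<String> aliases = ks.aliases();
            while (aliases.hasMoreElements()) {
                String alias = aliases.nextElement();
                Certificate c = ks.getCertificate(alias);
                if (c instanceof X509Certificate) {
                    // getNotAfter() is the validity end date
                    System.out.println(alias + " expires: "
                            + ((X509Certificate) c).getNotAfter());
                }
            }
        }
    }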

Regards,
Deepak


Secure connection using cert and keyfile

2021-08-16 Thread john mark
Hi,

I am running this command to test my zookeeper SSL connection:

openssl s_client -showcerts -connect 55.55.55.55:2280 -CAfile
/certs/ca-chain.cert.pem -cert
/root/ca/intermediate/certs/intermediate.cert.pem  -key
/root/ca/intermediate/private/intermediate.key.pem

It works just fine, so openssl s_client can connect to ZooKeeper.

How can I connect my Kafka server using the -cert and -key options, like
the command mentioned above?

I need to use that to avoid getting SSL errors, because I cannot use
ssl.clientAuth in my ZooKeeper config; I am on version 3.5.5, which does
not support ssl.clientAuth.

Any ideas on how I can connect my Kafka server using the -cert and -key
options?
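
One note, in case it helps: Kafka reaches ZooKeeper through the ZooKeeper
client, so instead of passing -cert/-key directly, the usual route is to
package the PEM cert and key into a keystore and point the broker's
zookeeper.ssl.* settings at it. A sketch, assuming Kafka 2.5+ with
TLS-to-ZooKeeper support; paths and passwords are placeholders:

    # package the PEM cert/key pair into a PKCS12 keystore
    openssl pkcs12 -export -in /root/ca/intermediate/certs/intermediate.cert.pem \
        -inkey /root/ca/intermediate/private/intermediate.key.pem \
        -out /certs/zk-client.p12 -name zk-client -password pass:changeit

    # then in the broker's server.properties
    # (create the truststore from /certs/ca-chain.cert.pem, e.g. with keytool -importcert)
    zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
    zookeeper.ssl.client.enable=true
    zookeeper.ssl.keystore.location=/certs/zk-client.p12
    zookeeper.ssl.keystore.password=changeit
    zookeeper.ssl.truststore.location=/certs/truststore.p12
    zookeeper.ssl.truststore.password=changeit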

Best regards,

John Mark Causing