Kafka SSL certificate validity checks - keystore or truststore?

2021-08-12 Thread Deepak Jain
Hello,

We are using Kafka for data uploading via SSL. While testing SSL certificate 
expiry, we found that Kafka checks the expiry of the keystore: it does not 
start when the current date exceeds the keystore's validity end date, and it 
dumps the following exception in server.log:

START-OF-EXCEPTION-
[2021-10-08 20:01:39,731] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: PKIX path validation failed: java.security.cert.CertPathValidatorException: validity check failed for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings.
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
    at kafka.network.Processor.<init>(SocketServer.scala:853)
    at kafka.network.SocketServer.newProcessor(SocketServer.scala:442)
    at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:299)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
    at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:297)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:262)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:259)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
    at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:259)
    at kafka.network.SocketServer.startup(SocketServer.scala:131)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:285)
    at kafka.Kafka$.main(Kafka.scala:109)
    at kafka.Kafka.main(Kafka.scala)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: PKIX path validation failed: java.security.cert.CertPathValidatorException: validity check failed for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings.
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:100)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:180)
    ... 17 more
END-OF-EXCEPTION-

Please verify whether our assumption is correct.
If yes, let us know whether truststore expiry is also taken into account.
If no, let us know the correct behavior of Kafka's SSL certificate expiry 
checks.

Also, let us know whether excluding the host (server) certificate received 
from the CA from the certificate chain while generating the truststore has any 
impact on the expiry date of the resulting truststore.

Regards,
Deepak


Re: Kafka 2.8.0 "KRaft" - advertised.listeners port mismatch?

2021-08-12 Thread c...@uweeisele.eu
Hello Mike,

this is a bug which I have also noticed. It has already been fixed 
(https://github.com/apache/kafka/pull/10935) and will be released with Kafka 
3.0.0 (https://issues.apache.org/jira/browse/KAFKA-13003).
I created a Docker image of Kafka 3.0.0-SNAPSHOT with the fix included 
(https://hub.docker.com/layers/154845427/ueisele/apache-kafka-server/3.0.0-SNAPSHOT/images/sha256-7f5f240062e4c788347cff0f674ae3c546deb89123db3b72715fa31c83445e8b?context=repo).
Here is an example: 
https://github.com/ueisele/kafka/blob/fix/kraft-advertisedlisteners-build/proxy-examples/proxyl4-kafkakraft-fix-trunk/docker-compose.yaml
I hope this helps.

Best Regards,
Uwe

From: Mike Pontillo 
Sent: Thursday, August 12, 2021 03:53
To: users@kafka.apache.org 
Subject: Kafka 2.8.0 "KRaft" - advertised.listeners port mismatch?

Greetings,

   I'm trying to create a container (suitable for testing purposes) that
runs a single-instance (and "zookeeper free") Kafka instance using the
"KRaft" functionality in 2.8.0.

   In the configuration file, I have configured the listeners as follows:

listeners=CONTROLLER://127.0.0.1:9093,INTERNAL://127.0.0.1:9094,EXTERNAL://0.0.0.0:9092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://127.0.0.1:9094,EXTERNAL://127.0.0.1:55032
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

   Then I forward a random port to port 9092 inside the container. However,
when I connect to the container from the outside, the client sees an
advertised port of 9092, which is only configured in the "listeners" key:

$ kafkacat -L -b 127.0.0.1:55033
Metadata for all topics (from broker -1: 127.0.0.1:55033/bootstrap):
 1 brokers:
  broker 1 at 127.0.0.1:9092 (controller)
 0 topics:
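The mismatch can be made concrete with a toy parser of the listener strings: for the configuration above, the EXTERNAL entry of advertised.listeners maps to 127.0.0.1:55032, so that is what the client should be told, not the socket port 9092 from the listeners key. The sketch below is our own illustration (the ListenerParse class and parse helper are made-up names, not Kafka's actual config-parsing code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ListenerParse {
    // Split a listeners-style string ("NAME://host:port,...") into a
    // listener-name -> host:port map, preserving the configured order.
    static Map<String, String> parse(String listeners) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String entry : listeners.split(",")) {
            int sep = entry.indexOf("://");
            out.put(entry.substring(0, sep), entry.substring(sep + 3));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> advertised =
            parse("INTERNAL://127.0.0.1:9094,EXTERNAL://127.0.0.1:55032");
        // This is the endpoint a client connecting via EXTERNAL should see;
        // the kafkacat output above shows 9092 instead, hence the bug report.
        System.out.println(advertised.get("EXTERNAL")); // prints 127.0.0.1:55032
    }
}
```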

   Can anyone spot anything I'm doing that is obviously incorrect? Is this
a possible bug? (If there is interest, I can also share more details around
how this container is being built.)

Regards,
Mike


Kafka Streams leave group behaviour

2021-08-12 Thread c...@uweeisele.eu
Hello all,

I have a question about the group membership lifecycle of Kafka Streams, or 
more specifically about when Kafka Streams leaves the consumer group (in the 
case of dynamic membership).

My expectation was that a call to KafkaStreams.close() also sends a LeaveGroup 
request to the coordinator (if dynamic membership is used). However, it seems 
that this is not the case (at least in my case the request was not sent). Only 
if I explicitly call KafkaStreams.removeStreamThread() is a LeaveGroup request 
sent to the coordinator. I used the WordCount example located in 
https://github.com/confluentinc/kafka-streams-examples to evaluate this.

Is this how Kafka Streams is intended to work, and if so, what do you recommend 
for making Kafka Streams leave the group when shutting down the application? 
For example, one situation where I don't want to wait for the session timeout 
is when downscaling an application.

Thanks.

Best Regards,
Uwe

RE: [EXTERNAL]Re: KSQLdb Stream, Getting Topic Key

2021-08-12 Thread Greer, Andrew C
This worked for me, thank you for the help!

Andrew Greer

-Original Message-
From: Daniel Hinojosa  
Sent: Tuesday, August 10, 2021 2:33 PM
To: users@kafka.apache.org
Subject: [EXTERNAL]Re: KSQLdb Stream, Getting Topic Key




I had to verify this myself: I wrote an example that uses a key that is not in 
the payload, declared the column with KEY, and now it appears as a field. 
Notice the state column below.

CREATE STREAM my_avro_orders (total BIGINT, shipping VARCHAR, state VARCHAR 
KEY, discount DOUBLE, gender VARCHAR)  WITH (kafka_topic='my_avro_orders', 
value_format='AVRO');

This came out as the following with a describe:

ksql> describe my_avro_orders;

Name : MY_AVRO_ORDERS
 Field    | Type
------------------------------------------
 TOTAL    | BIGINT
 SHIPPING | VARCHAR(STRING)
 STATE    | VARCHAR(STRING)  (key)
 DISCOUNT | DOUBLE
 GENDER   | VARCHAR(STRING)
------------------------------------------

Then querying it, I get:

ksql> select * from my_avro_orders emit changes;
+------+-------+----------+------------------+--------+
|STATE |TOTAL  |SHIPPING  |DISCOUNT          |GENDER  |
+------+-------+----------+------------------+--------+
|ME    |48025  |TWO_DAY   |0.01999552965164  |FEMALE  |
|MD    |32813  |NEXT_DAY  |0.1099940395355   |FEMALE  |
|SC    |99302  |TWO_DAY   |0.0900357627869   |MALE    |
|SD    |35621  |TWO_DAY   |0.1700178813934   |FEMALE  |
|SD    |5698   |NEXT_DAY  |0.1299523162842   |MALE    |
|VT    |8567   |NEXT_DAY  |0.0500074505806   |FEMALE  |
|WA    |13737  |NEXT_DAY  |0.0799821186066   |MALE    |
|NH    |60216  |TWO_DAY   |0.0500074505806   |MALE    |
|UT    |47470  |TWO_DAY   |0.0500074505806   |MALE    |
|IN    |69220  |TWO_DAY   |0.189976158142    |FEMALE  |


On Tue, Aug 10, 2021 at 12:11 PM Daniel Hinojosa < dhinoj...@evolutionnext.com> 
wrote:

> Ah, nevermind, just saw the title that this is KSQLDB.
>
> On Tue, Aug 10, 2021 at 12:09 PM Daniel Hinojosa < 
> dhinoj...@evolutionnext.com> wrote:
>
>> The keys are already part of the stream. When you run builder.stream 
>> or builder.table, it returns a KStream or a KTable. From there, 
>> every operation has a lambda that accepts both key and value. You can 
>> use map, for example, to accept the key and do something with it. Let 
>> me know if you have any other questions or if I didn't understand correctly.
>>
>> On Tue, Aug 10, 2021 at 10:58 AM Greer, Andrew C < 
>> andrew.c.gr...@conocophillips.com> wrote:
>>
>>> Hello, I am trying to create a Stream that will accept the data from 
>>> my topic and be able to use the message keys in the stream as unique 
>>> identifiers for the sensors the data originated from. The data in 
>>> the messages does not have anything that would be able to identify 
>>> which sensor it came from, hence trying to get the keys used in the 
>>> topic. I have seen various posts online about a ROWKEY column, but 
>>> that doesn't appear to be an option on my version at least. I have 
>>> had no success finding a way to incorporate the keys into my Stream 
>>> and was hoping I would be able to find some help here.
>>>
>>> Thank you.
>>>
>>> Kafka version 6.0.1
>>>
>>> Andrew
>>>
>>>


Re: Kafka Streams leave group behaviour

2021-08-12 Thread Lerh Chuan Low
I think you may have stumbled upon this:
https://issues.apache.org/jira/browse/KAFKA-4881. One thing you could
try is static membership - we have yet to try that ourselves, so I can't
comment yet on how it might work out.

On Thu, Aug 12, 2021 at 11:29 PM c...@uweeisele.eu 
wrote:

> [...]


Re: Kafka Streams leave group behaviour

2021-08-12 Thread Boyang Chen
You are right, Uwe: Kafka Streams won't leave the group, whether dynamic or
static membership is used. If you want fast scale-down, consider trying
static membership and use the Admin API `removeMembersFromConsumerGroup` when
you need to rescale.

Boyang
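Boyang's suggestion above amounts to one extra configuration property plus an explicit admin call at scale-down time. The sketch below is our own illustration, not code from this thread: the class and helper names are made up, the "wordcount" group and "pod-0" instance id are placeholder values, and the admin call shown only in the comment assumes the `removeMembersFromConsumerGroup` Admin API (KIP-345):

```java
import java.util.Properties;

public class StaticMembershipSketch {
    // Build the Streams/consumer properties that opt in to static membership.
    static Properties staticMembershipProps(String appId, String instanceId) {
        Properties props = new Properties();
        props.put("application.id", appId);         // Streams app id doubles as the consumer group id
        props.put("group.instance.id", instanceId); // static membership: a stable per-instance id
        props.put("session.timeout.ms", "30000");   // a static member stays assigned until this expires
        return props;
    }

    public static void main(String[] args) {
        Properties props = staticMembershipProps("wordcount", "pod-0");
        // To scale down without waiting out session.timeout.ms, remove the member
        // explicitly with the Admin client, along the lines of:
        //   admin.removeMembersFromConsumerGroup("wordcount",
        //       new RemoveMembersFromConsumerGroupOptions(
        //           List.of(new MemberToRemove("pod-0"))));
        System.out.println(props.getProperty("group.instance.id")); // prints pod-0
    }
}
```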

On Thu, Aug 12, 2021 at 4:37 PM Lerh Chuan Low  wrote:

> [...]


Re: Kafka 2.8.0 "KRaft" - advertised.listeners port mismatch?

2021-08-12 Thread Mike Pontillo
On Thu, Aug 12, 2021 at 11:44 AM c...@uweeisele.eu 
wrote:

> this is a bug which I also have already noticed. This has already been
> fixed (https://github.com/apache/kafka/pull/10935) and it will be
> released with Kafka 3.0.0 (
> https://issues.apache.org/jira/browse/KAFKA-13003).


Thank you for the reply (and for contributing the fix)!

This may be a better question for the development list, but I wonder if
this fix would be a candidate for inclusion in a 2.8.1 release, and (if so)
what the process would be to backport it.

Regards,
Mike