Re: Switching from ZooKeeper to Kafka KRaft mode / Using ACLs with Kafka KRaft mode

2022-05-17 Thread Florian Blumenstein
Hi Thomas,

thanks for the quick answer. I got it running by declaring the CONTROLLER
listener also as SSL and using the existing keystore/truststore for it, as
sketched below.
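
For reference, a minimal sketch of that change (a sketch only; it reuses the
keystore/truststore paths from the config quoted below, and per-listener
overrides use the listener.name.<listener>.ssl.* prefix):

listener.security.protocol.map=CONTROLLER:SSL,DEVIN:PLAINTEXT,DEVOUT:PLAINTEXT,DOCKER:SSL,EXTERNAL:SSL
# controller listener reuses the existing internal keystore/truststore
listener.name.controller.ssl.keystore.location=/app/ssl/internalKeystore.jks
listener.name.controller.ssl.keystore.password=password
listener.name.controller.ssl.truststore.location=/app/ssl/truststore.jks
listener.name.controller.ssl.truststore.password=password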

Unfortunately I now recognize a new issue when restarting Kafka:

https://issues.apache.org/jira/browse/KAFKA-13909

Hopefully this will also be solved. Looking forward to using Kafka without ZooKeeper.

Best regards,
Florian

-Original Message-
From: Thomas Cooper [mailto:c...@tomcooper.dev]
Sent: Monday, 16 May 2022 14:49
To: users@kafka.apache.org; Florian Blumenstein
Subject: Re: Switching from ZooKeeper to Kafka KRaft mode / Using ACLs with
Kafka KRaft mode

Hi Florian,

Switching from a ZooKeeper-based cluster to a KRaft-based one is not currently
supported. AFAIK that functionality should be coming in Kafka 3.4 (or possibly
later).

Cheers,

Tom

On 16/05/2022 12:42, Florian Blumenstein wrote:
> Hi guys,
>
> I am currently trying to switch from Kafka 3.1.0 with ZooKeeper to Kafka 3.2.0 in 
> KRaft mode. I adjusted the server.properties as follows:
>
> ### KRaft-properties
> process.roles=broker,controller
> node.id=1
> controller.quorum.voters=1@127.0.0.1:9091
> controller.listener.names=CONTROLLER
>
> auto.create.topics.enable=false
> ssl.client.auth=required
>
> ### Enable ACLs
> authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
> allow.everyone.if.no.acl.found=false
>
> # Topics and indexes are stored here to keep track of records sent via broker
> log.dir=/opt/kafka/data/
>
> # Internal Topic Settings
> # The replication factor for the group metadata internal topics
> # "__consumer_offsets" and "__transaction_state".
> # For anything other than development testing, a value greater than 1 is
> # recommended to ensure availability, such as 3.
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
>
> ### Platform Configured Entries --- Below here entries are configured by the platform
> listener.name.docker.ssl.keystore.location=/app/ssl/internalKeystore.jks
> super.users=User:Applications:0765df41-0b31-4db8-8849-c9d77e9c6e20;User:CN=onlinesuiteplus-kafka,OU=Services,O=Company AG,L=City,C=DE
> advertised.listeners=DEVIN://onlinesuiteplus-kafka:29092,DEVOUT://localhost:9092,DOCKER://onlinesuiteplus-kafka:29093,EXTERNAL://localhost:9093
> listener.name.docker.ssl.key.password=password
> inter.broker.listener.name=DOCKER
> listener.name.external.ssl.key.password=password
> listener.name.external.ssl.truststore.password=password
> ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=Applications.*$/Applications:$1/,RULE:^CN=(.*?),OU=Devices.*$/Devices:$1/,DEFAULT
> initial.start=true
> listener.name.docker.ssl.truststore.location=/app/ssl/truststore.jks
> listener.name.external.ssl.keystore.password=password
> listeners=CONTROLLER://:9091,DEVIN://:29092,DEVOUT://:9092,DOCKER://:29093,EXTERNAL://:9093
> listener.name.external.ssl.truststore.location=/app/ssl/truststore.jks
> listener.name.docker.ssl.truststore.password=password
> listener.name.external.ssl.keystore.location=/app/ssl/externalKeystore.jks
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,DEVIN:PLAINTEXT,DEVOUT:PLAINTEXT,DOCKER:SSL,EXTERNAL:SSL
> listener.name.docker.ssl.keystore.password=password
>
> If I now run Kafka with the following script:
>
> if [ "$KAFKA_INITIAL_START" == "true" ]; then
>   echo "Running kafka-storage.sh because env var KAFKA_INITIAL_START was set to true"
>   "${KAFKA_HOME}"/bin/kafka-storage.sh format --config "${KAFKA_HOME}"/config/server.properties --cluster-id "$("${KAFKA_HOME}"/bin/kafka-storage.sh random-uuid)"
> fi
>
> exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
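>
> (A possible alternative, sketched under the assumption that
> log.dir=/opt/kafka/data/ as configured above: key the one-time format off
> the meta.properties file that kafka-storage.sh format writes into the log
> directory, instead of an env var.)
>
> # hypothetical variant: format only if the storage dir was never formatted
> if [ ! -f /opt/kafka/data/meta.properties ]; then
>   "${KAFKA_HOME}"/bin/kafka-storage.sh format --config "${KAFKA_HOME}"/config/server.properties --cluster-id "$("${KAFKA_HOME}"/bin/kafka-storage.sh random-uuid)"
> fi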
>
>
> I got the following logs:
>
> [2022-05-16 11:25:08,894] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-05-16 11:25:09,220] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-05-16 11:25:09,473] INFO [LogLoader partition=__cluster_metadata-0, dir=/opt/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,474] INFO [LogLoader partition=__cluster_metadata-0, dir=/opt/kafka/data] Reloading from producer snapshot and rebuilding producer state from offset 0 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,477] INFO [LogLoader partition=__cluster_metadata-0, dir=/opt/kafka/data] Producer state recovery took 2ms for snapshot load and 0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,584] INFO [raft-expirati

Kafka Streams - sliding window - getting unexpected output

2022-05-17 Thread Shankar Mane
Hi All,

Our use case is a sliding window: at any point, whenever a user performs an
action at time [ t1 ], we would like to see their activity in the last 24
hours before [ t1 ], and use that to show the user some recommendations.



-- I have the code ready and it works without any errors.
-- Aggregations happen as expected.
-- But the output generated is unexpected: as the window slides, I am
getting mixed output that includes intermediate aggregated records along
with the final aggregated outputs.

Could someone please help me here? What can I do to get ONLY the final
aggregated output?


Code snippet:

builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
        .filter((k, v) -> v != null)
        .map((k, v) -> KeyValue.pair(v.getUserId(), v))
        //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
        .groupByKey()
        .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
        .aggregate(OutputPojo::new, (k, tr, out) -> {
            out.setUserId(tr.getUserId());
            out.setCount(out.getCount() + 1);
            out.setSum(out.getSum() + tr.getInt4());
            out.setUuid(tr.getUuid());
            out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
            waitForMs(200); // added delay just for analysing output
            return out;
        }, Materialized.with(stringSerde, outputSerde))
        .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
        .toStream()
        .map((Windowed<String> key, OutputPojo out) -> new KeyValue<>(key.key(), out))
        //.to(aveTempOutputTopic, Produced.with(stringSerde, outputSerde))
        .print(Printed.toSysOut());
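
One thing worth noting about the suppression used above:
Suppressed.untilTimeLimit holds a key's latest update for at most the given
duration and then emits it, whether or not the window has closed, so
intermediate aggregates still reach downstream. If the goal is exactly one
final result per window, Suppressed.untilWindowCloses is the variant built for
that. A minimal sketch of that change (same topology, serdes, and POJOs as
above; the 30-second grace period is an assumed value, not taken from the
original code):

builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
        .filter((k, v) -> v != null)
        .map((k, v) -> KeyValue.pair(v.getUserId(), v))
        .groupByKey()
        // the grace period bounds how long out-of-order records are accepted
        // and determines when a window "closes"
        .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofSeconds(30)))
        .aggregate(OutputPojo::new, (k, tr, out) -> {
            // same aggregator body as above
            return out;
        }, Materialized.with(stringSerde, outputSerde))
        // untilWindowCloses buffers every update until window end + grace,
        // so downstream sees exactly one final record per window
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .map((Windowed<String> key, OutputPojo out) -> new KeyValue<>(key.key(), out))
        .print(Printed.toSysOut());

Note that SlidingWindows creates a new window per incoming record, so even
final-only emission produces one output per window, not one per key.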






Input data:

for i in {1..10}; do sleep 1s; python3 del.py 1001 10; sleep 1s; done
> {'userId': '1001', 'timestamp': 1652781716234, 'int4': 10, 'uuid': '64f019ee-9cf4-427d-b4c9-f2b5f88820e1'}
> {'userId': '1001', 'timestamp': 1652781718436, 'int4': 10, 'uuid': 'cf173b3e-c34f-470a-ba15-ef648d0be8b9'}
> {'userId': '1001', 'timestamp': 1652781720634, 'int4': 10, 'uuid': '48d2b4ea-052d-42fa-a998-0216d928c034'}
> {'userId': '1001', 'timestamp': 1652781722832, 'int4': 10, 'uuid': '55a6c26c-3d2c-46f1-ab3c-04927f660cbe'}
> {'userId': '1001', 'timestamp': 1652781725029, 'int4': 10, 'uuid': 'dbfd8cee-565d-496b-b5a8-773ae64bc518'}
> {'userId': '1001', 'timestamp': 1652781727227, 'int4': 10, 'uuid': '135dc5cd-50cb-467b-9e63-300fdeedaf75'}
> {'userId': '1001', 'timestamp': 1652781729425, 'int4': 10, 'uuid': '66d8e3c7-8f63-43ca-acf1-e39619bf33a0'}
> {'userId': '1001', 'timestamp': 1652781731623, 'int4': 10, 'uuid': 'f037712b-42a5-4449-bcc2-cf6eafddf5ad'}
> {'userId': '1001', 'timestamp': 1652781733820, 'int4': 10, 'uuid': '7baa4254-b9da-43dc-bbb7-4caede578aeb'}
> {'userId': '1001', 'timestamp': 1652781736018, 'int4': 10, 'uuid': '16541989-f3ba-49f6-bd31-bf8a75ba8eac'}





Output (*Unexpected*): the output below is captured at each sliding window of
1s duration (but the input data is published at 2s intervals):

> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)   <-- seems older UUID
> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)
> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:56.234, uuid=64f019ee-9cf4-427d-b4c9-f2b5f88820e1)
> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:31:58.436, uuid=cf173b3e-c34f-470a-ba15-ef648d0be8b9)
> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:05.029, uuid=dbfd8cee-565d-496b-b5a8-773ae64bc518)
> [KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTim

[ANNOUNCE] Apache Kafka 3.2.0

2022-05-17 Thread Bruno Cadonna
The Apache Kafka community is pleased to announce the release of Apache
Kafka 3.2.0.


* log4j 1.x is replaced with reload4j (KAFKA-9366)
* StandardAuthorizer for KRaft (KIP-801)
* Send a hint to the partition leader to recover the partition (KIP-704)
* Top-level error code field in DescribeLogDirsResponse (KIP-784)
* kafka-console-producer writes headers and null values (KIP-798 and KIP-810)
* JoinGroupRequest and LeaveGroupRequest have a reason attached (KIP-800)
* Static membership protocol lets the leader skip assignment (KIP-814)
* Rack-aware standby task assignment in Kafka Streams (KIP-708)
* Interactive Query v2 (KIP-796, KIP-805, and KIP-806; see the sketch after this list)
* Connect APIs list all connector plugins and retrieve their configuration (KIP-769)
* TimestampConverter SMT supports different unix time precisions (KIP-808)
* Connect source tasks handle producer exceptions (KIP-779)
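
As a rough illustration of Interactive Query v2 (KIP-796), querying a
key-value state store now looks something like the sketch below. This is a
sketch only: the store name "counts-store", the key "user-1", the value type,
and the running KafkaStreams instance `streams` are invented for the example.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

// assumes a running KafkaStreams instance with a store named "counts-store"
StateQueryRequest<Long> request =
        StateQueryRequest.inStore("counts-store")
                         .withQuery(KeyQuery.<String, Long>withKey("user-1"));
StateQueryResult<Long> result = streams.query(request);
Long count = result.getOnlyPartitionResult().getResult();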

All of the changes in this release can be found in the release notes:
https://www.apache.org/dist/kafka/3.2.0/RELEASE_NOTES.html


You can download the source and binary release (Scala 2.12 and 2.13) from:
https://kafka.apache.org/downloads#3.2.0

---


Apache Kafka is a distributed streaming platform with four core APIs:


** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the
input streams to output streams.

** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might
capture every change to a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data
between systems or applications.

** Building real-time streaming applications that transform or react
to the streams of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you to the following 113 contributors to this release!

A. Sophie Blee-Goldman, Adam Kotwasinski, Aleksandr Sorokoumov, 
Alexandre Garnier, Alok Nikhil, aSemy, Bounkong Khamphousone, bozhao12, 
Bruno Cadonna, Chang, Chia-Ping Tsai, Chris Egerton, Colin P. Mccabe, 
Colin Patrick McCabe, Cong Ding, David Arthur, David Jacot, David Mao, 
defhacks, dengziming, Ed B, Edwin, florin-akermann, GauthamM-official, 
GuoPhilipse, Guozhang Wang, Hao Li, Haoze Wu, Idan Kamara, Ismael Juma, 
Jason Gustafson, Jason Koch, Jeff Kim, jiangyuan, Joel Hamill, John 
Roesler, Jonathan Albrecht, Jorge Esteban Quilcate Otoya, Josep Prat, 
Joseph (Ting-Chou) Lin, José Armando García Sancio, Jules Ivanic, Julien 
Chanaud, Justin Lee, Justine Olshan, Kamal Chandraprakash, Kate Stanley, 
keashem, Kirk True, Knowles Atchison, Jr, Konstantine Karantasis, 
Kowshik Prakasam, kurtostfeld, Kvicii, Lee Dongjin, Levani Kokhreidze, 
lhunyady, Liam Clarke-Hutchinson, liym, loboya~, Lucas Bradstreet, 
Ludovic DEHON, Luizfrf3, Luke Chen, Marc Löhe, Matthew Wong, Matthias J. 
Sax, Michal T, Mickael Maison, Mike Lothian, mkandaswamy, Márton 
Sigmond, Nick Telford, Niket, Okada Haruki, Paolo Patierno, Patrick 
Stuedi, Philip Nee, Prateek Agarwal, prince-mahajan, Rajini Sivaram, 
Randall Hauch, Richard, RivenSun, Rob Leland, Ron Dagostino, Sayantanu 
Dey, Stanislav Vodetskyi, sunshujie1990, Tamara Skokova, Tim Patterson, 
Tolga H. Dur, Tom Bentley, Tomonari Yamashita, vamossagar12, Vicky 
Papavasileiou, Victoria Xia, Vijay Krishna, Vincent Jiang, Walker 
Carlson, wangyap, Wenhao Ji, Wenjun Ruan, Xiaobing Fang, Xiaoyue Xue, 
xuexiaoyue, Yang Yu, yasar03, Yu, Zhang Hongyi, zzccctv, 工业废水, 彭小漪


We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
https://kafka.apache.org/

Thank you!


Regards,

Bruno


Re: [ANNOUNCE] Apache Kafka 3.2.0

2022-05-17 Thread Bill Bejeck
Thanks for running the release, Bruno!

On Tue, May 17, 2022 at 12:02 PM Bruno Cadonna  wrote:

> [full 3.2.0 announcement quoted; trimmed, see the original message above]