Re: Switching from ZooKeeper to Kafka KRaft mode / Using ACLs with Kafka KRaft mode
Hi Thomas,

thanks for the quick answer. I got it running by declaring the CONTROLLER listener also as SSL and using the existing keystore/truststore for it. Unfortunately, I now see a new issue when restarting Kafka: https://issues.apache.org/jira/browse/KAFKA-13909 Hopefully this will also be solved. Looking forward to using Kafka without ZooKeeper.

Best regards,
Florian

-----Original Message-----
From: Thomas Cooper [mailto:c...@tomcooper.dev]
Sent: Monday, 16 May 2022 14:49
To: users@kafka.apache.org; Florian Blumenstein
Subject: Re: Switching from ZooKeeper to Kafka KRaft mode / Using ACLs with Kafka KRaft mode

Hi Florian,

Switching from a ZooKeeper-based cluster to a KRaft-based one is not currently supported. AFAIK that functionality should be coming in Kafka 3.4 (or possibly later).

Cheers,

Tom

On 16/05/2022 12:42, Florian Blumenstein wrote:
> Hi guys,
>
> I am currently trying to switch from Kafka 3.1.0 with ZooKeeper to Kafka 3.2.0 with Kafka KRaft mode. I adjusted the server.properties as follows:
>
> ### KRaft properties
> process.roles=broker,controller
> node.id=1
> controller.quorum.voters=1@127.0.0.1:9091
> controller.listener.names=CONTROLLER
>
> auto.create.topics.enable=false
> ssl.client.auth=required
>
> ### Enable ACLs
> authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
> allow.everyone.if.no.acl.found=false
>
> # Topics and indexes are stored here to keep track of records sent via broker
> log.dir=/opt/kafka/data/
>
> # Internal Topic Settings
> # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state".
> # For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
>
> ### Platform Configured Entries --- Below here entries are configured by the platform
> listener.name.docker.ssl.keystore.location=/app/ssl/internalKeystore.jks
> super.users=User:Applications:0765df41-0b31-4db8-8849-c9d77e9c6e20;User:CN=onlinesuiteplus-kafka,OU=Services,O=Company AG,L=City,C=DE
> advertised.listeners=DEVIN://onlinesuiteplus-kafka:29092,DEVOUT://localhost:9092,DOCKER://onlinesuiteplus-kafka:29093,EXTERNAL://localhost:9093
> listener.name.docker.ssl.key.password=password
> inter.broker.listener.name=DOCKER
> listener.name.external.ssl.key.password=password
> listener.name.external.ssl.truststore.password=password
> ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=Applications.*$/Applications:$1/,RULE:^CN=(.*?),OU=Devices.*$/Devices:$1/,DEFAULT
> initial.start=true
> listener.name.docker.ssl.truststore.location=/app/ssl/truststore.jks
> listener.name.external.ssl.keystore.password=password
> listeners=CONTROLLER://:9091,DEVIN://:29092,DEVOUT://:9092,DOCKER://:29093,EXTERNAL://:9093
> listener.name.external.ssl.truststore.location=/app/ssl/truststore.jks
> listener.name.docker.ssl.truststore.password=password
> listener.name.external.ssl.keystore.location=/app/ssl/externalKeystore.jks
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,DEVIN:PLAINTEXT,DEVOUT:PLAINTEXT,DOCKER:SSL,EXTERNAL:SSL
> listener.name.docker.ssl.keystore.password=password
>
> If I now run Kafka with the following script:
>
> if [ "$KAFKA_INITIAL_START" == "true" ]; then
>   echo "Running kafka-storage.sh because env var KAFKA_INITIAL_START was set to true"
>   "${KAFKA_HOME}"/bin/kafka-storage.sh format --config "${KAFKA_HOME}"/config/server.properties --cluster-id $("${KAFKA_HOME}"/bin/kafka-storage.sh random-uuid)
> fi
>
> exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
>
> I got the following logs:
>
> [2022-05-16 11:25:08,894] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-05-16 11:25:09,220] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-05-16 11:25:09,473] INFO [LogLoader partition=__cluster_metadata-0, dir=/opt/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,474] INFO [LogLoader partition=__cluster_metadata-0, dir=/opt/kafka/data] Reloading from producer snapshot and rebuilding producer state from offset 0 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,477] INFO [LogLoader partition=__cluster_metadata-0, dir=/opt/kafka/data] Producer state recovery took 2ms for snapshot load and 0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,584] INFO [raft-expirati
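[Editor's note] For readers hitting the same problem: below is a minimal sketch (not from the original mail) of the change Florian describes, i.e. running the CONTROLLER listener over SSL and reusing the existing keystore/truststore. The controller-specific property names follow Kafka's listener.name.<listener>.* per-listener override pattern and are assumptions, since Florian did not post his final config:

# Sketch: map the controller listener to SSL instead of PLAINTEXT ...
listener.security.protocol.map=CONTROLLER:SSL,DEVIN:PLAINTEXT,DEVOUT:PLAINTEXT,DOCKER:SSL,EXTERNAL:SSL
# ... and reuse the existing stores for that listener (paths taken from the config above).
listener.name.controller.ssl.keystore.location=/app/ssl/internalKeystore.jks
listener.name.controller.ssl.keystore.password=password
listener.name.controller.ssl.key.password=password
listener.name.controller.ssl.truststore.location=/app/ssl/truststore.jks
listener.name.controller.ssl.truststore.password=password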
kafka stream - sliding window - getting unexpected output
Hi All,

Our use case is a sliding window: at any point, whenever a user performs any action at time [ t1 ], we would like to see his activity in [ t1 - last 24 hours ] and use this to show the user some recommendations.

-- I have the code ready and it works without any errors.
-- Aggregations happen as expected.
-- But the output generated is unexpected. As the window slides, I am getting mixed output, in which intermediate aggregated records appear alongside the final aggregated outputs.

Could someone please help me here? What can I do to get ONLY the final aggregated output?

Code snippet:

builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
        .filter((k, v) -> v != null)
        .map((k, v) -> KeyValue.pair(v.getUserId(), v))
        //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
        .groupByKey()
        .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
        .aggregate(OutputPojo::new, (k, tr, out) -> {
            out.setUserId(tr.getUserId());
            out.setCount(out.getCount() + 1);
            out.setSum(out.getSum() + tr.getInt4());
            out.setUuid(tr.getUuid());
            out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
            waitForMs(200); // added delay just for analysing output
            return out;
        }, Materialized.with(stringSerde, outputSerde))
        .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
        .toStream()
        .map((Windowed<String> key, OutputPojo out) -> new KeyValue<>(key.key(), out))
        .print(Printed.toSysOut());
        //.to(aveTempOutputTopic, Produced.with(stringSerde, outputSerde));

Input data:

for i in {1..10}; do sleep 1s; python3 del.py 1001 10; sleep 1s; done

{'userId': '1001', 'timestamp': 1652781716234, 'int4': 10, 'uuid': '64f019ee-9cf4-427d-b4c9-f2b5f88820e1'}
{'userId': '1001', 'timestamp': 1652781718436, 'int4': 10, 'uuid': 'cf173b3e-c34f-470a-ba15-ef648d0be8b9'}
{'userId': '1001', 'timestamp': 1652781720634, 'int4': 10, 'uuid': '48d2b4ea-052d-42fa-a998-0216d928c034'}
{'userId': '1001', 'timestamp': 1652781722832, 'int4': 10, 'uuid': '55a6c26c-3d2c-46f1-ab3c-04927f660cbe'}
{'userId': '1001', 'timestamp': 1652781725029, 'int4': 10, 'uuid': 'dbfd8cee-565d-496b-b5a8-773ae64bc518'}
{'userId': '1001', 'timestamp': 1652781727227, 'int4': 10, 'uuid': '135dc5cd-50cb-467b-9e63-300fdeedaf75'}
{'userId': '1001', 'timestamp': 1652781729425, 'int4': 10, 'uuid': '66d8e3c7-8f63-43ca-acf1-e39619bf33a0'}
{'userId': '1001', 'timestamp': 1652781731623, 'int4': 10, 'uuid': 'f037712b-42a5-4449-bcc2-cf6eafddf5ad'}
{'userId': '1001', 'timestamp': 1652781733820, 'int4': 10, 'uuid': '7baa4254-b9da-43dc-bbb7-4caede578aeb'}
{'userId': '1001', 'timestamp': 1652781736018, 'int4': 10, 'uuid': '16541989-f3ba-49f6-bd31-bf8a75ba8eac'}

Output (*unexpected*): the output below is captured at each sliding window of 1 s duration (but input data is published at 2 s intervals):

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)   <-- seems older UUID
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:56.234, uuid=64f019ee-9cf4-427d-b4c9-f2b5f88820e1)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:31:58.436, uuid=cf173b3e-c34f-470a-ba15-ef648d0be8b9)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:05.029, uuid=dbfd8cee-565d-496b-b5a8-773ae64bc518)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTim
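[Editor's note] For context on why this output appears (not from this thread; based on documented Kafka Streams semantics): a sliding window emits an updated aggregate for every window a record falls into, and Suppressed.untilTimeLimit only rate-limits those updates, so intermediate results still come through. When only the final result per window is wanted, Suppressed.untilWindowCloses is the usual tool. A minimal self-contained sketch; the topic name, serdes, and application id are placeholders:

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

public class SlidingWindowFinalResults {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("user-actions", Consumed.with(Serdes.String(), Serdes.Long()))
            .groupByKey()
            // Same window shape as in the thread: 5 s time difference plus a grace period.
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
                    Duration.ofMillis(5000), Duration.ofSeconds(5)))
            .count()
            // untilWindowCloses holds back all intermediate updates and emits exactly
            // one result per window, once stream time passes window end + grace.
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((Windowed<String> key, Long count) -> KeyValue.pair(key.key(), count))
            .print(Printed.toSysOut());

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sliding-final-demo"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

One caveat: suppression is driven by stream time, so the final result for a window is only emitted after newer records arrive and advance stream time past the window end plus grace.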
[ANNOUNCE] Apache Kafka 3.2.0
The Apache Kafka community is pleased to announce the release for Apache Kafka 3.2.0.

* log4j 1.x is replaced with reload4j (KAFKA-9366)
* StandardAuthorizer for KRaft (KIP-801)
* Send a hint to the partition leader to recover the partition (KIP-704)
* Top-level error code field in DescribeLogDirsResponse (KIP-784)
* kafka-console-producer writes headers and null values (KIP-798 and KIP-810)
* JoinGroupRequest and LeaveGroupRequest have a reason attached (KIP-800)
* Static membership protocol lets the leader skip assignment (KIP-814)
* Rack-aware standby task assignment in Kafka Streams (KIP-708)
* Interactive Query v2 (KIP-796, KIP-805, and KIP-806)
* Connect APIs list all connector plugins and retrieve their configuration (KIP-769)
* TimestampConverter SMT supports different unix time precisions (KIP-808)
* Connect source tasks handle producer exceptions (KIP-779)

All of the changes in this release can be found in the release notes:
https://www.apache.org/dist/kafka/3.2.0/RELEASE_NOTES.html

You can download the source and binary release (Scala 2.12 and 2.13) from:
https://kafka.apache.org/downloads#3.2.0

---

Apache Kafka is a distributed streaming platform with four core APIs:

** The Producer API allows an application to publish a stream of records to one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.

** The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data between systems or applications.

** Building real-time streaming applications that transform or react to the streams of data.

Apache Kafka is in use at large and small companies worldwide, including Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank, Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you to the following 113 contributors to this release!

A. Sophie Blee-Goldman, Adam Kotwasinski, Aleksandr Sorokoumov, Alexandre Garnier, Alok Nikhil, aSemy, Bounkong Khamphousone, bozhao12, Bruno Cadonna, Chang, Chia-Ping Tsai, Chris Egerton, Colin P. Mccabe, Colin Patrick McCabe, Cong Ding, David Arthur, David Jacot, David Mao, defhacks, dengziming, Ed B, Edwin, florin-akermann, GauthamM-official, GuoPhilipse, Guozhang Wang, Hao Li, Haoze Wu, Idan Kamara, Ismael Juma, Jason Gustafson, Jason Koch, Jeff Kim, jiangyuan, Joel Hamill, John Roesler, Jonathan Albrecht, Jorge Esteban Quilcate Otoya, Josep Prat, Joseph (Ting-Chou) Lin, José Armando García Sancio, Jules Ivanic, Julien Chanaud, Justin Lee, Justine Olshan, Kamal Chandraprakash, Kate Stanley, keashem, Kirk True, Knowles Atchison, Jr, Konstantine Karantasis, Kowshik Prakasam, kurtostfeld, Kvicii, Lee Dongjin, Levani Kokhreidze, lhunyady, Liam Clarke-Hutchinson, liym, loboya~, Lucas Bradstreet, Ludovic DEHON, Luizfrf3, Luke Chen, Marc Löhe, Matthew Wong, Matthias J. Sax, Michal T, Mickael Maison, Mike Lothian, mkandaswamy, Márton Sigmond, Nick Telford, Niket, Okada Haruki, Paolo Patierno, Patrick Stuedi, Philip Nee, Prateek Agarwal, prince-mahajan, Rajini Sivaram, Randall Hauch, Richard, RivenSun, Rob Leland, Ron Dagostino, Sayantanu Dey, Stanislav Vodetskyi, sunshujie1990, Tamara Skokova, Tim Patterson, Tolga H. Dur, Tom Bentley, Tomonari Yamashita, vamossagar12, Vicky Papavasileiou, Victoria Xia, Vijay Krishna, Vincent Jiang, Walker Carlson, wangyap, Wenhao Ji, Wenjun Ruan, Xiaobing Fang, Xiaoyue Xue, xuexiaoyue, Yang Yu, yasar03, Yu, Zhang Hongyi, zzccctv, 工业废水, 彭小漪

We welcome your help and feedback. For more information on how to report problems, and to get involved, visit the project website at https://kafka.apache.org/

Thank you!

Regards,

Bruno
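[Editor's note] For anyone new to the APIs listed above, a minimal sketch of the Producer API; the broker address and topic name are placeholders, not part of the announcement:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer, flushing any buffered records.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Publish one record to a placeholder topic.
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}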
Re: [ANNOUNCE] Apache Kafka 3.2.0
Thanks for running the release, Bruno!

On Tue, May 17, 2022 at 12:02 PM Bruno Cadonna wrote:
> The Apache Kafka community is pleased to announce the release for Apache Kafka 3.2.0.
> [...]