[GitHub] [kafka] chia7712 commented on a change in pull request #8759: KAFKA-10066: TestOutputTopic should pass record headers into deserializers
chia7712 commented on a change in pull request #8759: URL: https://github.com/apache/kafka/pull/8759#discussion_r432919211 ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ## @@ -906,8 +906,8 @@ public void advanceWallClockTime(final Duration advance) { if (record == null) { throw new NoSuchElementException("Empty topic: " + topic); } -final K key = keyDeserializer.deserialize(record.topic(), record.key()); -final V value = valueDeserializer.deserialize(record.topic(), record.value()); +final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key()); Review comment: Nice finding! https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java#L805 has similar issue. Should we fix it as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
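The fix under review matters because Kafka's `Deserializer` interface has a header-aware overload whose default implementation ignores headers, so calling only the two-argument form silently drops header information for deserializers that depend on it. The following is a minimal, self-contained sketch of that pattern — the nested `Deserializer` interface and the `uppercase` header are simplified stand-ins for illustration, not Kafka's actual classes:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

class HeaderAwareDeserializerDemo {

    /** Simplified stand-in for Kafka's Deserializer interface (not the real API). */
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);

        /** Header-aware overload; the default implementation ignores headers. */
        default T deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
            return deserialize(topic, data);
        }
    }

    /** A deserializer whose output depends on a hypothetical "uppercase" header. */
    static class FlagAwareStringDeserializer implements Deserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
            String s = deserialize(topic, data);
            return headers.containsKey("uppercase") ? s.toUpperCase() : s;
        }
    }

    public static void main(String[] args) {
        Deserializer<String> d = new FlagAwareStringDeserializer();
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        Map<String, byte[]> headers = Map.of("uppercase", new byte[0]);

        // Before the fix: the two-argument call drops the headers entirely.
        System.out.println(d.deserialize("topic", value));          // hello

        // After the fix: headers are forwarded, so the deserializer can use them.
        System.out.println(d.deserialize("topic", headers, value)); // HELLO
    }
}
```

This is why the one-line change from `deserialize(record.topic(), record.key())` to `deserialize(record.topic(), record.headers(), record.key())` is observable behaviour, not just style.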
[jira] [Commented] (KAFKA-8480) Clients may fetch incomplete set of topic partitions during cluster startup
[ https://issues.apache.org/jira/browse/KAFKA-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120460#comment-17120460 ] Debraj Manna commented on KAFKA-8480: - Does this also affect the latest Kafka 2.5.0? If yes, is there any estimate of which Kafka version this issue will be fixed in? > Clients may fetch incomplete set of topic partitions during cluster startup > --- > > Key: KAFKA-8480 > URL: https://issues.apache.org/jira/browse/KAFKA-8480 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.1 >Reporter: Anna Povzner >Assignee: Anna Povzner >Priority: Major > > KafkaConsumer#partitionsFor() or AdminClient#describeTopics() may not return > all partitions for a given topic when the cluster is starting up (after the > cluster was down). > The cause is the controller, on becoming controller, sending an > UpdateMetadataRequest for all partitions with at least one online replica, > and then a separate UpdateMetadataRequest for all partitions with at least > one offline replica. If a client sends a metadata request in between the broker > processing those two update metadata requests, the client will get an incomplete > set of partitions. > Proposed fix: the controller should send one UpdateMetadataRequest (containing > all partitions) in ReplicaStateMachine#startup(). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] nikepakou opened a new pull request #8763: Ouweiqi
nikepakou opened a new pull request #8763: URL: https://github.com/apache/kafka/pull/8763 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] nikepakou closed pull request #8763: Ouweiqi
nikepakou closed pull request #8763: URL: https://github.com/apache/kafka/pull/8763
[GitHub] [kafka] feyman2016 commented on a change in pull request #8760: Kafka-10064 Add documentation for KIP-571
feyman2016 commented on a change in pull request #8760: URL: https://github.com/apache/kafka/pull/8760#discussion_r432951034 ## File path: docs/streams/developer-guide/app-reset-tool.html ## @@ -56,7 +56,8 @@ Prerequisites -All instances of your application must be stopped. Otherwise, the application may enter an invalid state, crash, or produce incorrect results. You can verify whether the consumer group with ID application.id is still active by using bin/kafka-consumer-groups. +All instances of your application must be stopped. Otherwise, the application may enter an invalid state, crash, or produce incorrect results. You can verify whether the consumer group with ID application.id is still active by using bin/kafka-consumer-groups. +When long session timeout has been configured, active members could take longer to get expired on the broker thus blocking the reset job to complete. Use the --force option could remove those left-over members immediately. Review comment: @abbccdda Screenshots as below, please kindly check, thanks! https://user-images.githubusercontent.com/23044946/83354407-ff2a5480-a38a-11ea-9ccb-7b1df4b98210.png https://user-images.githubusercontent.com/23044946/83354324-862afd00-a38a-11ea-998d-b7cd268a55c0.png https://user-images.githubusercontent.com/23044946/83354434-2c770280-a38b-11ea-87b3-0e968b9dbee1.png ## File path: docs/streams/developer-guide/app-reset-tool.html ## @@ -56,7 +56,8 @@ Prerequisites -All instances of your application must be stopped. Otherwise, the application may enter an invalid state, crash, or produce incorrect results. You can verify whether the consumer group with ID application.id is still active by using bin/kafka-consumer-groups. +All instances of your application must be stopped. Otherwise, the application may enter an invalid state, crash, or produce incorrect results. You can verify whether the consumer group with ID application.id is still active by using bin/kafka-consumer-groups. 
+When long session timeout has been configured, active members could take longer to get expired on the broker thus blocking the reset job to complete. Use the --force option could remove those left-over members immediately. Review comment: Fixed
[GitHub] [kafka] feyman2016 commented on pull request #8760: Kafka-10064 Add documentation for KIP-571
feyman2016 commented on pull request #8760: URL: https://github.com/apache/kafka/pull/8760#issuecomment-636476681 @abbccdda Screenshots as below, please kindly check, thanks! https://user-images.githubusercontent.com/23044946/83354407-ff2a5480-a38a-11ea-9ccb-7b1df4b98210.png https://user-images.githubusercontent.com/23044946/83354324-862afd00-a38a-11ea-998d-b7cd268a55c0.png https://user-images.githubusercontent.com/23044946/83354434-2c770280-a38b-11ea-87b3-0e968b9dbee1.png
[GitHub] [kafka] feyman2016 commented on a change in pull request #8760: Kafka-10064 Add documentation for KIP-571
feyman2016 commented on a change in pull request #8760: URL: https://github.com/apache/kafka/pull/8760#discussion_r432951274 ## File path: docs/streams/developer-guide/app-reset-tool.html ## @@ -117,6 +118,9 @@ Step 1: Run the application reset tool
[GitHub] [kafka] feyman2016 edited a comment on pull request #8760: Kafka-10064 Add documentation for KIP-571
feyman2016 edited a comment on pull request #8760: URL: https://github.com/apache/kafka/pull/8760#issuecomment-636476681 @abbccdda Thanks for the review! Screenshots as below, please kindly check~ https://user-images.githubusercontent.com/23044946/83354407-ff2a5480-a38a-11ea-9ccb-7b1df4b98210.png https://user-images.githubusercontent.com/23044946/83354324-862afd00-a38a-11ea-998d-b7cd268a55c0.png https://user-images.githubusercontent.com/23044946/83354434-2c770280-a38b-11ea-87b3-0e968b9dbee1.png
[GitHub] [kafka] bellemare opened a new pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
bellemare opened a new pull request #8764: URL: https://github.com/apache/kafka/pull/8764 Bug Details: Mistakenly setting the value serde to the key serde for an internal wrapped serde in the FKJ workflow. Testing: Added integration test to use a non-primitive Serde, in this case the JSONSerde that the original bug finder reported using. Expanded integration test to ensure that the default Serdes work for the entire happy path of the FKJ. Introduces a testing dependency on com.fasterxml.jackson, though this is already the case in other modules so I suspect it won't be a big issue. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-10063) UnsupportedOperation when querying cleaner metrics after shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120562#comment-17120562 ] Chia-Ping Tsai commented on KAFKA-10063: [~hachikuji] Could I take over this issue? > UnsupportedOperation when querying cleaner metrics after shutdown > - > > Key: KAFKA-10063 > URL: https://issues.apache.org/jira/browse/KAFKA-10063 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > > We have a few log cleaner metrics which iterate the set of cleaners. For > example: > {code} > newGauge("max-clean-time-secs", () => > cleaners.iterator.map(_.lastStats.elapsedSecs).max.toInt) > {code} > It seems possible currently for LogCleaner metrics to get queried after > shutdown of the log cleaner, which clears the `cleaners` collection. This can > lead to the following error: > {code} > java.lang.UnsupportedOperationException: empty.max > at scala.collection.IterableOnceOps.max(IterableOnce.scala:952) > at scala.collection.IterableOnceOps.max$(IterableOnce.scala:950) > at scala.collection.AbstractIterator.max(Iterator.scala:1279) > at > kafka.log.LogCleaner.kafka$log$LogCleaner$$$anonfun$new$9(LogCleaner.scala:132) > at kafka.log.LogCleaner$$anonfun$4.value(LogCleaner.scala:132) > at kafka.log.LogCleaner$$anonfun$4.value(LogCleaner.scala:132) > {code}
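The quoted gauge fails because Scala's `Iterator.max` throws on an empty collection once shutdown clears `cleaners`. A short Java sketch of the defensive pattern (here `Stream.max` returns an `Optional`, playing the role Scala's `maxOption`/`getOrElse` would in an actual fix; the method name is illustrative):

```java
import java.util.List;

class GaugeDemo {
    /** Gauge over a collection that may have been cleared by shutdown. */
    static int maxCleanTimeSecs(List<Integer> elapsedSecsPerCleaner) {
        // Stream.max returns an Optional, so an empty collection yields the
        // default value instead of throwing, unlike Scala's Iterator.max.
        return elapsedSecsPerCleaner.stream()
                .max(Integer::compare)
                .orElse(0);
    }

    public static void main(String[] args) {
        System.out.println(maxCleanTimeSecs(List.of(3, 7, 5))); // 7
        System.out.println(maxCleanTimeSecs(List.of()));        // 0 -- no UnsupportedOperationException
    }
}
```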
[jira] [Commented] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception
[ https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120564#comment-17120564 ] Adam Bellemare commented on KAFKA-10049: Thanks [~amicngh] for finding and reporting this bug! > KTable-KTable Foreign Key join throwing Serialization Exception > > > Key: KAFKA-10049 > URL: https://issues.apache.org/jira/browse/KAFKA-10049 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0, 2.6.0 >Reporter: Amit Chauhan >Assignee: Adam Bellemare >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > Attachments: 10049-bellemare.patch > > > I want to make use of _KTable-KTable_ Foreign Key join feature released in > *_2.5.0_* but facing issue while running the code. > {code:java} > > public static void main(String[] args) { > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "my-stream-processing-application-2"); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new > JSONSerdeComp<>().getClass()); > props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp"); > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); > StreamsBuilder builder = new StreamsBuilder(); > KTable ordersTable = builder. OrderObject>table(TOPIC_Agora); > KTable stockTable = builder. 
StockMarketData>table(TOPIC_Stock_Data); > KTable enriched = > ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new > ValueJoiner() { > @Override > public EnrichedOrder apply(OrderObject order, StockMarketData > stock) { > EnrichedOrder enOrder = EnrichedOrder.builder() > .orderId(order.getOrderId()) > .execPrice(order.getPrice()) > .symbol(order.getSymbol()) > .quanity(order.getQuanity()) > .side(order.getSide()) > .filledQty(order.getFilledQty()) > .leaveQty(order.getLeaveQty()) > .index(order.getIndex()) > .vWaprelative(order.getVWaprelative()) > > .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0) > > .stockBid(stock!=null?stock.getBid().doubleValue():0.0) > > .stockLast(stock!=null?stock.getLast().doubleValue():0.0) > > .stockClose(stock!=null?stock.getClose().doubleValue():0.0) > .build(); > return enOrder; > } > } , Materialized.with(Serdes.String(), new JSONSerdeComp<>())); > enriched.toStream().foreach(new ForeachAction() \{ > @Override > public void apply(String arg0, EnrichedOrder arg1) { > logger.info(String.format("key = %s, value = %s", arg0, arg1)); > } > }); > KafkaStreams streams = new KafkaStreams(builder.build(), props); > streams.start(); > Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close())); > }}} > > > org.apache.kafka > kafka-clients > 2.5.0 > > > org.apache.kafka > kafka-streams > 2.5.0 > > {code} > *+Exception:+* > {code:java} > 18:49:31.525 > [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1] > ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - > stream-thread > [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1] > task [0_0] Failed to flush state store orders-STATE-STORE-00: > org.apache.kafka.streams.errors.StreamsException: ClassCastException > while producing data to a sink topic. 
A serializer (key: > org.apache.kafka.common.serialization.StringSerializer / value: > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer) > is not compatible to the actual key or value type (key type: > java.lang.String / value type: > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper). > Change the default Serdes in StreamConfig or provide correct Serdes via > method parameters (for example if using the DSL, `#to(String topic, > Produced produced)` with > `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`). > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94) > ~[kafka-streams-2.5.0.jar:?] > at
[jira] [Commented] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception
[ https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120563#comment-17120563 ] Adam Bellemare commented on KAFKA-10049: PR is up.
[jira] [Created] (KAFKA-10073) EndTxn wait until markers complete
HaiyuanZhao created KAFKA-10073: --- Summary: EndTxn wait until markers complete Key: KAFKA-10073 URL: https://issues.apache.org/jira/browse/KAFKA-10073 Project: Kafka Issue Type: Sub-task Reporter: HaiyuanZhao
[jira] [Assigned] (KAFKA-10073) EndTxn wait until markers complete
[ https://issues.apache.org/jira/browse/KAFKA-10073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao reassigned KAFKA-10073: --- Assignee: HaiyuanZhao
[GitHub] [kafka] zhaohaidao opened a new pull request #8765: KAFKA-10073: endTxn wait until markers complete
zhaohaidao opened a new pull request #8765: URL: https://github.com/apache/kafka/pull/8765 First change of KAFKA-9878: endTxn wait until markers complete
[jira] [Commented] (KAFKA-10060) Kafka is logging too verbosely at the INFO level
[ https://issues.apache.org/jira/browse/KAFKA-10060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120571#comment-17120571 ] Chia-Ping Tsai commented on KAFKA-10060: > I think the expired offsets one can be removed if there are no offsets to > expire. It seems a small change. Could I take over it? > Kafka is logging too verbosely at the INFO level > > > Key: KAFKA-10060 > URL: https://issues.apache.org/jira/browse/KAFKA-10060 > Project: Kafka > Issue Type: Bug > Components: logging >Affects Versions: 2.1.0 >Reporter: Greg Hamilton >Priority: Major > > Some of the INFO level log4j entries are quite verbose and not really useful, > for example in kafka.coordinator.group.GroupMetadataManager, the following > log can be constantly printed with 0 expired offsets: > > {code:java} > info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - > currentTimestamp} milliseconds."){code} > > > *Other examples include:* > kafka.coordinator.group.GroupMetadataManager.GroupCoordinator: > > {code:java} > info(s"Group ${group.groupId} with generation ${group.generationId} is now > empty " + > s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") > {code} > {code:java} > info(s"Preparing to rebalance group ${group.groupId} in state > ${group.currentState} with old generation " + s"${group.generationId} > (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: > $reason)") > {code} > {code:java} > info(s"Assignment received from leader for group ${group.groupId} for > generation ${group.generationId}") > {code} > {code:java} > info(s"Stabilized group ${group.groupId} generation ${group.generationId} " + > s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") > {code} > > > We should move them to DEBUG if they are expected in normal state.
[jira] [Commented] (KAFKA-10060) Kafka is logging too verbosely at the INFO level
[ https://issues.apache.org/jira/browse/KAFKA-10060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120574#comment-17120574 ] Ismael Juma commented on KAFKA-10060: - Sure!
[GitHub] [kafka] ijuma commented on pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation
ijuma commented on pull request #8421: URL: https://github.com/apache/kafka/pull/8421#issuecomment-636487549 ok to test
[GitHub] [kafka] ijuma commented on pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation
ijuma commented on pull request #8421: URL: https://github.com/apache/kafka/pull/8421#issuecomment-636487648 @d8tltanc are you planning to address the latest set of comments from @skaundinya15?
[GitHub] [kafka] ijuma commented on pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation
ijuma commented on pull request #8421: URL: https://github.com/apache/kafka/pull/8421#issuecomment-636488043 Also, I think it would be good to tackle the producer and consumer at the same time to make sure we're building a reusable implementation instead of something specific to the AdminClient.
[GitHub] [kafka] ijuma commented on a change in pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation
ijuma commented on a change in pull request #8421: URL: https://github.com/apache/kafka/pull/8421#discussion_r432958968 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -683,21 +686,65 @@ public Node provide() { } } +/** + * Provides context which the retry may refer to + */ +class CallRetryContext { + +private int tries = 0; +private long nextAllowedTryMs = 0; +private final double JITTER_MIN = 0.8; +private final double JITTER_MAX = 1.2; + +public int tries() { +return tries; +} + +public long nextAllowedTryMs() { +return nextAllowedTryMs; +} + +private void updateTries(int currentTries) { +this.tries = currentTries + 1; +} + +private void updateNextAllowTryMs() { +double jitter = Math.random() * (JITTER_MAX - JITTER_MIN) + JITTER_MIN; +int failures = tries - 1; +double exp = Math.pow(2, failures); +this.nextAllowedTryMs = time.milliseconds() + +(long) Math.min(retryBackoffMaxMs, jitter * exp * retryBackoffMs); +// TODO: Remove the line below +System.out.println("nextAllow = " + (long) Math.min(retryBackoffMaxMs, jitter * exp * retryBackoffMs)); +} Review comment: Similar logic exists in `ClusterConnectionStates.updateReconnectBackoff`. Maybe we could extract it to a utilities class.
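For context, the backoff formula under review computes the next allowed attempt as now + min(retryBackoffMaxMs, jitter × 2^failures × retryBackoffMs), with jitter drawn from [0.8, 1.2). Below is a self-contained sketch of just that calculation; the class and parameter names are illustrative, not the eventual Kafka utility the reviewer suggests extracting:

```java
import java.util.concurrent.ThreadLocalRandom;

class ExponentialBackoffDemo {
    private static final double JITTER_MIN = 0.8;
    private static final double JITTER_MAX = 1.2;

    private final long retryBackoffMs;     // base delay before the first retry
    private final long retryBackoffMaxMs;  // upper bound on any computed delay

    ExponentialBackoffDemo(long retryBackoffMs, long retryBackoffMaxMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    /** Delay before the next attempt after the given number of failures. */
    long backoffMs(int failures) {
        double jitter = ThreadLocalRandom.current().nextDouble(JITTER_MIN, JITTER_MAX);
        double exp = Math.pow(2, failures);
        // min(cap, jitter * 2^failures * base), matching the formula in the diff
        return (long) Math.min(retryBackoffMaxMs, jitter * exp * retryBackoffMs);
    }

    public static void main(String[] args) {
        ExponentialBackoffDemo backoff = new ExponentialBackoffDemo(100, 10_000);
        for (int failures = 0; failures < 8; failures++) {
            System.out.printf("failures=%d backoffMs=%d%n", failures, backoff.backoffMs(failures));
        }
    }
}
```

The jitter keeps many clients that failed at the same moment from retrying in lockstep, while the cap bounds the worst-case delay once 2^failures grows large.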
[GitHub] [kafka] ijuma opened a new pull request #8766: MINOR: Update zstd to 1.4.5
ijuma opened a new pull request #8766: URL: https://github.com/apache/kafka/pull/8766 It improves decompression speed: >For x64 cpus, expect a speed bump of at least +5%, and up to +10% in favorable cases. >ARM cpus receive more benefit, with speed improvements ranging from +15% vicinity, >and up to +50% for certain SoCs and scenarios (ARM's situation is more complex due >to larger differences in SoC designs). See https://github.com/facebook/zstd/releases/tag/v1.4.5 for more details. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] chia7712 commented on a change in pull request #8767: KAFKA-10060 GroupMetadataManager should not log if there are no offse…
chia7712 commented on a change in pull request #8767: URL: https://github.com/apache/kafka/pull/8767#discussion_r432961149 ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ## @@ -85,21 +82,47 @@ class GroupMetadataManagerTest { offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec, offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + } -defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs - + private[this] def mockKafkaZkClient: KafkaZkClient = { // make two partitions of the group topic to make sure some partitions are not owned by the coordinator -zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) +val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2)) EasyMock.replay(zkClient) +zkClient + } + @Before + def setUp(): Unit = { +defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs metrics = new kMetrics() time = new MockTime replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) -groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time, metrics) +groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, + mockKafkaZkClient, time, metrics) partition = EasyMock.niceMock(classOf[Partition]) } + @Test + def testLogInfoFromCleanupGroupMetadata(): Unit = { +var expiredOffsets: Int = 0 +var infoCount = 0 +val gmm = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, mockKafkaZkClient, time, metrics) { Review comment: create another GroupMetadataManager to override the methods we want to verify
[jira] [Commented] (KAFKA-10060) Kafka is logging too verbosely at the INFO level
[ https://issues.apache.org/jira/browse/KAFKA-10060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120582#comment-17120582 ] Chia-Ping Tsai commented on KAFKA-10060: https://github.com/apache/kafka/pull/8767
[GitHub] [kafka] chia7712 opened a new pull request #8767: KAFKA-10060 GroupMetadataManager should not log if there are no offse…
chia7712 opened a new pull request #8767: URL: https://github.com/apache/kafka/pull/8767 address @ijuma [comment](https://issues.apache.org/jira/browse/KAFKA-10060?focusedCommentId=17118992&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17118992) > I think the expired offsets one can be removed if there are no offsets to expire. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ijuma commented on a change in pull request #8767: KAFKA-10060 GroupMetadataManager should not log if there are no offse…
ijuma commented on a change in pull request #8767: URL: https://github.com/apache/kafka/pull/8767#discussion_r432961140 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -786,7 +786,9 @@ class GroupMetadataManager(brokerId: Int, group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs) }) offsetExpiredSensor.record(numOffsetsRemoved) -info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.") +if (numOffsetsRemoved > 0) { + info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.") +} Review comment: Nit: no need for braces. ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ## @@ -85,21 +82,47 @@ class GroupMetadataManagerTest { offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec, offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + } -defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs - + private[this] def mockKafkaZkClient: KafkaZkClient = { Review comment: Nit: we don't use `private[this]` for tests. It's mostly a micro-optimization that is not particularly useful for tests (`private` provides good enough semantics).
[GitHub] [kafka] chia7712 commented on a change in pull request #8767: KAFKA-10060 GroupMetadataManager should not log if there are no offse…
chia7712 commented on a change in pull request #8767: URL: https://github.com/apache/kafka/pull/8767#discussion_r432963158 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -786,7 +786,9 @@ class GroupMetadataManager(brokerId: Int, group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs) }) offsetExpiredSensor.record(numOffsetsRemoved) -info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.") +if (numOffsetsRemoved > 0) { + info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.") +} Review comment: done ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ## @@ -85,21 +82,47 @@ class GroupMetadataManagerTest { offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec, offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + } -defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs - + private[this] def mockKafkaZkClient: KafkaZkClient = { Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9494) Include data type of the config in ConfigEntry
[ https://issues.apache.org/jira/browse/KAFKA-9494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120634#comment-17120634 ] Travis Bischel commented on KAFKA-9494: --- It may be beneficial to update the KIP to avoid mentioning IncludeType in the request, since that is no longer a thing. > Include data type of the config in ConfigEntry > -- > > Key: KAFKA-9494 > URL: https://issues.apache.org/jira/browse/KAFKA-9494 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Reporter: Shailesh Panwar >Priority: Minor > Fix For: 2.6.0 > > > Why this request? > To provide better validation. Including the data type can significantly > improve the validation on client side (be it web or cli or any other client). > In the absence of `type` the only way to know if the user specified value is > correct is to make an `alter` call and check if there is no error. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
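To illustrate the benefit the reporter describes: if `ConfigEntry` carried a data type, a client could validate a value locally before issuing an `alter` call. A hedged sketch — the type names mirror Kafka's `ConfigDef.Type`, but the helper functions below are hypothetical, not part of any Kafka client API:

```python
def _is_float(value):
    try:
        float(value)
        return True
    except ValueError:
        return False

# Parsers keyed by ConfigDef.Type-style names; unknown types pass through.
TYPE_PARSERS = {
    "BOOLEAN": lambda v: v.lower() in ("true", "false"),
    "INT": lambda v: v.lstrip("-").isdigit(),
    "LONG": lambda v: v.lstrip("-").isdigit(),
    "DOUBLE": _is_float,
    "STRING": lambda v: True,
}

def validate_config_value(config_type, value):
    """Return True if `value` parses as `config_type`; with a typed
    ConfigEntry this check can run client-side, before any alter call."""
    parser = TYPE_PARSERS.get(config_type)
    return parser(value) if parser else True
```

Without the type field, the only feedback channel is the broker's error response to a trial `alterConfigs`, exactly as the ticket notes.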
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient
stanislavkozlovski commented on a change in pull request #8724: URL: https://github.com/apache/kafka/pull/8724#discussion_r432986275 ## File path: core/src/main/scala/kafka/controller/ControllerContext.scala ## @@ -391,6 +404,54 @@ class ControllerContext { partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet } + def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = { +val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) +val replicaAssignment = partitionFullReplicaAssignment(partition) +updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous, + Some(replicaAssignment), Some(leaderIsrAndControllerEpoch)) + } + + private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition, +oldReplicaAssignment: Option[ReplicaAssignment], +oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch], +newReplicaAssignment: Option[ReplicaAssignment], Review comment: nit: I think this exact argument doesn't need to be an Option This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #8605: MINOR: align the constructor of KafkaConsumer to KafkaProducer
ijuma merged pull request #8605: URL: https://github.com/apache/kafka/pull/8605
[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r432987394 ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -354,15 +358,16 @@ def start_cmd(self, node): def start_node(self, node, timeout_sec=60): node.account.mkdirs(KafkaService.PERSISTENT_ROOT) + +self.security_config.setup_node(node) +self.security_config.setup_credentials(node, self.path, self.zk_connect_setting(), broker=True) Review comment: Why do we need this change? ## File path: tests/kafkatest/services/security/security_config.py ## @@ -259,6 +267,9 @@ def setup_node(self, node): if self.has_sasl: self.setup_sasl(node) +if java_version(node) <= 9 and self.properties['tls.version'] == 'TLSv1.3': Review comment: For consistency, shall we use `11` here? ## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ## @@ -580,7 +581,16 @@ public void testTLSDefaults() throws Exception { @Test public void testUnsupportedCipher() throws Exception { -String[] cipherSuites = ((SSLServerSocketFactory) SSLServerSocketFactory.getDefault()).getSupportedCipherSuites(); +String[] cipherSuites; +if (Java.IS_JAVA11_COMPATIBLE) { +cipherSuites = new String[] { +"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", Review comment: What is the reason for this? 
## File path: tests/kafkatest/tests/core/replication_test.py ## @@ -126,9 +126,11 @@ def min_cluster_size(self): security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512") @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"]) +@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], +security_protocol=["SSL"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"]) Review comment: I think we can change one of the `SASL_SSL` entries to include multiple TLS versions. This way, we also verify that it works for SASL_SSL. ## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ## @@ -622,6 +632,108 @@ public void testUnsupportedTLSVersion() throws Exception { server.verifyAuthenticationMetrics(0, 1); } +/** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used. + */ +@Test +public void testCiphersSuiteForTls12_FailsForTls13() throws Exception { Review comment: We normally don't have `_` in method names. Can we remove it from here and other test methods? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r432987691 ## File path: tests/kafkatest/tests/core/replication_test.py ## @@ -126,9 +126,11 @@ def min_cluster_size(self): security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512") @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"]) +@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], +security_protocol=["SSL"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"]) Review comment: I think we can change one of the `SASL_SSL` entries to include multiple TLS versions. This way, we also verify that it works for SASL_SSL without increasing test time too much. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9148) Consider forking RocksDB for Streams
[ https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120645#comment-17120645 ] Adam Retter commented on KAFKA-9148: How are you guys finding the newer RocksDB versions these days? > Consider forking RocksDB for Streams > - > > Key: KAFKA-9148 > URL: https://issues.apache.org/jira/browse/KAFKA-9148 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > > We recently upgraded our RocksDB dependency to 5.18 for its memory-management > abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, > someone from Flink recently discovered a ~8% [performance > regression|https://github.com/facebook/rocksdb/issues/5774] that exists in > all versions 5.18+ (up through the current newest version, 6.2.2). Flink was > able to react to this by downgrading to 5.17 and [picking the > WriteBufferManager|https://github.com/dataArtisans/frocksdb/pull/4] to their > fork (fRocksDB). > Due to this and other reasons enumerated below, we should consider also > forking our own RocksDB for Streams. > Pros: > * We can avoid passing sudden breaking changes on to our users, such as removal > of methods with no deprecation period (see discussion on KAFKA-8897) > * We can pick whichever version has the best performance for our needs, and > pick over any new features, metrics, etc that we need to use rather than > being forced to upgrade (and breaking user code, introducing regression, etc) > * Support for some architectures does not exist in all RocksDB versions, > making Streams completely unusable for some users until we can upgrade the > rocksdb dependency to one that supports their specific case. It's worth > noting that we've only had [one > user|https://issues.apache.org/jira/browse/KAFKA-9225] hit this so far (that > we know of), and some workarounds have been discussed on the ticket. 
> * The Java API seems to be a very low priority to the rocksdb folks. > ** They leave out critical functionality, features, and configuration > options that have been in the c++ API for a very long time > ** Those that do make it over often have random gaps in the API such as > setters but no getters (see [rocksdb PR > #5186|https://github.com/facebook/rocksdb/pull/5186]) > ** Others are poorly designed and require too many trips across the JNI, > making otherwise incredibly useful features prohibitively expensive. > *** [Custom > Comparator|https://github.com/facebook/rocksdb/issues/538#issuecomment-83145980]: > a custom comparator could significantly improve the performance of session > windows. This is trivial to do but given the high performance cost of > crossing the jni, it is currently only practical to use a c++ comparator > *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not > currently used by Streams but a commonly requested feature, and may also > allow improved range queries > ** Even when an external contributor develops a solution for poorly > performing Java functionality and helpfully tries to contribute their patch > back to rocksdb, it gets ignored by the rocksdb people ([rocksdb PR > #2283|https://github.com/facebook/rocksdb/pull/2283]) > Cons: > * More work (not to be trivialized, the truth is we don't and can't know how > much extra work this will ultimately be) > Given that we rarely upgrade the Rocks dependency, use only some fraction of > its features, and would need or want to make only minimal changes ourselves, > it seems like we could actually get away with very little extra work by > forking rocksdb. Note that as of this writing the frocksdb repo has only > needed to open 5 PRs on top of the actual rocksdb (two of them trivial). Of > course, the LOE to maintain this will only grow over time, so we should think > carefully about whether and when to start taking on this potential burden. 
[GitHub] [kafka] ijuma commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on pull request #8695: URL: https://github.com/apache/kafka/pull/8695#issuecomment-636536255 @nizhikov Have you had a chance to run the system tests with Java 11?
[GitHub] [kafka] ijuma merged pull request #8766: MINOR: Update zstd to 1.4.5
ijuma merged pull request #8766: URL: https://github.com/apache/kafka/pull/8766
[GitHub] [kafka] ijuma commented on pull request #8767: KAFKA-10060 GroupMetadataManager should not log if there are no offse…
ijuma commented on pull request #8767: URL: https://github.com/apache/kafka/pull/8767#issuecomment-636536464 Unrelated flaky test failure: `kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs`
[GitHub] [kafka] ijuma merged pull request #8767: KAFKA-10060 GroupMetadataManager should not log if there are no offse…
ijuma merged pull request #8767: URL: https://github.com/apache/kafka/pull/8767
[jira] [Comment Edited] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108705#comment-17108705 ] Travis Bischel edited comment on KAFKA-9987 at 5/31/20, 11:19 PM: -- For context, here's my current benchmarks (WithExisting mirrors an existing cluster rejoining, Imbalanced means unequal subscriptions):
{noformat}
BenchmarkLarge: sticky_test.go:1272: 24104 total partitions; 100 total members
BenchmarkLarge-12                          100    11918236 ns/op     7121221 B/op     9563 allocs/op
BenchmarkLargeWithExisting: sticky_test.go:1272: 24104 total partitions; 100 total members
BenchmarkLargeWithExisting-12               74    16180851 ns/op     9605267 B/op    34015 allocs/op
BenchmarkLargeImbalanced: sticky_test.go:1272: 24104 total partitions; 101 total members
BenchmarkLargeImbalanced-12                 68    17798614 ns/op    17025139 B/op     9995 allocs/op
BenchmarkLargeWithExistingImbalanced: sticky_test.go:1272: 24104 total partitions; 101 total members
BenchmarkLargeWithExistingImbalanced-12     74    15852596 ns/op     9602434 B/op    33806 allocs/op
{noformat}
Switching up some numbers to better mirror this issue's problem statement:
{noformat}
BenchmarkLarge: sticky_test.go:1272: 4274 total partitions; 2100 total members
BenchmarkLarge-12                            3   447516434 ns/op    13942640 B/op    10619 allocs/op
BenchmarkLargeWithExisting: sticky_test.go:1272: 4274 total partitions; 2100 total members
BenchmarkLargeWithExisting-12                3   460263266 ns/op    14482474 B/op    27700 allocs/op
BenchmarkLargeImbalanced: sticky_test.go:1272: 4274 total partitions; 2101 total members
BenchmarkLargeImbalanced-12                  3   487361276 ns/op    50107610 B/op    10636 allocs/op
BenchmarkLargeWithExistingImbalanced: sticky_test.go:1272: 4274 total partitions; 2101 total members
BenchmarkLargeWithExistingImbalanced-12      3   459259448 ns/op    14482096 B/op    27695 allocs/op
{noformat}
More extreme:
{noformat}
BenchmarkLarge: sticky_test.go:1272: 1276057 total partitions; 1000 total members
BenchmarkLarge-12                            1    1889004419 ns/op    430359568 B/op     829830 allocs/op
BenchmarkLargeWithExisting: sticky_test.go:1272: 1276057 total partitions; 1000 total members
BenchmarkLargeWithExisting-12                1    3086791088 ns/op    617969240 B/op    2516550 allocs/op
BenchmarkLargeImbalanced: sticky_test.go:1272: 1276057 total partitions; 1001 total members
BenchmarkLargeImbalanced-12                  1   32948262382 ns/op   5543028064 B/op     830336 allocs/op
BenchmarkLargeWithExistingImbalanced: sticky_test.go:1272: 1276057 total partitions; 1001 total members
BenchmarkLargeWithExistingImbalanced-12      1    5206902130 ns/op    617954512 B/op    2515084 allocs/op
{noformat}
Note that the prior case uses quite a bit of RAM (~5-6G), but it also is balancing quite a lot of partitions among quite a lot of members; the actual planning itself only took ~0.5G, setup was the expen
[GitHub] [kafka] apovzner opened a new pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
apovzner opened a new pull request #8768: URL: https://github.com/apache/kafka/pull/8768 This PR implements the part of KIP-612 that adds broker configurations for broker-wide and per-listener connection creation rate limits and enforces these limits. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
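A connection creation rate limit of the kind KIP-612 describes is conceptually a rate check at accept time. Below is a minimal token-bucket sketch of that idea for illustration only; the class and method names are hypothetical, and Kafka's actual implementation builds on its existing quota/sensor machinery rather than a standalone limiter like this:

```python
import time

class ConnectionRateLimiter:
    """Token-bucket sketch of a connection creation rate limit."""

    def __init__(self, max_connections_per_sec, clock=time.monotonic):
        self.rate = float(max_connections_per_sec)
        self.clock = clock
        self.tokens = float(max_connections_per_sec)  # start with a full bucket
        self.last = clock()

    def try_accept(self):
        """Consume one token per accepted connection; False means throttle."""
        now = self.clock()
        # Refill proportionally to elapsed time, capped at one second's worth.
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

A broker-wide limiter plus one per listener would be consulted before registering each new connection; per the KIP, exceeding these limits delays (throttles) further accepts rather than dropping connections, so a `False` result would translate into a throttle delay.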
[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120656#comment-17120656 ] Hai Lin commented on KAFKA-9987: Thanks [~ableegoldman], will take a look at the PR. You have any time frame of next 2.4 release? > Improve sticky partition assignor algorithm > --- > > Key: KAFKA-9987 > URL: https://issues.apache.org/jira/browse/KAFKA-9987 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > > In > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > we added the new CooperativeStickyAssignor which leverages on the underlying > sticky assignment algorithm of the existing StickyAssignor (moved to > AbstractStickyAssignor). The algorithm is fairly complex as it tries to > optimize stickiness while satisfying perfect balance _in the case individual > consumers may be subscribed to different subsets of the topics._ While it > does a pretty good job at what it promises to do, it doesn't scale well with > large numbers of consumers and partitions. > To give a concrete example, users have reported that it takes 2.5 minutes for > the assignment to complete with just 2100 consumers reading from 2100 > partitions. Since partitions revoked during the first of two cooperative > rebalances will remain unassigned until the end of the second rebalance, it's > important for the rebalance to be as fast as possible. And since one of the > primary improvements of the cooperative rebalancing protocol is better > scaling experience, the only OOTB cooperative assignor should not itself > scale poorly > If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random > subset of the total subscription, they will all be subscribed to the same set > of topics and rely on the assignor to balance the partition workload. > We can detect this case by checking the group's individual subscriptions and > call on a more efficient assignment algorithm. -- This message was sent by Atlassian Jira (v8.3.4#803005)
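The simplification proposed above — detect that all members share the same subscription and fall back to a cheaper algorithm — can be sketched in one pass: let each member keep previously owned partitions up to a fair quota (stickiness), then spread the remainder onto the least-loaded members. This is an illustrative Python sketch, not Kafka's actual `AbstractStickyAssignor` code; `assign_uniform` and its signature are hypothetical:

```python
def assign_uniform(members, partitions, prev=None):
    """One-pass sticky-ish assignment for identical subscriptions:
    keep previously owned partitions up to a fair quota, then spread
    the rest onto the least-loaded members."""
    prev = prev or {}
    members = sorted(members)
    cap = len(partitions) // len(members)  # floor quota; extras spread below
    assignment = {m: [] for m in members}
    owner = {p: m for m, ps in prev.items() if m in assignment for p in ps}
    unclaimed = []
    for p in partitions:
        m = owner.get(p)
        if m is not None and len(assignment[m]) < cap:
            assignment[m].append(p)  # sticky: member keeps what it owned
        else:
            unclaimed.append(p)
    for p in unclaimed:
        target = min(members, key=lambda m: len(assignment[m]))
        assignment[target].append(p)
    return assignment
```

The result is balanced to within one partition in a single linear pass, which is exactly why constraining the problem to identical subscriptions avoids the poor scaling described in the ticket.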
[jira] [Commented] (KAFKA-10067) Provide API to detect cluster nodes version
[ https://issues.apache.org/jira/browse/KAFKA-10067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120657#comment-17120657 ] Yanming Zhou commented on KAFKA-10067: -- I'm talking about the release version, not the API version; a WebUI/GUI admin client wants to know which version is running. > Provide API to detect cluster nodes version > --- > > Key: KAFKA-10067 > URL: https://issues.apache.org/jira/browse/KAFKA-10067 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.5.0 >Reporter: Yanming Zhou >Priority: Major > > {code:java} > try (AdminClient ac = AdminClient.create(conf)) { > for (Node node : ac.describeCluster().nodes().get()) { > System.out.println(node.host()); > System.out.println(node.version()); // missing > feature > } > } > {code}
[jira] [Created] (KAFKA-10074) Improve performance of `matchingAcls`
Ismael Juma created KAFKA-10074: --- Summary: Improve performance of `matchingAcls` Key: KAFKA-10074 URL: https://issues.apache.org/jira/browse/KAFKA-10074 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Fix For: 2.6.0 A recent change in `matchingAcls` to remove `filterKeys` in favor of filtering inside `flatMap` caused a performance regression in cases where there are large number of topics, prefix ACLs and TreeMap.from/to filtering is ineffective. In such cases, we rely on string comparisons to exclude entries from the ACL cache that are not relevant. This issue is not present in any release yet, so we should include the simple fix in the 2.6 branch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
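The string-comparison filtering the ticket describes can be illustrated with a sorted list standing in for the `TreeMap` range view: restrict the search range first, then keep only entries that truly prefix the resource name. The helper below is a hypothetical sketch of the technique, not Kafka's `AclAuthorizer` code:

```python
from bisect import bisect_left, bisect_right

def matching_prefix_acls(sorted_names, resource_name):
    """Return prefix-ACL resource names that prefix `resource_name`.
    A sorted list plays the role of the TreeMap: any prefix of
    `resource_name` sorts between its first character and the full name,
    so we range-restrict first and apply `startswith` to the survivors."""
    lo = bisect_left(sorted_names, resource_name[:1])
    hi = bisect_right(sorted_names, resource_name)
    return [name for name in sorted_names[lo:hi]
            if resource_name.startswith(name)]
```

When the range restriction is ineffective (many names share the leading characters), performance hinges on how cheaply the `startswith` pass iterates and filters — which is the code path the regression affected.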
[jira] [Commented] (KAFKA-10074) Improve performance of `matchingAcls`
[ https://issues.apache.org/jira/browse/KAFKA-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120660#comment-17120660 ] Ismael Juma commented on KAFKA-10074: - cc [~rhauch] > Improve performance of `matchingAcls` > - > > Key: KAFKA-10074 > URL: https://issues.apache.org/jira/browse/KAFKA-10074 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Critical > Fix For: 2.6.0 > > > A recent change in `matchingAcls` to remove `filterKeys` in favor of > filtering inside `flatMap` caused a performance regression in cases where > there are large number of topics, prefix ACLs and TreeMap.from/to filtering > is ineffective. In such cases, we rely on string comparisons to exclude > entries from the ACL cache that are not relevant. > This issue is not present in any release yet, so we should include the simple > fix in the 2.6 branch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma opened a new pull request #8769: KAFKA-10074: Improve performance of `matchingAcls`
ijuma opened a new pull request #8769: URL: https://github.com/apache/kafka/pull/8769 This PR reduces allocations by using a plain old `foreach` in `matchingAcls` and improves `AclSeqs.find` to only search the inner collections that are required to find a match (instead of searching all of them). A recent change (90bbeedf52) in `matchingAcls` to remove `filterKeys` in favor of filtering inside `flatMap` caused a performance regression in cases where there is a large number of topics, prefix ACLs and TreeMap.from/to filtering is ineffective. In such cases, we rely on string comparisons to exclude entries from the ACL cache that are not relevant. This issue is not present in any release yet, so we should include the simple fix in the 2.6 branch. The original benchmark did not show a performance difference, so I adjusted the benchmark to stress the relevant code more. More specifically, `aclCacheSnapshot.from(...).to(...)` returns nearly 200 entries where each map value contains 1000 AclEntries. Out of the 200k AclEntries, only 1050 are retained due to the `startsWith` filtering. This is the case where the implementation in master is least efficient when compared to the previous version and the version in this PR. The adjusted benchmark results for testAuthorizer are 4.532ms for master, 2.903ms for the previous version and 2.877ms for this PR. Normalized allocation rate was 593 KB/op for master, 597 KB/op for the previous version and 101 KB/op for this PR. 
Full results follow.

master with adjusted benchmark:

Benchmark                                                                  (aclCount)  (resourceCount)  Mode  Cnt          Score          Error   Units
AclAuthorizerBenchmark.testAclsIterator                                            50               20  avgt    5        680.805 ±       44.318   ms/op
AclAuthorizerBenchmark.testAclsIterator:·gc.alloc.rate                             50               20  avgt    5        549.879 ±       36.259  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.alloc.rate.norm                        50               20  avgt    5  411457042.000 ±     4805.461    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Eden_Space                    50               20  avgt    5        331.110 ±       95.821  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Eden_Space.norm               50               20  avgt    5  247799480.320 ± 72877192.319    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Survivor_Space                50               20  avgt    5          0.891 ±        3.183  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Survivor_Space.norm           50               20  avgt    5     667593.387 ±  2369888.357    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.count                                  50               20  avgt    5         28.000                 counts
AclAuthorizerBenchmark.testAclsIterator:·gc.time                                   50               20  avgt    5       3458.000                     ms
AclAuthorizerBenchmark.testAuthorizer                                              50               20  avgt    5          4.532 ±        0.546   ms/op
AclAuthorizerBenchmark.testAuthorizer:·gc.alloc.rate                               50               20  avgt    5        119.036 ±       14.261  MB/sec
AclAuthorizerBenchmark.testAuthorizer:·gc.alloc.rate.norm                          50               20  avgt    5     593524.310 ±       22.452    B/op
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Eden_Space                      50               20  avgt    5        117.091 ±     1008.188  MB/sec
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Eden_Space.norm                 50               20  avgt    5     598574.303 ±  5153905.271    B/op
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Survivor_Space                  50               20  avgt    5          0.034 ±        0.291  MB/sec
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Survivor_Space.norm             50               20  avgt    5        173.001 ±     1489.593    B/op
AclAuthorizerBenchmark.testAuthorizer:·gc.count                                    50               20  avgt    5          1.000                 counts
AclAuthorizerBenchmark.testAuthorizer:·gc.time                                     50               20  avgt    5         13.000                     ms

master with filterKeys like 90bbeedf52 and adjusted benchmark:

Benchmark                                                                  (aclCount)  (resourceCount)  Mode  Cnt          Score          Error   Units
AclAuthorizerBenchmark.testAclsIterator                                            50               20  avgt    5        729.163 ±       20.842   ms/op
AclAuthorizerBenchmark.testAclsIterat
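The `AclSeqs.find` improvement described in the PR — search the inner collections only until the first match, instead of searching all of them — is plain short-circuiting search. A generic Python sketch of the idea (the function name is hypothetical, not Kafka's Scala code):

```python
def find_first(collections, predicate):
    """Search a sequence of inner collections lazily, returning the first
    match without touching the remaining collections."""
    for collection in collections:
        for item in collection:
            if predicate(item):
                return item
    return None
```

When most matches occur in the first inner collection, this avoids both the iteration and the intermediate allocations of a full flatMap-then-filter pass.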
[GitHub] [kafka] feyman2016 commented on pull request #8760: Kafka-10064 Add documentation for KIP-571
feyman2016 commented on pull request #8760: URL: https://github.com/apache/kafka/pull/8760#issuecomment-636584556 @abbccdda Thanks a lot for the review!
[GitHub] [kafka] jiameixie commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm
jiameixie commented on pull request #8489: URL: https://github.com/apache/kafka/pull/8489#issuecomment-636587134 @cmccabe What tests should be done? Thanks
[GitHub] [kafka] AshishRoyJava commented on pull request #8633: KAFKA-9965: Incrementing counter cause uneven distribution with RoundRobinPartitioner
AshishRoyJava commented on pull request #8633: URL: https://github.com/apache/kafka/pull/8633#issuecomment-636624709 Any update here??
[GitHub] [kafka] mjsax commented on a change in pull request #8759: KAFKA-10066: TestOutputTopic should pass record headers into deserializers
mjsax commented on a change in pull request #8759: URL: https://github.com/apache/kafka/pull/8759#discussion_r433054885 ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ## @@ -906,8 +906,8 @@ public void advanceWallClockTime(final Duration advance) { if (record == null) { throw new NoSuchElementException("Empty topic: " + topic); } -final K key = keyDeserializer.deserialize(record.topic(), record.key()); -final V value = valueDeserializer.deserialize(record.topic(), record.value()); +final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key()); Review comment: `readOutput` is deprecated. Thus, not sure if it's worth fixing?
[GitHub] [kafka] mjsax commented on a change in pull request #8759: KAFKA-10066: TestOutputTopic should pass record headers into deserializers
mjsax commented on a change in pull request #8759: URL: https://github.com/apache/kafka/pull/8759#discussion_r433055097 ## File path: streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java ## @@ -711,6 +715,56 @@ public void shouldUseSourceSpecificDeserializers() { assertThat(result2.getValue(), equalTo(source2Value)); } +@Test +public void shouldPassRecordHeadersIntoDeSerializers() { Review comment: The capital `S` was on purpose; it's short for `IntoDeserializersAndSerializers`
[GitHub] [kafka] mjsax commented on a change in pull request #8759: KAFKA-10066: TestOutputTopic should pass record headers into deserializers
mjsax commented on a change in pull request #8759: URL: https://github.com/apache/kafka/pull/8759#discussion_r433055994 ## File path: streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java ## @@ -711,6 +715,56 @@ public void shouldUseSourceSpecificDeserializers() { assertThat(result2.getValue(), equalTo(source2Value)); } +@Test +public void shouldPassRecordHeadersIntoDeSerializers() { Review comment: We don't verify that headers are deserialized -- we just verify that the right overload is called, the one that hands the headers into the deserializer (i.e., to allow users to read them as metadata). I guess we could use mocks for the deserializers, but not for the serializers. Thus, not using mocks makes the test use the same pattern for both, which seems "nicer". > Moreover, we could build a customized boolean deserializer which outputs true when the header is provided, or false otherwise. Would this make the test simpler? I personally doubt it.
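The behavior the PR fixes can be sketched with simplified stand-ins for Kafka's `Deserializer` and `Headers` types (the interfaces below are hypothetical minimal versions for illustration, not the real `org.apache.kafka.common` API): the headers-aware `deserialize` overload defaults to ignoring headers, so a caller that invokes only the two-argument form silently drops any header-dependent logic a user's deserializer might have.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-ins for Kafka's Headers and Deserializer (illustrative only).
interface Headers {
    String lastHeader(String key);
}

interface Deserializer<T> {
    T deserialize(String topic, byte[] data);

    // Headers-aware overload; the default implementation simply ignores headers,
    // so it is backward compatible -- but only if callers actually route through it.
    default T deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// A deserializer that reads a header as metadata (e.g. an encoding hint).
class HeaderAwareDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        String enc = headers == null ? null : headers.lastHeader("encoding");
        String payload = deserialize(topic, data);
        return enc == null ? payload : "[" + enc + "] " + payload;
    }
}

public class HeaderDemo {
    public static void main(String[] args) {
        Deserializer<String> d = new HeaderAwareDeserializer();
        byte[] value = "v".getBytes(StandardCharsets.UTF_8);
        Headers headers = key -> "encoding".equals(key) ? "utf8" : null;

        // Two-arg call (the pre-fix code path) never sees the headers:
        System.out.println(d.deserialize("t", value));           // v
        // Three-arg call (the fixed code path) surfaces them:
        System.out.println(d.deserialize("t", headers, value));  // [utf8] v
    }
}
```

This mirrors why the test only needs to verify that the headers-taking overload is invoked: once the driver routes through it, the default-method chain guarantees header-ignoring deserializers keep working unchanged.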
[jira] [Commented] (KAFKA-9494) Include data type of the config in ConfigEntry
[ https://issues.apache.org/jira/browse/KAFKA-9494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120782#comment-17120782 ] Shailesh Panwar commented on KAFKA-9494: Updated > Include data type of the config in ConfigEntry > -- > > Key: KAFKA-9494 > URL: https://issues.apache.org/jira/browse/KAFKA-9494 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Reporter: Shailesh Panwar >Priority: Minor > Fix For: 2.6.0 > > > Why this request? > To provide better validation. Including the data type can significantly > improve the validation on client side (be it web or cli or any other client). > In the absence of `type` the only way to know if the user specified value is > correct is to make an `alter` call and check if there is no error. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
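The validation the issue description argues for can be sketched as follows. This is a hypothetical illustration of client-side type checking -- the `ConfigType` enum and `ConfigValidator` here are made up for the example and are not Kafka's actual `ConfigEntry` API -- showing how, once the data type is known up front, a bad value can be rejected locally instead of via a round-trip `alter` call.

```java
// Hypothetical config data types, as might be exposed alongside each config entry.
enum ConfigType { BOOLEAN, INT, LONG, DOUBLE, STRING }

class ConfigValidator {
    // Reject obviously malformed values before any request is sent to the broker.
    static boolean isValid(ConfigType type, String value) {
        try {
            switch (type) {
                case BOOLEAN: return "true".equalsIgnoreCase(value)
                                  || "false".equalsIgnoreCase(value);
                case INT:     Integer.parseInt(value);   return true;
                case LONG:    Long.parseLong(value);     return true;
                case DOUBLE:  Double.parseDouble(value); return true;
                case STRING:  return true;
                default:      return false;
            }
        } catch (NumberFormatException e) {
            return false;
        }
    }
}

public class ConfigTypeDemo {
    public static void main(String[] args) {
        System.out.println(ConfigValidator.isValid(ConfigType.INT, "102400"));  // true
        System.out.println(ConfigValidator.isValid(ConfigType.INT, "oops"));    // false
        System.out.println(ConfigValidator.isValid(ConfigType.BOOLEAN, "yes")); // false
    }
}
```

Without the type, the client's only option is the "make an alter call and check for an error" round trip the reporter describes.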