[GitHub] [kafka] jlprat opened a new pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
jlprat opened a new pull request #10784: URL: https://github.com/apache/kafka/pull/10784
Updates scalafmt to the latest stable version.
Applies all the style fixes (all source code changes are made by scalafmt).
Removes the dangling-parentheses setting, as `true` is already the default.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jlprat closed pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
jlprat closed pull request #10784: URL: https://github.com/apache/kafka/pull/10784
[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
jlprat commented on pull request #10784: URL: https://github.com/apache/kafka/pull/10784#issuecomment-891795576
I just closed and reopened the PR to re-trigger the Jenkins build.
[GitHub] [kafka] tombentley merged pull request #10074: KAFKA-12305: Fix Flatten SMT for array types
tombentley merged pull request #10074: URL: https://github.com/apache/kafka/pull/10074
[GitHub] [kafka] tombentley commented on pull request #10074: KAFKA-12305: Fix Flatten SMT for array types
tombentley commented on pull request #10074: URL: https://github.com/apache/kafka/pull/10074#issuecomment-891822887
@C0urante sorry for the delay.
[GitHub] [kafka] mumrah opened a new pull request #11165: MINOR Enable transaction system tests for KRaft in 3.0
mumrah opened a new pull request #11165: URL: https://github.com/apache/kafka/pull/11165
[GitHub] [kafka] mumrah commented on pull request #11165: MINOR Enable transaction system tests for KRaft in 3.0
mumrah commented on pull request #11165: URL: https://github.com/apache/kafka/pull/11165#issuecomment-891852497
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-07-30--001.system-test-kafka-branch-builder--1627678032--mumrah--3.0-reenable-transaction-system-test--e3e41966f/report.html
```
SESSION REPORT (ALL TESTS)
ducktape version: 0.8.8
session_id:       2021-07-30--001
run time:         63 minutes 52.509 seconds
tests run:        96
passed:           96
failed:           0
ignored:          0

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=brokers.check_order=False.use_group_metadata=False.metadata_quorum=REMOTE_KRAFT
status:   PASS
run time: 1 minute 46.003 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=brokers.check_order=False.use_group_metadata=False.metadata_quorum=ZK
status:   PASS
run time: 1 minute 47.281 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=brokers.check_order=False.use_group_metadata=True.metadata_quorum=REMOTE_KRAFT
status:   PASS
run time: 2 minutes 1.223 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=brokers.check_order=True.use_group_metadata=False.metadata_quorum=REMOTE_KRAFT
status:   PASS
run time: 1 minute 41.791 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=brokers.check_order=False.use_group_metadata=True.metadata_quorum=ZK
status:   PASS
run time: 1 minute 44.978 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=brokers.check_order=True.use_group_metadata=False.metadata_quorum=ZK
status:   PASS
run time: 1 minute 34.124 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=clients.check_order=False.use_group_metadata=False.metadata_quorum=REMOTE_KRAFT
status:   PASS
run time: 1 minute 33.833 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=brokers.check_order=True.use_group_metadata=True.metadata_quorum=ZK
status:   PASS
run time: 1 minute 38.241 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=brokers.check_order=True.use_group_metadata=True.metadata_quorum=REMOTE_KRAFT
status:   PASS
run time: 1 minute 40.300 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=clients.check_order=False.use_group_metadata=True.metadata_quorum=REMOTE_KRAFT
status:   PASS
run time: 1 minute 38.104 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=clients.check_order=False.use_group_metadata=False.metadata_quorum=ZK
status:   PASS
run time: 1 minute 41.586 seconds

test_id:  kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=clients.check_order=Fal
```
[GitHub] [kafka] rondagostino commented on pull request #11165: MINOR Enable transaction system tests for KRaft in 3.0
rondagostino commented on pull request #11165: URL: https://github.com/apache/kafka/pull/11165#issuecomment-891908369
There are other tests we disabled when we didn't have transaction/idempotent producer support in KRaft. Do we want to enable them here?
In `replication_test.py`, the first test at the top, where `enable_idempotence` is True:
```
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
        broker_type=["leader"],
        security_protocol=["PLAINTEXT"],
        enable_idempotence=[True])
```
In `streams_smoke_test.py`, the exactly-once tests:
```
@matrix(processing_guarantee=['exactly_once', 'exactly_once_v2'], crash=[True, False])
```
In `produce_bench_test.py`, the transaction test at the end:
```
@cluster(num_nodes=8)
def test_produce_bench_transactions(self, metadata_quorum=quorum.zk):
```
In `group_mode_transactions_test.py`:
```
@cluster(num_nodes=10)
@matrix(failure_mode=["hard_bounce", "clean_bounce"], bounce_target=["brokers", "clients"])
def test_transactions(self, failure_mode, bounce_target, metadata_quorum=quorum.zk):
```
[jira] [Resolved] (KAFKA-12789) Remove Stale comments for meta response handling logic
[ https://issues.apache.org/jira/browse/KAFKA-12789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

HaiyuanZhao resolved KAFKA-12789.
---------------------------------
    Resolution: Fixed

> Remove Stale comments for meta response handling logic
> ------------------------------------------------------
>
>                 Key: KAFKA-12789
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12789
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: HaiyuanZhao
>            Assignee: HaiyuanZhao
>            Priority: Minor
>
> According to my understanding, the following paragraph looks like a stale comment.
> {code:java}
> public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
>     ...
>     // Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
>     // created which means we will get errors and no nodes until it exists
>     if (response.brokers().isEmpty()) {
>         log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
>         this.metadata.failedUpdate(now);
>     } else {
>         this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);
>     }
>     ...
> {code}
> The comment above means we may get errors and no nodes if the topic we want is still in the process of being created.
> However, every metadata request returns all brokers from the server-side logic, as follows:
> {code:java}
> def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
>   ...
>   val brokers = metadataCache.getAliveBrokers
>   ...
> }
> {code}
> I studied the related git commit history and figured out why.
> # This comment was first introduced in KAFKA-642 (e11447650a), when a metadata request only needed the brokers related to the topics we want.
> # KAFKA-1535 (commit 4ebcdfd51f) changed the server-side logic so that the metadata response contains all alive brokers rather than just the ones needed for the given topics.
> # However, this comment has been retained until now.
> So according to my understanding, this comment is stale and can be removed.
[jira] [Resolved] (KAFKA-12615) Correct comments for the method Selector.clear
[ https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

HaiyuanZhao resolved KAFKA-12615.
---------------------------------
    Resolution: Fixed

> Correct comments for the method Selector.clear
> ----------------------------------------------
>
>                 Key: KAFKA-12615
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12615
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: HaiyuanZhao
>            Assignee: HaiyuanZhao
>            Priority: Minor
>
> According to my understanding, the second clearCompletedSends, highlighted below, should be clearCompletedReceives:
> /**
>  * Clears all the results from the previous poll. This is invoked by Selector at the start of
>  * a poll() when all the results from the previous poll are expected to have been handled.
>  *
>  * SocketServer uses clearCompletedSends() and *clearCompletedSends*() to
>  * clear `completedSends` and `completedReceives` as soon as they are processed to avoid
>  * holding onto large request/response buffers from multiple connections longer than necessary.
>  * Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage
>  * is less critical and clearing once-per-poll provides the flexibility to process these results in
>  * any order before the next poll.
>  */
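For reference, a sketch of the corrected Javadoc: the fix is a one-word change, replacing the second clearCompletedSends() with clearCompletedReceives():

```java
/**
 * Clears all the results from the previous poll. This is invoked by Selector at the start of
 * a poll() when all the results from the previous poll are expected to have been handled.
 *
 * SocketServer uses clearCompletedSends() and clearCompletedReceives() to
 * clear `completedSends` and `completedReceives` as soon as they are processed to avoid
 * holding onto large request/response buffers from multiple connections longer than necessary.
 */
```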
[jira] [Updated] (KAFKA-12333) KafkaMetadataLog and MockLog should validate that appended epochs are monotonically increasing
[ https://issues.apache.org/jira/browse/KAFKA-12333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio updated KAFKA-12333:
-----------------------------------------------
    Description: 
Both the MockLog and KafkaMetadataLog should only allow appendAsLeader and appendAsFollower with monotonically increasing epochs. In other words the following test in KafkaMetadataLogTest should fail:
{code:java}
@Test
def testOutOfOrderEpoch(): Unit = {
  val topicPartition = new TopicPartition("cluster-metadata", 0)
  val log = buildMetadataLog(tempDir, mockTime, topicPartition)
  val recordFoo = new SimpleRecord("foo".getBytes())
  val currentEpoch = 3
  val initialOffset = log.endOffset().offset

  log.appendAsLeader(
    MemoryRecords.withRecords(initialOffset, CompressionType.NONE, currentEpoch, recordFoo),
    currentEpoch
  )

  // Out of order epoch should throw an exception
  log.appendAsLeader(
    MemoryRecords.withRecords(
      initialOffset + 1, CompressionType.NONE, currentEpoch - 1, recordFoo
    ),
    currentEpoch - 1
  )
  log.appendAsFollower(
    MemoryRecords.withRecords(
      initialOffset + 2, CompressionType.NONE, currentEpoch - 2, recordFoo
    )
  )
}
{code}
The same for MockLogTest.

  was:
Both the MockLog and KafkaMetadataLog should only allow appendAsLeader and appendAsFollower with monotonically increasing epochs. In other words the following test in KafkaMetadataLogTest should fail:
{code:java}
@Test
def testOutOfOrderEpoch(): Unit = {
  val topicPartition = new TopicPartition("cluster-metadata", 0)
  val log = buildMetadataLog(tempDir, mockTime, topicPartition)
  val recordFoo = new SimpleRecord("foo".getBytes())
  val currentEpoch = 3
  val initialOffset = log.endOffset().offset

  log.appendAsLeader(
    MemoryRecords.withRecords(initialOffset, CompressionType.NONE, currentEpoch, recordFoo),
    currentEpoch
  )

  // Out of order epoch should throw an exception
  log.appendAsLeader(
    MemoryRecords.withRecords(
      initialOffset + 1, CompressionType.NONE, currentEpoch - 1, recordFoo
    ),
    currentEpoch - 1
  )
  log.appendAsFollower(
    MemoryRecords.withRecords(
      initialOffset + 2, CompressionType.NONE, currentEpoch - 2, recordFoo
    )
  )
}
{code}
The same for MockLogTest.

> KafkaMetadataLog and MockLog should validate that appended epochs are monotonically increasing
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12333
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12333
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: replication
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: HaiyuanZhao
>            Priority: Major
>
> Both the MockLog and KafkaMetadataLog should only allow appendAsLeader and appendAsFollower with monotonically increasing epochs. In other words the following test in KafkaMetadataLogTest should fail:
> {code:java}
> @Test
> def testOutOfOrderEpoch(): Unit = {
>   val topicPartition = new TopicPartition("cluster-metadata", 0)
>   val log = buildMetadataLog(tempDir, mockTime, topicPartition)
>   val recordFoo = new SimpleRecord("foo".getBytes())
>   val currentEpoch = 3
>   val initialOffset = log.endOffset().offset
>
>   log.appendAsLeader(
>     MemoryRecords.withRecords(initialOffset, CompressionType.NONE, currentEpoch, recordFoo),
>     currentEpoch
>   )
>
>   // Out of order epoch should throw an exception
>   log.appendAsLeader(
>     MemoryRecords.withRecords(
>       initialOffset + 1, CompressionType.NONE, currentEpoch - 1, recordFoo
>     ),
>     currentEpoch - 1
>   )
>   log.appendAsFollower(
>     MemoryRecords.withRecords(
>       initialOffset + 2, CompressionType.NONE, currentEpoch - 2, recordFoo
>     )
>   )
> }
> {code}
> The same for MockLogTest.
[jira] [Resolved] (KAFKA-13148) Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE
[ https://issues.apache.org/jira/browse/KAFKA-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-13148.
-------------------------------------
    Resolution: Fixed

Closing this since it was fixed by KAFKA-12158.

> Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-13148
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13148
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, kraft
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: Niket Goel
>            Priority: Major
>              Labels: kip-500
>
> In some cases the RaftClient will return Long.MAX_VALUE:
> {code:java}
>     /**
>      * Append a list of records to the log. The write will be scheduled for some time
>      * in the future. There is no guarantee that appended records will be written to
>      * the log and eventually committed. However, it is guaranteed that if any of the
>      * records become committed, then all of them will be.
>      *
>      * If the provided current leader epoch does not match the current epoch, which
>      * is possible when the state machine has yet to observe the epoch change, then
>      * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
>      * not possible to become committed. The state machine is expected to discard all
>      * uncommitted entries after observing an epoch change.
>      *
>      * @param epoch the current leader epoch
>      * @param records the list of records to append
>      * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records could not
>      *         be committed; null if no memory could be allocated for the batch at this time
>      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
>      *         batch size; if this exception is thrown, none of the elements in records were
>      *         committed
>      */
>     Long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> records);
> {code}
> The controller doesn't handle this case:
> {code:java}
>     // If the operation returned a batch of records, those records need to be
>     // written before we can return our result to the user. Here, we hand off
>     // the batch of records to the raft client. They will be written out
>     // asynchronously.
>     final long offset;
>     if (result.isAtomic()) {
>         offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
>     } else {
>         offset = raftClient.scheduleAppend(controllerEpoch, result.records());
>     }
>     op.processBatchEndOffset(offset);
>     writeOffset = offset;
>     resultAndOffset = ControllerResultAndOffset.of(offset, result);
>     for (ApiMessageAndVersion message : result.records()) {
>         replay(message.message(), Optional.empty(), offset);
>     }
>     snapshotRegistry.getOrCreateSnapshot(offset);
>     log.debug("Read-write operation {} will be completed when the log " +
>         "reaches offset {}.", this, resultAndOffset.offset());
> {code}
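For illustration, here is a minimal sketch of the guard missing from the quoted controller snippet. This is hypothetical handling written against the snippet above, not the actual fix (the issue was ultimately superseded by KAFKA-12158):

```java
final long offset;
if (result.isAtomic()) {
    offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
} else {
    offset = raftClient.scheduleAppend(controllerEpoch, result.records());
}
// Long.MAX_VALUE signals a stale leader epoch: the append can never become
// committed, so it must not be treated as a real end offset for the operation.
if (offset == Long.MAX_VALUE) {
    throw new NotControllerException("Append rejected: controller epoch has changed");
}
op.processBatchEndOffset(offset);
```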
[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller
[ https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-13142:
------------------------------------
    Affects Version/s:     (was: 3.0.0)

> KRaft brokers do not validate dynamic configs before forwarding them to controller
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-13142
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13142
>             Project: Kafka
>          Issue Type: Task
>          Components: kraft
>            Reporter: Ryan Dielhenn
>            Assignee: Ryan Dielhenn
>            Priority: Blocker
>             Fix For: 3.0.0
>
> The KRaft brokers are not currently validating dynamic configs before forwarding them to the controller. To ensure that KRaft clusters are easily upgradable, it would be a good idea to validate dynamic configs in the first release of KRaft so that invalid dynamic configs are never stored.
[jira] [Updated] (KAFKA-13099) Message too large error when expiring transactionalIds
[ https://issues.apache.org/jira/browse/KAFKA-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-13099:
------------------------------------
    Fix Version/s: 2.6.3
                   2.5.2

> Message too large error when expiring transactionalIds
> -------------------------------------------------------
>
>                 Key: KAFKA-13099
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13099
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.8.0, 2.7.1
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 3.0.0, 2.5.2, 2.6.3, 2.7.2, 2.8.1
>
> We have seen a couple reports of MESSAGE_TOO_LARGE errors when writing tombstones for expired transactionalIds. This is possible because we collect all expired IDs into a single batch. We should ensure that the created batches are smaller than the max message size. Any expired IDs that cannot fit can be expired later.
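The fix described above is essentially batching arithmetic. Below is a rough, self-contained sketch of the idea with hypothetical names; the real change lives in the broker's transaction state management and uses exact record-format size accounting rather than this simplified estimate:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: split tombstone keys for expired transactionalIds into
// chunks so that no single batch exceeds maxBatchBytes. Keys that would
// overflow the current batch start a new one (or are expired on a later run).
public final class TombstoneBatcher {
    static final int PER_RECORD_OVERHEAD = 64; // rough allowance for record framing

    static List<List<byte[]>> chunk(List<byte[]> tombstoneKeys, int maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] key : tombstoneKeys) {
            int estimate = key.length + PER_RECORD_OVERHEAD;
            if (!current.isEmpty() && currentBytes + estimate > maxBatchBytes) {
                batches.add(current);       // flush a full batch
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(key);
            currentBytes += estimate;
        }
        if (!current.isEmpty())
            batches.add(current);
        return batches;
    }
}
```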
[GitHub] [kafka] dielhennr commented on a change in pull request #11159: MINOR: Change default node id in kraft broker properties
dielhennr commented on a change in pull request #11159: URL: https://github.com/apache/kafka/pull/11159#discussion_r681972655

## File path: metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
```
@@ -239,11 +239,11 @@ public void replay(RegisterBrokerRecord record) {
             features.put(feature.name(), new VersionRange(
                 feature.minSupportedVersion(), feature.maxSupportedVersion()));
         }
+        BrokerRegistration prevRegistration = brokerRegistrations.getOrDefault(brokerId, null);
```
Review comment: Yes, it affects the logging.
[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
hachikuji commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r681961452

## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
```
@@ -80,6 +80,7 @@ public void write(int b) {
     private int numRecords = 0;
     private float actualCompressionRatio = 1;
     private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
+    private long deleteHorizonMs;
```
Review comment: Can we rename `firstTimestamp` to `baseTimestamp` here as well?

## File path: core/src/main/scala/kafka/log/LogCleaner.scala
```
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
```
Review comment: Can we document the return type?

## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
```
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }

+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+            ByteBuffer buffer = ByteBuffer.allocate(2048);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+            builder.append(10L, "1".getBytes(), null);
+
+            ByteBuffer filtered = ByteBuffer.allocate(2048);
+            final long deleteHorizon = Integer.MAX_VALUE / 2;
+            final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
```
Review comment: I think this test could be a little simpler. Rather than going through `filterTo`, we can just use `MemoryRecordsBuilder` directly, setting the delete horizon. Maybe it is useful to have both. Perhaps we could add a more direct test in `MemoryRecordsBuilderTest` or something like that?

## File path: core/src/main/scala/kafka/log/LogCleaner.scala
```
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP
+
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs)
```
[jira] [Created] (KAFKA-13157) Kafka-dump-log needs to support snapshot records
Jose Armando Garcia Sancio created KAFKA-13157:
--------------------------------------------------

             Summary: Kafka-dump-log needs to support snapshot records
                 Key: KAFKA-13157
                 URL: https://issues.apache.org/jira/browse/KAFKA-13157
             Project: Kafka
          Issue Type: Sub-task
          Components: kraft
            Reporter: Jose Armando Garcia Sancio
            Assignee: Jose Armando Garcia Sancio

Extends the kafka-dump-log tool to allow the user to view and print KRaft snapshot files.
[GitHub] [kafka] mumrah merged pull request #11165: MINOR Enable transaction system tests for KRaft in 3.0
mumrah merged pull request #11165: URL: https://github.com/apache/kafka/pull/11165
[GitHub] [kafka] mumrah commented on pull request #11165: MINOR Enable transaction system tests for KRaft in 3.0
mumrah commented on pull request #11165: URL: https://github.com/apache/kafka/pull/11165#issuecomment-892065979
Opening a follow-on PR for the additional tests mentioned by @rondagostino.
[GitHub] [kafka] mumrah opened a new pull request #11166: MINOR Enable additional transaction system tests in KRaft
mumrah opened a new pull request #11166: URL: https://github.com/apache/kafka/pull/11166
[GitHub] [kafka] mumrah commented on pull request #11166: MINOR Enable additional transaction system tests in KRaft
mumrah commented on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892067696
Replication test run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4627/
[GitHub] [kafka] mumrah commented on pull request #11166: MINOR Enable additional transaction system tests in KRaft
mumrah commented on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892068421
Produce bench: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4628/
[jira] [Assigned] (KAFKA-12973) Update KIP and dev mailing list
[ https://issues.apache.org/jira/browse/KAFKA-12973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio reassigned KAFKA-12973:
--------------------------------------------------
    Assignee: Jose Armando Garcia Sancio

> Update KIP and dev mailing list
> -------------------------------
>
>                 Key: KAFKA-12973
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12973
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: Jose Armando Garcia Sancio
>            Priority: Major
>
> Update KIP-630 and the Kafka mailing list based on the small implementation deviations from what is documented in the KIP.
[GitHub] [kafka] mumrah commented on pull request #11166: MINOR Enable additional transaction system tests in KRaft
mumrah commented on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892068893
Group mode transactions: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4629/
[GitHub] [kafka] mumrah commented on pull request #11166: MINOR Enable additional transaction system tests in KRaft
mumrah commented on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892069170
Streams smoke test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4630/
[jira] [Created] (KAFKA-13158) Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest
YI-CHEN WANG created KAFKA-13158:
------------------------------------

             Summary: Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest
                 Key: KAFKA-13158
                 URL: https://issues.apache.org/jira/browse/KAFKA-13158
             Project: Kafka
          Issue Type: Sub-task
            Reporter: YI-CHEN WANG
            Assignee: YI-CHEN WANG
[jira] [Resolved] (KAFKA-12997) Expose log record append time to the controller/broker
[ https://issues.apache.org/jira/browse/KAFKA-12997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio resolved KAFKA-12997.
------------------------------------------------
    Resolution: Fixed

> Expose log record append time to the controller/broker
> -------------------------------------------------------
>
>                 Key: KAFKA-12997
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12997
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Niket Goel
>            Assignee: Jose Armando Garcia Sancio
>            Priority: Minor
>              Labels: kip-500
>
> The snapshot records are generated by each individual quorum participant, which also stamps the append time in the records. These append times are generated from a different clock (except in the case of the quorum leader) than the metadata log records, whose timestamps are stamped by the leader.
> To enable having a single clock to compare timestamps, https://issues.apache.org/jira/browse/KAFKA-12952 adds a timestamp field to the snapshot header which should contain the append time of the highest record contained in the snapshot (which will be in leader time).
> This JIRA tracks exposing and wiring the batch timestamp so that it can be provided to the SnapshotWriter at the time of snapshot creation.
[jira] [Created] (KAFKA-13159) Enable system tests for transactions in KRaft mode
David Arthur created KAFKA-13159:
------------------------------------

             Summary: Enable system tests for transactions in KRaft mode
                 Key: KAFKA-13159
                 URL: https://issues.apache.org/jira/browse/KAFKA-13159
             Project: Kafka
          Issue Type: Test
            Reporter: David Arthur
            Assignee: David Arthur
             Fix For: 3.0.0

Previously, we disabled several system tests involving transactions in KRaft mode. Now that KIP-730 is complete and transactions work in KRaft, we need to re-enable these tests.
[jira] [Commented] (KAFKA-13159) Enable system tests for transactions in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392468#comment-17392468 ]

David Arthur commented on KAFKA-13159:
--------------------------------------
The main transactions_test.py tests were enabled on the 3.0 branch here: https://github.com/apache/kafka/pull/11165

> Enable system tests for transactions in KRaft mode
> ---------------------------------------------------
>
>                 Key: KAFKA-13159
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13159
>             Project: Kafka
>          Issue Type: Test
>            Reporter: David Arthur
>            Assignee: David Arthur
>            Priority: Critical
>             Fix For: 3.0.0, 3.1.0
>
> Previously, we disabled several system tests involving transactions in KRaft mode. Now that KIP-730 is complete and transactions work in KRaft, we need to re-enable these tests.
[GitHub] [kafka] rondagostino commented on a change in pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
rondagostino commented on a change in pull request #11166: URL: https://github.com/apache/kafka/pull/11166#discussion_r682008773

## File path: tests/kafkatest/tests/core/replication_test.py
```
@@ -122,14 +122,16 @@ def min_cluster_size(self):
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
             security_protocol=["PLAINTEXT"],
-            enable_idempotence=[True])
+            enable_idempotence=[True],
+            metadata_quorum=quorum.all_non_upgrade)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
             security_protocol=["PLAINTEXT", "SASL_SSL"],
             metadata_quorum=quorum.all_non_upgrade)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["controller"],
-            security_protocol=["PLAINTEXT", "SASL_SSL"])
+            security_protocol=["PLAINTEXT", "SASL_SSL"],
+            metadata_quorum=quorum.all_non_upgrade)
```
Review comment: I wonder if this is going to fail. The code below seems to be incorrect:
```
if failure_mode == "controller" and metadata_quorum != quorum.zk:
    raise Exception("There is no controller broker when using KRaft metadata quorum")
```
That should probably be checking `broker_type` instead of `failure_mode`.
[jira] [Updated] (KAFKA-13159) Enable system tests for transactions in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Arthur updated KAFKA-13159:
---------------------------------
    Fix Version/s: 3.1.0

> Enable system tests for transactions in KRaft mode
> ---------------------------------------------------
>
>                 Key: KAFKA-13159
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13159
>             Project: Kafka
>          Issue Type: Test
>            Reporter: David Arthur
>            Assignee: David Arthur
>            Priority: Critical
>             Fix For: 3.0.0, 3.1.0
>
> Previously, we disabled several system tests involving transactions in KRaft mode. Now that KIP-730 is complete and transactions work in KRaft, we need to re-enable these tests.
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r682012434

## File path: core/src/main/scala/kafka/log/LogCleaner.scala
```
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP
+
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _

-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+          if (batch.magic() < RecordBatch.MAGIC_VALUE_V2) {
```
Review comment: ah right, I didn't catch this. It seems we don't need this block then; we can just move to this check if it's a control batch:
```
discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
```
[jira] [Created] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
Ryan Dielhenn created KAFKA-13160:
-------------------------------------

             Summary: Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
                 Key: KAFKA-13160
                 URL: https://issues.apache.org/jira/browse/KAFKA-13160
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.0.0
            Reporter: Ryan Dielhenn
            Assignee: Ryan Dielhenn
             Fix For: 3.0.0

In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When these configs are sent to the brokers in a fetch response, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update, and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs, since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string for the dynamic default broker configs if using KRaft.
[GitHub] [kafka] wycccccc opened a new pull request #11167: Kafka-13158 Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest
wycccccc opened a new pull request #11167: URL: https://github.com/apache/kafka/pull/11167
Development of EasyMock and PowerMock has stagnated while Mockito continues to be actively developed. With the new Java cadence, it's a problem to depend on libraries that do bytecode generation and are not actively maintained. In addition, Mockito is also easier to use. ([KAFKA-7438](https://issues.apache.org/jira/browse/KAFKA-7438))
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dielhennr opened a new pull request #11168: KAFKA-13160: Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
dielhennr opened a new pull request #11168: URL: https://github.com/apache/kafka/pull/11168
The KRaft brokers are throwing NumberFormatException when processing dynamic default broker config updates because they expect the default entity name that was used in ZooKeeper to be the resource name for dynamic default broker configs instead of empty string.
https://issues.apache.org/jira/browse/KAFKA-13160
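To make the failure mode concrete, here is a hypothetical Java sketch of the dispatch logic (the actual handler is the Scala `BrokerConfigHandler`; all names below are illustrative). The empty-string resource name must be recognized as a cluster-wide default before any `Integer.parseInt` call, because `parseInt("")` throws NumberFormatException:

```java
import java.util.Map;

// Hypothetical sketch, not the actual BrokerConfigHandler code. ZK-based
// clusters use the sentinel "<default>" for cluster-wide defaults, while
// KRaft serializes the default as an empty string; both must be special-cased.
final class BrokerConfigDispatch {
    void processConfigChanges(String resourceName, Map<String, String> configs) {
        if (resourceName.isEmpty() || resourceName.equals("<default>")) {
            updateDefaultConfig(configs);                  // cluster-wide default update
        } else {
            int brokerId = Integer.parseInt(resourceName); // throws NumberFormatException on ""
            updatePerBrokerConfig(brokerId, configs);
        }
    }

    void updateDefaultConfig(Map<String, String> configs) { /* apply defaults */ }
    void updatePerBrokerConfig(int brokerId, Map<String, String> configs) { /* apply per-broker */ }
}
```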
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r682018753

## File path: core/src/main/scala/kafka/log/Log.scala
```
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+          val keepPartitionMetadataFile: Boolean,
+          @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {
```
Review comment: Hmm, it seems like we only use it in a test. That goes with the return value that was added to the `cleanInto` method in the LogCleaner. I'm going to remove these and see if I can take another approach in the testing.
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Dielhenn updated KAFKA-13160:
----------------------------------
    Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When these configs are sent to the brokers in a fetch response, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a singular integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When these configs are sent to the brokers in a fetch response, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a singular integer. This handler should be fixed to expect empty string for the dynamic default broker configs if using KRaft.

> Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13160
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13160
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.0
>            Reporter: Ryan Dielhenn
>            Assignee: Ryan Dielhenn
>            Priority: Blocker
>             Fix For: 3.0.0
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When these configs are sent to the brokers in a fetch response, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a singular integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft.
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r682020580

## File path: core/src/main/scala/kafka/log/LogCleaner.scala
```
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
```
Review comment: It seems like we don't need the return value at all, since we would only be using it to track the latestDeleteHorizon in the Log, but it doesn't seem like we need that either. I'm going to remove it.
[jira] [Resolved] (KAFKA-12647) Implement loading snapshot in the broker
[ https://issues.apache.org/jira/browse/KAFKA-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio resolved KAFKA-12647.
------------------------------------------------
    Resolution: Fixed

> Implement loading snapshot in the broker
> -----------------------------------------
>
>                 Key: KAFKA-12647
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12647
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: Colin McCabe
>            Priority: Major
>              Labels: kip-500
[jira] [Resolved] (KAFKA-12646) Implement snapshot generation on brokers
[ https://issues.apache.org/jira/browse/KAFKA-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio resolved KAFKA-12646.
------------------------------------------------
    Resolution: Fixed

> Implement snapshot generation on brokers
> -----------------------------------------
>
>                 Key: KAFKA-12646
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12646
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: controller
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: Colin McCabe
>            Priority: Major
>              Labels: kip-500
[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah edited a comment on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892069170
Streams smoke test: ~https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4630/~ https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4631/
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Dielhenn updated KAFKA-13160:
----------------------------------
    Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When these config snapshots are processed by the brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When these configs are sent to the brokers in a fetch response, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a singular integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft.

> Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13160
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13160
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.0
>            Reporter: Ryan Dielhenn
>            Assignee: Ryan Dielhenn
>            Priority: Blocker
>             Fix For: 3.0.0
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When these config snapshots are processed by the brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft.
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Dielhenn updated KAFKA-13160:
----------------------------------
    Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When dynamic configs from snapshots are processed by the brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When these config snapshots are processed by the brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft.

> Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13160
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13160
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.0
>            Reporter: Ryan Dielhenn
>            Assignee: Ryan Dielhenn
>            Priority: Blocker
>             Fix For: 3.0.0
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When dynamic configs from snapshots are processed by the brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft.
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Description: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the brokers in KRaft, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft. (was: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. When dynamic configs from snapshots are processed by the brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft.) > Fix BrokerConfigHandler to expect empty string as the resource name for > dynamic default broker configs in KRaft > --- > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the brokers in KRaft, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. In KRaft > dynamic default broker configs are serialized in the quorum with empty string > instead of "<default>". This was causing the BrokerConfigHandler to throw a > NumberFormatException for dynamic default broker configs since the resource > name for them is not "<default>" or a single integer. This handler should be > fixed to expect empty string as the resource name for the dynamic default > broker configs if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Description: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the brokers in KRaft, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft. was:In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the brokers in KRaft, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft. > Fix BrokerConfigHandler to expect empty string as the resource name for > dynamic default broker configs in KRaft > --- > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the brokers in KRaft, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft dynamic default broker configs are serialized in the quorum with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. This handler > should be fixed to expect empty string as the resource name for the dynamic > default broker configs if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Description: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the brokers in KRaft, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft. was: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the brokers in KRaft, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft. > Fix BrokerConfigHandler to expect empty string as the resource name for > dynamic default broker configs in KRaft > --- > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the brokers in KRaft, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in the quorum with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. This handler > should be fixed to expect empty string as the resource name for the dynamic > default broker configs if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
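[editorial note] To make the failure mode concrete, the dispatch described in this ticket has roughly the following shape (a minimal Java sketch; the real handler is Scala code inside the broker, and all names here are illustrative):

```java
import java.util.Properties;

// Minimal sketch of the resource-name dispatch described in KAFKA-13160.
public class BrokerConfigHandlerSketch {
    // ZK-style sentinel for the cluster-wide default entity.
    private static final String DEFAULT_ENTITY_NAME = "<default>";

    public void processConfigChanges(String resourceName, Properties props) {
        if (resourceName.equals(DEFAULT_ENTITY_NAME)) {
            updateDefaultConfig(props);
        } else {
            // KRaft serialized the default entity as "" rather than "<default>",
            // so this parse threw: Integer.parseInt("") is a NumberFormatException.
            int brokerId = Integer.parseInt(resourceName);
            updatePerBrokerConfig(brokerId, props);
        }
    }

    private void updateDefaultConfig(Properties props) { /* apply cluster-wide default */ }

    private void updatePerBrokerConfig(int brokerId, Properties props) { /* apply to one broker */ }
}
```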
[jira] [Updated] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12994: --- Fix Version/s: (was: 3.0.0) 3.1.0 > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Major > Labels: kip, kip-633 > Fix For: 3.1.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > This applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392530#comment-17392530 ] A. Sophie Blee-Goldman commented on KAFKA-12994: Hey [~iekpo], I'm unassigning this in case someone else wants to pick it up. If you already started working on this and have a partial PR ready with some subset of the tests migrated over, you can just open that PR and we can merge this in pieces. It seems like a lot of tests, so splitting it up into multiple PRs is probably a good idea anyway. (And obviously feel free to re-assign it to yourself if you want to continue working on it, and/or split this ticket up into sub-tasks covering different sets of tests) > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Major > Labels: kip, kip-633 > Fix For: 3.1.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > This applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-12994: -- Assignee: (was: A. Sophie Blee-Goldman) > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Priority: Major > Labels: kip-633, newbie, newbie++ > Fix For: 3.1.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > This applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-12994: -- Assignee: A. Sophie Blee-Goldman > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: A. Sophie Blee-Goldman >Priority: Major > Labels: kip-633, newbie, newbie++ > Fix For: 3.1.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > This applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-12994: -- Assignee: (was: Israel Ekpo) > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Priority: Major > Labels: kip, kip-633 > Fix For: 3.1.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > This applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12994: --- Labels: kip-633 newbie newbie++ (was: kip kip-633) > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Priority: Major > Labels: kip-633, newbie, newbie++ > Fix For: 3.1.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > This applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] Borzoo opened a new pull request #11169: KAFKA-10859: Add test annotation to FileStreamSourceTaskTest.testInvalidFile and speed up the test
Borzoo opened a new pull request #11169: URL: https://github.com/apache/kafka/pull/11169 Added the missing @Test annotation to a test in FileStreamSourceTaskTest. The test used to loop 100 times, each time blocking for 1 second. Checking the assertion more than once is unnecessary for this test. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
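[editorial note] A hypothetical sketch of the change (the real test and its helpers live in Kafka's connect/file module and differ in detail):

```java
import static org.junit.Assert.assertNull;

import org.junit.Test;

// Illustrative shape of the fix: without the @Test annotation JUnit never ran
// the method; with it, a single check is enough, so the 100-iteration,
// one-second-per-pass polling loop can be dropped.
public class FileStreamSourceTaskTestSketch {

    @Test
    public void testInvalidFile() throws InterruptedException {
        // A single poll suffices to assert that an invalid file yields no records.
        assertNull(pollOnce("no-such-file.txt"));
    }

    // Stand-in (assumed helper) for polling the source task once against an
    // invalid file path.
    private Object pollOnce(String filename) throws InterruptedException {
        return null;
    }
}
```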
[jira] [Reopened] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reopened KAFKA-13132: Found an issue where we don't sufficiently cover case 2. I have a plan to properly cover it. cc: [~kkonstantine] > Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 > -- > > Key: KAFKA-13132 > URL: https://issues.apache.org/jira/browse/KAFKA-13132 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > Fix For: 3.0.0 > > > With the change in 3.0 to how topic IDs are assigned to logs, a bug was > inadvertently introduced. Now, topic IDs will only be assigned on the load of > the log to a partition in LISR requests. This means we will only assign topic > IDs for newly created topics/partitions, on broker startup, or potentially > when a partition is reassigned. > > In the case of upgrading from an IBP before 2.8, we may have a scenario where > we upgrade the controller to IBP 3.0 (or even 2.8) last. (I.e., the controller > is IBP < 2.8 and all other brokers are on the newest IBP.) Upon the last > broker upgrading, we will elect a new controller but its LISR request will > not result in topic IDs being assigned to logs of existing topics. They will > only be assigned in the cases mentioned above. > *Keep in mind, in this scenario, topic IDs will still be assigned in the > controller/ZK to all new and pre-existing topics and will show up in > metadata.* This means we are not ensured the same guarantees we had in 2.8. > *It is just the LISR/partition.metadata part of the code that is affected.* > > The problem is two-fold: > 1. We ignore LISR requests when the partition leader epoch has not increased > (previously we assigned the ID before this check) > 2. We only assign the topic ID when we are associating the log with the > partition in replicamanager for the first time. Though in the scenario > described above, we have logs associated with partitions that need to be > upgraded. > > We should check if the LISR request is resulting in a topic ID addition > and add logic to assign IDs to logs already associated with partitions in the > replica manager. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
mjsax commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r682119041 ## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ## @@ -25,15 +25,17 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ -LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, DEV_BRANCH, DEV_VERSION, KafkaVersion +LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, DEV_VERSION, KafkaVersion # broker 0.10.0 is not compatible with newer Kafka Streams versions broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \ str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), \ - str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(DEV_BRANCH)] + str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(DEV_BRANCH)] metadata_1_versions = [str(LATEST_0_10_0)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] +metadata_3_10_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), Review comment: What is `3_10` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
mjsax commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r682119892 ## File path: streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowStore; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; + +public class SmokeTestClient extends SmokeTestUtil { + +private final String name; + +private KafkaStreams streams; +private boolean uncaughtException = false; +private boolean started; +private volatile boolean closed; + +private static void addShutdownHook(final String name, final Runnable runnable) { +if (name != null) { +Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); +} else { +Runtime.getRuntime().addShutdownHook(new Thread(runnable)); +} +} + +private static File tempDirectory() { +final String prefix = "kafka-"; +final File file; +try { +file = Files.createTempDirectory(prefix).toFile(); +} catch (final IOException ex) { +throw new RuntimeException("Failed to create a temp dir", ex); +} +file.deleteOnExit(); + +addShutdownHook("delete-temp-file-shutdown-hook", () -> { +try { +Utils.delete(file); +} catch (final IOException e) { +System.out.println("Error deleting " + file.getAbsolutePath()); +e.printStackTrace(System.out); +} 
+}); + +return file; +} + +public SmokeTestClient(final String name) { +this.name = name; +} + +public boolean started() { +return started; +} + +public boolean closed() { +return closed; +} + +public void start(final Properties streamsProperties) { +final Topology build = getTopology(); +streams = new KafkaStreams(build, getStreamsConfig(streamsProperties)); + +final CountDownLatch countDownLatch = new CountDownLatch(1); +streams.setStateListener((newState, oldState) -> { +System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); +if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { +started = true; +countDownLatch.countDown(); +} + +if (newState == KafkaStreams.State.NOT_RUNNING) { +closed = true; +} +}); + +streams.setUncaughtExceptionHandler(e -> { +System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); +System.out.println(name + ": FATAL:
[GitHub] [kafka] jolshan opened a new pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan opened a new pull request #11170: URL: https://github.com/apache/kafka/pull/11170 Previously, we used the metadata cache to determine whether or not to use topic IDs. Unfortunately, metadata cache updates with ZK controllers arrive in a separate request and may be too slow for the fetcher thread. This results in switching between topic names and topic IDs for topics that could just use IDs. This change adds topic IDs to the fetcher state created from LeaderAndIsr requests. It also supports updating this state for follower threads as soon as a LeaderAndIsr request provides a topic ID. I've opted to only update replica fetcher threads. Alter Log Dir threads will use either the topic name or the topic ID depending on what was present when they were created. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
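[editorial note] As a sketch of the idea (assumed names, not the actual Kafka classes): the per-partition fetch state can carry the topic ID handed over by the LeaderAndIsr request and be upgraded in place when the ID arrives later, instead of consulting the metadata cache:

```java
import java.util.Optional;
import java.util.UUID;

// Per-partition fetch state that carries the topic ID taken directly from the
// LeaderAndIsr request, so the fetcher thread does not depend on a separate,
// slower metadata cache update.
class PartitionFetchStateSketch {
    final Optional<UUID> topicId;
    final long fetchOffset;

    PartitionFetchStateSketch(Optional<UUID> topicId, long fetchOffset) {
        this.topicId = topicId;
        this.fetchOffset = fetchOffset;
    }

    // When a later LeaderAndIsr request supplies the ID, upgrade the state in
    // place instead of flip-flopping between topic names and topic IDs.
    PartitionFetchStateSketch withTopicId(UUID id) {
        return new PartitionFetchStateSketch(Optional.of(id), fetchOffset);
    }
}
```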
[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
mjsax commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r682120783 ## File path: streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.time.Instant; + +public class SmokeTestUtil { + +final static int END = Integer.MAX_VALUE; + +static ProcessorSupplier printProcessorSupplier(final String topic) { +return printProcessorSupplier(topic, ""); +} + +static ProcessorSupplier printProcessorSupplier(final String topic, final String name) { +return new ProcessorSupplier() { +@Override +public Processor get() { +return new AbstractProcessor() { +private int numRecordsProcessed = 0; +private long smallestOffset = Long.MAX_VALUE; +private long largestOffset = Long.MIN_VALUE; + +@Override +public void init(final ProcessorContext context) { +super.init(context); +System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId()); Review comment: Why `DEV` -- should be `2.8`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12395) Drop topic mapKey in DeleteTopics response
[ https://issues.apache.org/jira/browse/KAFKA-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hiro Kuwabara reassigned KAFKA-12395: - Assignee: Hiro Kuwabara > Drop topic mapKey in DeleteTopics response > -- > > Key: KAFKA-12395 > URL: https://issues.apache.org/jira/browse/KAFKA-12395 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Hiro Kuwabara >Priority: Major > > Now that DeleteTopic requests/responses may be keyed by topicId, the use of > the topic name as a map key in the response makes less sense. We should > consider dropping it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] codefactor commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
codefactor commented on pull request #10568: URL: https://github.com/apache/kafka/pull/10568#issuecomment-892207657 Are there any plans for when this will be released? It looks like this upgrade happens to include a fix for the following rocksdb issue: https://github.com/facebook/rocksdb/issues/6703 That fixes 2 CVEs: https://nvd.nist.gov/vuln/detail/CVE-2019-12900 https://nvd.nist.gov/vuln/detail/CVE-2016-3189 My application is blocked by security scans due to these, and we need a new release of kafka-streams to get those fixes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan opened a new pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
jolshan opened a new pull request #11171: URL: https://github.com/apache/kafka/pull/11171 Most of [KAFKA-13132](https://issues.apache.org/jira/browse/KAFKA-13132) has been resolved, but there is one part of one case not covered. From the ticket: `2. We only assign the topic ID when we are associating the log with the partition in replicamanager for the first time` We covered the case where the log already exists and the leader epoch is _equal_ (i.e., no updates besides the topic ID), but we didn't cover the update case where the leader epoch is bumped and the log is already associated with the partition. This PR ensures we correctly assign the topic ID in the makeLeaders/Followers path when the log already exists. I've also added a test for the bumped leader epoch scenario. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
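[editorial note] A rough sketch of the remaining gap, under stated assumptions (illustrative names; the real change touches Kafka's Scala ReplicaManager/Partition code):

```java
import java.util.Optional;
import java.util.UUID;

// On the makeLeaders/makeFollowers path, assign the topic ID from the request
// even when the log already exists, not only on first association.
class TopicIdAssignmentSketch {
    private Optional<UUID> logTopicId = Optional.empty(); // ID recorded for an existing log

    void maybeAssignTopicId(UUID requestTopicId) {
        // Previously this only happened when the log was first associated with
        // the partition, so an epoch bump on an existing log skipped it.
        if (requestTopicId != null && !logTopicId.isPresent()) {
            logTopicId = Optional.of(requestTopicId); // and persist to partition.metadata
        }
    }
}
```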
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Description: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft. was: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the brokers in KRaft, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in the quorum with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft. > Fix BrokerConfigHandler to expect empty string as the resource name for > dynamic default broker configs in KRaft > --- > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the KRaft brokers, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in metadata with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. This handler > should be fixed to expect empty string as the resource name for the dynamic > default broker configs if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r682164477 ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ## @@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() { assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); } +/** + * This test is used to see if the base timestamp of the batch has been successfully + * converted to a delete horizon for the tombstones / transaction markers of the batch. + * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon. + */ +@ParameterizedTest +@ArgumentsSource(MemoryRecordsArgumentsProvider.class) +public void testBaseTimestampToDeleteHorizonConversion(Args args) { +int partitionLeaderEpoch = 998; +if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { +ByteBuffer buffer = ByteBuffer.allocate(2048); +MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, +0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); +builder.append(10L, "1".getBytes(), null); + +ByteBuffer filtered = ByteBuffer.allocate(2048); +final long deleteHorizon = Integer.MAX_VALUE / 2; +final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) { Review comment: I've added a test to `MemoryRecordsBuilderTest` that is similar to this one in `MemoryRecordsTest`, but sets the `deleteHorizon` directly through the constructor. I see having both tests as useful -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #11168: KAFKA-13160: Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
cmccabe commented on pull request #11168: URL: https://github.com/apache/kafka/pull/11168#issuecomment-892230644 I don't think this is the right place to make this change. BrokerMetadataPublisher should be calling `processConfigChanges` with the expected name, instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
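[editorial note] In code terms, the suggested fix location is roughly the following (a hedged sketch with assumed names; only the empty-string-to-"<default>" translation is taken from the comment above):

```java
import java.util.Properties;

// The publisher translates KRaft's empty-string default entity into the
// "<default>" sentinel that the existing handler already understands.
class BrokerMetadataPublisherSketch {
    interface ConfigHandler {
        void processConfigChanges(String resourceName, Properties props);
    }

    private final ConfigHandler brokerConfigHandler;

    BrokerMetadataPublisherSketch(ConfigHandler handler) {
        this.brokerConfigHandler = handler;
    }

    void publishBrokerConfig(String resourceName, Properties props) {
        // KRaft serializes the cluster-wide default with an empty resource name.
        String name = resourceName.isEmpty() ? "<default>" : resourceName;
        brokerConfigHandler.processConfigChanges(name, props);
    }
}
```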
[GitHub] [kafka] ableegoldman commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
ableegoldman commented on pull request #10568: URL: https://github.com/apache/kafka/pull/10568#issuecomment-892231916 @codefactor it will be released in 3.0, which is currently in the final stages of testing and preparing for release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
guozhangwang commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r682090255 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java ## @@ -66,14 +72,26 @@ ); } +Map> uncreatedTasksForTopologies(final Set currentTopologies) { +return unknownTasksToBeCreated.entrySet().stream().filter(t -> currentTopologies.contains(t.getKey().namedTopology())).collect(Collectors.toMap(Entry::getKey, Entry::getValue)); Review comment: Ditto here. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); private final StreamsConfig config; -private final SortedMap builders; // Keep sorted by topology name for readability +private final TopologyVersion version; + +private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability private ProcessorTopology globalTopology; -private Map globalStateStores = new HashMap<>(); -final Set allInputTopics = new HashSet<>(); +private final Map globalStateStores = new HashMap<>(); +private final Set allInputTopics = new HashSet<>(); + +public static class TopologyVersion { +public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version +public Set assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned +public ReentrantLock topologyLock = new ReentrantLock(); +public Condition topologyCV = topologyLock.newCondition(); +} -public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +public TopologyMetadata(final InternalTopologyBuilder builder, +final StreamsConfig config) { +version = new TopologyVersion(); this.config = config; -builders = new TreeMap<>(); +builders = new ConcurrentSkipListMap<>(); if (builder.hasNamedTopology()) { builders.put(builder.topologyName(), builder); } else { builders.put(UNNAMED_TOPOLOGY, builder); } } -public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +public TopologyMetadata(final ConcurrentNavigableMap builders, +final StreamsConfig config) { +version = new TopologyVersion(); this.config = config; + this.builders = builders; if (builders.isEmpty()) { -log.debug("Building KafkaStreams app with no empty topology"); +log.debug("Starting up empty KafkaStreams app with no topology"); } } -public int getNumStreamThreads(final StreamsConfig config) { -final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); +public void updateCurrentAssignmentTopology(final Set assignedNamedTopologies) { +version.assignedNamedTopologies = assignedNamedTopologies; +} -// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later -if (builders.isEmpty()) { -if (configuredNumStreamThreads != 0) { -log.info("Overriding number of StreamThreads to zero for empty topology"); +/** + * @return the set of named topologies that the assignor distributed tasks for during the last rebalance + */ +public Set assignmentNamedTopologies() { +return version.assignedNamedTopologies; +} + +public long topologyVersion() { +return version.topologyVersion.get(); +} + +public void lock() { +version.topologyLock.lock(); +} + +public void unlock() { +version.topologyLock.unlock(); +} + +public InternalTopologyBuilder getBuilderForTopologyName(final String name) { +return builders.get(name); +} 
+ +/** + * @throws IllegalStateException if the thread is not already holding the lock via TopologyMetadata#lock + */ +public void maybeWaitForNonEmptyTopology() { +if (!version.topologyLock.isHeldByCurrentThread()) { Review comment: I feel a bit concerned about the "asymmetry" of this function: all other functions have the lock inside while this function is supposed to be called by a caller -- i.e. `handleTopologyUpdatesPhase`. It is quite vulnerable to bugs with additional edits. I'm wondering if we can move this logic out of `handleTopologyUpdatesPhase` instead: i.e. we first update the named topology, and then based on the new version we can either wait or re-subscribe and trigger rebalance. WDYT? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Top
[GitHub] [kafka] mjsax opened a new pull request #11172: MINOR: update stream-stream join docs
mjsax opened a new pull request #11172: URL: https://github.com/apache/kafka/pull/11172 Call for review @JimGalasyn @spena @ableegoldman (cf https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics) Must be cherry-picked to `3.0` branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #11172: MINOR: update stream-stream join docs
mjsax commented on pull request #11172: URL: https://github.com/apache/kafka/pull/11172#issuecomment-892239651 \cc @kkonstantine -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13161) Follower leader and ISR state not updated after partition change in KRaft
Jason Gustafson created KAFKA-13161: --- Summary: Follower leader and ISR state not updated after partition change in KRaft Key: KAFKA-13161 URL: https://issues.apache.org/jira/browse/KAFKA-13161 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jose Armando Garcia Sancio Fix For: 3.0.0 In KRaft, when we detect a partition change, we first verify whether any leader or follower transitions are needed. Depending on the case, we call either `applyLocalLeadersDelta` or `applyLocalFollowersDelta`. In the latter case, we are missing a call to `Partition.makeFollower`, which is responsible for updating the LeaderAndIsr state for the partitions. As a result of this, the partition state may be left stale. The specific consequences of this bug are 1) follower fetching fails since the epoch is never updated, and 2) a stale leader may continue to accept Produce requests. The latter is the bigger issue since it can lead to log divergence if we are appending from both the client and from the fetcher thread at the same time. I tested this locally and confirmed that it is possible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
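[editorial note] A minimal illustration of the missing step, under the assumption that the fix is simply to invoke the follower transition while applying the delta (Java sketch; the actual code is Scala and its signatures differ):

```java
import java.util.List;

// Partition and its makeFollower method stand in for the broker internals.
class FollowersDeltaSketch {
    interface Partition {
        // Refreshes the leader epoch and leader/ISR state, which also stops a
        // stale leader from continuing to accept Produce requests.
        void makeFollower(int newLeaderId, int newLeaderEpoch);
    }

    // Applying a metadata delta in which these partitions became followers.
    void applyLocalFollowersDelta(List<Partition> partitions, int leaderId, int leaderEpoch) {
        for (Partition partition : partitions) {
            partition.makeFollower(leaderId, leaderEpoch); // the call that was missing
        }
        // ...then (re)start fetchers against the new leader using the bumped epoch.
    }
}
```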
[GitHub] [kafka] mjsax merged pull request #11157: MINOR: Replace EasyMock with Mockito in test-utils module
mjsax merged pull request #11157: URL: https://github.com/apache/kafka/pull/11157 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #11157: MINOR: Replace EasyMock with Mockito in test-utils module
mjsax commented on pull request #11157: URL: https://github.com/apache/kafka/pull/11157#issuecomment-892262404 Thanks for the PR @dengziming! Merged to `trunk`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13162) ElectLeader API must be forwarded to Controller
Jason Gustafson created KAFKA-13162: --- Summary: ElectLeader API must be forwarded to Controller Key: KAFKA-13162 URL: https://issues.apache.org/jira/browse/KAFKA-13162 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 3.0.0 We're missing the logic to forward ElectLeaders requests to the controller. This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Description: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This code that calls the handler should be fixed to pass "<default>" instead of empty string if using KRaft. was: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This handler should be fixed to expect empty string as the resource name for the dynamic default broker configs if using KRaft. > Fix BrokerConfigHandler to expect empty string as the resource name for > dynamic default broker configs in KRaft > --- > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the KRaft brokers, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in metadata with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. This code that > calls the handler should be fixed to pass "<default>" instead of empty string > if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Description: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. The code that calls the handler should be fixed to pass "<default>" instead of empty string if using KRaft. was: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. This code that calls the handler should be fixed to pass "<default>" instead of empty string if using KRaft. > Fix BrokerConfigHandler to expect empty string as the resource name for > dynamic default broker configs in KRaft > --- > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the KRaft brokers, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in metadata with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. The code that > calls the handler should be fixed to pass "<default>" instead of empty string > if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Description: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. The code that calls the handler method for config changes should be fixed to pass "<default>" instead of empty string if using KRaft to the handler method. was: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. The code that calls the handler method for config changes should be fixed to pass "<default>" instead of empty string if using KRaft. > Fix BrokerConfigHandler to expect empty string as the resource name for > dynamic default broker configs in KRaft > --- > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the KRaft brokers, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in metadata with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. The code that > calls the handler method for config changes should be fixed to pass > "<default>" instead of empty string if using KRaft to the handler method. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Description: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. The code that calls the handler method for config changes should be fixed to pass "<default>" instead of empty string if using KRaft. was: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. The code that calls the handler should be fixed to pass "<default>" instead of empty string if using KRaft. > Fix BrokerConfigHandler to expect empty string as the resource name for > dynamic default broker configs in KRaft > --- > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the KRaft brokers, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in metadata with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. The code that > calls the handler method for config changes should be fixed to pass > "<default>" instead of empty string if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Description: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. The code that calls the handler method for config changes should be fixed to pass "<default>" instead of empty string to the handler method if using KRaft. was: In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update and converts the resource name to an integer otherwise to do a per-broker config update. In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs since the resource name for them is not "<default>" or a single integer. The code that calls the handler method for config changes should be fixed to pass "<default>" instead of empty string if using KRaft to the handler method. > Fix BrokerConfigHandler to expect empty string as the resource name for > dynamic default broker configs in KRaft > --- > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the KRaft brokers, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in metadata with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. The code that > calls the handler method for config changes should be fixed to pass > "<default>" instead of empty string to the handler method if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
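Stepping back from the description churn above: the fix the ticket calls for amounts to a small translation at the call site. A minimal sketch of that idea, assuming a hypothetical dispatch helper (the names below are illustrative, not the actual Kafka patch, which lives in Scala):

```java
import java.util.function.Consumer;

// Hypothetical helper showing the translation KAFKA-13160 asks for: KRaft
// serializes the dynamic default broker config with an empty resource name,
// while the handler expects the literal "<default>" that ZK clusters use.
public final class BrokerConfigDispatch {
    private static final String DEFAULT_ENTITY_NAME = "<default>";

    public static void dispatch(final String resourceName,
                                final boolean isKRaft,
                                final Consumer<String> brokerConfigHandler) {
        final String effectiveName =
            (isKRaft && resourceName.isEmpty()) ? DEFAULT_ENTITY_NAME : resourceName;
        // "<default>" triggers a cluster-wide default update; any other name is
        // parsed as a broker id, which is why "" produced a NumberFormatException.
        brokerConfigHandler.accept(effectiveName);
    }
}
```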
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r682018753 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { + val keepPartitionMetadataFile: Boolean, + @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup { Review comment: hmm it seems like we only use it in a test. That goes with the return value that was added into the `cleanInto` method in the LogCleaner. I'm going to remove these and see if I can take another approach in the testing. edit: going through the build errors and I see other usages. I will go through it more thoroughly -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0
mjsax commented on a change in pull request #11124: URL: https://github.com/apache/kafka/pull/11124#discussion_r682192656 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java ## @@ -19,33 +19,24 @@ import org.apache.kafka.streams.kstream.Window; /** - * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end - * timestamp as exclusive boundary. - * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows - * window specification}) will have the same size. - * - * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}. + * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases, + * ex: in {@link org.apache.kafka.streams.state.WindowStore}, we'll store the aggregated values of any fixed-size types of time windows. + * We use {@link TimeWindow} to represent these time windows * * @see SessionWindow * @see UnlimitedWindow - * @see org.apache.kafka.streams.kstream.TimeWindows - * @see org.apache.kafka.streams.processor.TimestampExtractor */ public class TimeWindow extends Window { /** - * Create a new window for the given start time (inclusive) and end time (exclusive). + * Create a new window for the given start time and end time. * - * @param startMs the start timestamp of the window (inclusive) - * @param endMs the end timestamp of the window (exclusive) - * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than or equal to - * {@code startMs} + * @param startMs the start timestamp of the window Review comment: You add `(inclusive)` and `(exclusive)` in `SessionWindow` but remove it here. Seems inconsistent? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -351,7 +351,8 @@ private void processEarly(final K key, final V value, final long inputRecordTime } if (combinedWindow == null) { -final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); +// created a [start, end] time interval window via SessionWindow +final SessionWindow window = new SessionWindow(0, windows.timeDifferenceMs()); Review comment: I would prefer to _first_ rename existing windows and not merge this PR using `SessionWindows` within `SlidingWindowAggregate`... ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java ## @@ -19,33 +19,24 @@ import org.apache.kafka.streams.kstream.Window; /** - * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end - * timestamp as exclusive boundary. - * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows - * window specification}) will have the same size. - * - * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}. + * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases, Review comment: Why `window-agnostic` ? In general, I am not sure why we need to change the existing JavaDocs? What information do you think is missing or wrong? 
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java ## @@ -19,33 +19,24 @@ import org.apache.kafka.streams.kstream.Window; /** - * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end - * timestamp as exclusive boundary. - * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows - * window specification}) will have the same size. - * - * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}. + * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases, + * ex: in {@link org.apache.kafka.streams.state.WindowStore}, we'll store the aggregated values of any fixed-size types of time windows. + * We use {@link TimeWindow} to represent these time windows * * @see SessionWindow * @see UnlimitedWindow - * @see org.apache.kafka.streams.kstream.TimeWindows - * @see org.apache.kafka.streams.processor.TimestampExtractor */ public class TimeWindow extends Window { /** - * Create a new window for the given start time (inclusive) and end time (exclusive). + * Create a new window for the given start time and end time. * - * @param startMs the start
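The boundary semantics being debated in this review are easy to make concrete. The following sketch uses simplified stand-ins for the window classes under discussion (not the real implementations): two windows that merely touch at a timestamp overlap under inclusive-inclusive semantics but not under inclusive-exclusive semantics, which is also why a window size of 0 is legal for a SessionWindow-style interval but degenerate for a TimeWindow-style one.

```java
// Simplified stand-ins for the window classes discussed above; the real
// classes live in org.apache.kafka.streams.kstream.internals.
final class WindowBounds {
    final long startMs;
    final long endMs;

    WindowBounds(final long startMs, final long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }

    // [startMs, endMs): end excluded, as in TimeWindow
    boolean overlapsHalfOpen(final WindowBounds other) {
        return startMs < other.endMs && other.startMs < endMs;
    }

    // [startMs, endMs]: both ends included, as in SessionWindow
    boolean overlapsClosed(final WindowBounds other) {
        return startMs <= other.endMs && other.startMs <= endMs;
    }

    public static void main(final String[] args) {
        final WindowBounds w1 = new WindowBounds(0, 10);
        final WindowBounds w2 = new WindowBounds(10, 20);
        System.out.println(w1.overlapsHalfOpen(w2)); // false: t=10 is outside w1
        System.out.println(w1.overlapsClosed(w2));   // true: both contain t=10
        // Size 0: [5, 5) is empty, while [5, 5] still contains the instant t=5.
    }
}
```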
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r682020580 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int, * @param sourceRecords The dirty log segment * @param dest The cleaned log segment * @param map The key=>offset mapping - * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment + * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment + * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration * @param maxLogMessageSize The maximum message size of the corresponding topic * @param stats Collector for cleaning statistics + * @param currentTime The time at which the clean was initiated Review comment: It seems like we don't need the return value at all, since we would only be using it to track the latestDeleteHorizon in the Log, but it doesn't seem like we need that either. I'm going to remove it. edit: going through the build errors and I see other usages. I will go through it more thoroughly -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392610#comment-17392610 ] A. Sophie Blee-Goldman commented on KAFKA-12559: [~brz] I'd say give [~msundeq] another day or two to respond and if you don't hear back then feel free to assign this ticket to yourself. I just added you as a contributor on the project so you should be able to self-assign tickets from now on. > Add a top-level Streams config for bounding off-heap memory > --- > > Key: KAFKA-12559 > URL: https://issues.apache.org/jira/browse/KAFKA-12559 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Martin Sundeqvist >Priority: Major > Labels: needs-kip, newbie, newbie++ > > At the moment we provide an example of how to bound the memory usage of > rocksdb in the [Memory > Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb] > section of the docs. This requires implementing a custom RocksDBConfigSetter > class and setting a number of rocksdb options for relatively advanced > concepts and configurations. It seems a fair number of users either fail to > find this or consider it to be for more advanced use cases/users. But RocksDB > can eat up a lot of off-heap memory and it's not uncommon for users to come > across a {{RocksDBException: Cannot allocate memory}} > It would probably be a much better user experience if we implemented this > memory bound out-of-the-box and just gave users a top-level StreamsConfig to > tune the off-heap memory given to rocksdb, like we have for on-heap cache > memory with cache.max.bytes.buffering. More advanced users can continue to > fine-tune their memory bounding and apply other configs with a custom config > setter, while new or more casual users can cap the off-heap memory without > getting their hands dirty with rocksdb. > I would propose to add the following top-level config: > rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid > values are [0, inf] > I'd also want to consider adding a second, lower priority top-level config to > give users a knob for adjusting how much of that total off-heap memory goes > to the block cache + index/filter blocks, and how much of it is afforded to > the write buffers. I'm struggling to come up with a good name for this > config, but it would be something like > rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default > to 0.5, valid values are [0, 1] -- This message was sent by Atlassian Jira (v8.3.4#803005)
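For context, the Memory Management pattern the description refers to is roughly the following config setter. This is a sketch of the documented approach with illustrative sizes, not the proposed built-in config:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    private static final long TOTAL_OFF_HEAP_MEMORY = 512 * 1024 * 1024L; // illustrative bound
    private static final long TOTAL_MEMTABLE_MEMORY = 128 * 1024 * 1024L; // share for write buffers

    // Shared across all stores in the instance so the bound is global.
    private static final Cache CACHE = new LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, 0.1);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
        new WriteBufferManager(TOTAL_MEMTABLE_MEMORY, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        // Charge memtable memory against the block cache so everything shares one bound.
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
        tableConfig.setBlockCache(CACHE);
        // Also count index and filter blocks against the cache instead of unbounded memory.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Cache and WriteBufferManager are shared among all stores; do not close them per store.
    }
}
```

It is registered via props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class), which is exactly the boilerplate the proposed rocksdb.max.bytes.off.heap config would make unnecessary.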
[jira] [Commented] (KAFKA-13145) Renaming the time interval window for better understanding
[ https://issues.apache.org/jira/browse/KAFKA-13145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392611#comment-17392611 ] Matthias J. Sax commented on KAFKA-13145: - Personally, I am not a fan of `InclusiveExclusiveWindow` as a name, but if you feel strongly about it, I can be convinced. Because the window classes in question are small and easy, I would rather prefer to just duplicate the code if necessary, and keep the semantically more meaningful names that we have now. I.e., we would just add a `SlidingWindow` (that also has inclusive upper/lower bound, similar to `SessionWindow`) and call it a day. – Because the code is so simple, I am not worried about code duplication personally. For this case, we could close this ticket as "won't fix" and just add `SlidingWindow` class via KAFKA-12839 instead. Thoughts? \cc [~ableegoldman] > Renaming the time interval window for better understanding > -- > > Key: KAFKA-13145 > URL: https://issues.apache.org/jira/browse/KAFKA-13145 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > > I have another thought, which is to rename the time interval related windows. > Currently, we have 3 types of time interval window: > {{TimeWindow}} -> to have {{[start,end)}} time interval > {{SessionWindow}} -> to have {{[start,end]}} time interval > {{UnlimitedWindow}} -> to have {{[start, MAX_VALUE)}} time interval > I think the name {{SessionWindow}} is definitely not good here, especially we > want to use it in {{SlidingWindows}} now, although it is only used for > {{SessionWindows}} before. We should name them with time interval meaning, > not the streaming window functions meaning. Because these 3 window types > are internal use only, it is safe to rename them. > > {{TimeWindow}} --> {{InclusiveExclusiveWindow}} > {{SessionWindow}} / {{SlidingWindow}} --> {{InclusiveInclusiveWindow}} > {{UnlimitedWindow}} --> {{InclusiveUnboundedWindow}} > > See the discussion here: > [https://github.com/apache/kafka/pull/11124#issuecomment-887989639] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13152: Labels: needs-kip (was: ) > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls the maximum number > of records to bookkeep, and hence if it is exceeded we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", > which controls how many bytes in total are allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.3.4#803005)
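To make the comparison concrete, here is a hedged snippet contrasting the existing per-partition knob with the proposed global byte bound; the "input.buffer.max.bytes" key is hypothetical until the KIP lands:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class BufferConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Existing per-partition record-count bound (the config this ticket wants to deprecate).
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);
        // Proposed global byte bound from this ticket; hypothetical until the KIP is accepted.
        props.put("input.buffer.max.bytes", 64 * 1024 * 1024L);
    }
}
```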
[GitHub] [kafka] codefactor commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
codefactor commented on pull request #10568: URL: https://github.com/apache/kafka/pull/10568#issuecomment-892269776 @ableegoldman , Thanks a lot - do you have any links where I can find out the following: 1. are there any breaking changes if I upgrade from version 2.6 to 3.0? 2. when is version 3.0 scheduled to be released to maven central repository? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13150) How is Kafkastream configured to consume data from a specified offset ?
[ https://issues.apache.org/jira/browse/KAFKA-13150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392612#comment-17392612 ] Matthias J. Sax commented on KAFKA-13150: - To subscribe, follow the instructions on the webpage: [https://kafka.apache.org/contact] > How is Kafkastream configured to consume data from a specified offset ? > --- > > Key: KAFKA-13150 > URL: https://issues.apache.org/jira/browse/KAFKA-13150 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: wangjh >Priority: Minor > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13145) Renaming the time interval window for better understanding
[ https://issues.apache.org/jira/browse/KAFKA-13145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392615#comment-17392615 ] A. Sophie Blee-Goldman commented on KAFKA-13145: Fine with me. FWIW the "InclusiveExclusiveWindow" name was my idea, but that was just to avoid using something called "SessionWindow" in the _Sliding_ window processor – making a new SlidingWindow class works too. > Renaming the time interval window for better understanding > -- > > Key: KAFKA-13145 > URL: https://issues.apache.org/jira/browse/KAFKA-13145 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > > I have another thought, which is to rename the time interval related windows. > Currently, we have 3 types of time interval window: > {{TimeWindow}} -> to have {{[start,end)}} time interval > {{SessionWindow}} -> to have {{[start,end]}} time interval > {{UnlimitedWindow}} -> to have {{[start, MAX_VALUE)}} time interval > I think the name {{SessionWindow}} is definitely not good here, especially we > want to use it in {{SlidingWindows}} now, although it is only used for > {{SessionWindows}} before. We should name them with time interval meaning, > not the streaming window functions meaning. Because these 3 window types > are internal use only, it is safe to rename them. > > {{TimeWindow}} --> {{InclusiveExclusiveWindow}} > {{SessionWindow}} / {{SlidingWindow}} --> {{InclusiveInclusiveWindow}} > {{UnlimitedWindow}} --> {{InclusiveUnboundedWindow}} > > See the discussion here: > [https://github.com/apache/kafka/pull/11124#issuecomment-887989639] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r682018753 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { + val keepPartitionMetadataFile: Boolean, + @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup { Review comment: ~~hmm it seems like we only use it in a test. That goes with the return value that was added into the `cleanInto` method in the LogCleaner. I'm going to remove these and see if I can take another approach in the testing edit: going through the build errors and I see other usages. I will go through it more thoroughly~~ sorry I jumped the gun a bit w/ the earlier changes There is an addition to the LogCleanerManager that allows the cleaner to check for cleaning logs that have tombstones past the deleteHorizon. The logic in the LogCleanerManager can be paraphrased to "if there are no eligible cleanable logs, we can see if there are logs that have tombstones that can be deleted by checking the Log's latestDeleteHorizon. We can enqueue those with tombstones eligible for cleaning" ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { + val keepPartitionMetadataFile: Boolean, + @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup { Review comment: ~~hmm it seems like we only use it in a test. That goes with the return value that was added into the `cleanInto` method in the LogCleaner. I'm going to remove these and see if I can take another approach in the testing~~ ~~edit: going through the build errors and I see other usages. I will go through it more thoroughly~~ sorry I jumped the gun a bit w/ the earlier changes There is an addition to the LogCleanerManager that allows the cleaner to check for cleaning logs that have tombstones past the deleteHorizon. The logic in the LogCleanerManager can be paraphrased to "if there are no eligible cleanable logs, we can see if there are logs that have tombstones that can be deleted by checking the Log's latestDeleteHorizon. We can enqueue those with tombstones eligible for cleaning" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r682020580 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int, * @param sourceRecords The dirty log segment * @param dest The cleaned log segment * @param map The key=>offset mapping - * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment + * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment + * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration * @param maxLogMessageSize The maximum message size of the corresponding topic * @param stats Collector for cleaning statistics + * @param currentTime The time at which the clean was initiated Review comment: ~~It seems like we don't need the return value at all, since we would only be using it to track the latestDeleteHorizon in the Log, but it doesn't seem like we need that either. I'm going to remove it~~ ~~edit: going through the build errors and I see other usages. I will go through it more thoroughly~~ ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { + val keepPartitionMetadataFile: Boolean, + @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup { Review comment: ~~hmm it seems like we only use it in a test. That goes with the return value that was added into the `cleanInto` method in the LogCleaner. I'm going to remove these and see if I can take another approach in the testing~~ ~~edit: going through the build errors and I see other usages. I will go through it more thoroughly~~ ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { + val keepPartitionMetadataFile: Boolean, + @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup { Review comment: sorry I jumped the gun a bit w/ the earlier changes There is an addition to the LogCleanerManager that allows the cleaner to check for cleaning logs that have tombstones past the deleteHorizon. The logic in the LogCleanerManager can be paraphrased to "if there are no eligible cleanable logs, we can see if there are logs that have tombstones that can be deleted by checking the Log's latestDeleteHorizon. We can enqueue those with tombstones eligible for cleaning" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
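The LogCleanerManager behavior paraphrased in this comment can be sketched as follows. The interface and method names are assumptions for illustration (the real code is Scala and structured differently), and -1 stands in for RecordBatch.NO_TIMESTAMP, i.e. no delete horizon recorded yet:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Rough sketch of the selection logic paraphrased above: when no log is
// eligible for regular compaction, fall back to logs whose recorded delete
// horizon has passed, so their tombstones can finally be removed.
final class TombstoneCleanupSelector {
    interface CleanableLog {
        boolean hasCleanableDirtyRatio();
        long latestDeleteHorizonMs(); // -1 means no delete horizon recorded
    }

    static Optional<CleanableLog> selectLog(final List<CleanableLog> logs, final long nowMs) {
        final Optional<CleanableLog> regular = logs.stream()
            .filter(CleanableLog::hasCleanableDirtyRatio)
            .findFirst();
        if (regular.isPresent()) {
            return regular;
        }
        // Fallback: pick the log whose tombstones have been waiting the longest
        // past their delete horizon.
        return logs.stream()
            .filter(log -> log.latestDeleteHorizonMs() != -1
                && log.latestDeleteHorizonMs() <= nowMs)
            .min(Comparator.comparingLong(CleanableLog::latestDeleteHorizonMs));
    }
}
```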
[jira] [Updated] (KAFKA-13160) Fix the code that calls the broker’s config handler to pass the expected default resource name for dynamic broker configs
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Summary: Fix the code that calls the broker’s config handler to pass the expected default resource name for dynamic broker configs (was: Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft) > Fix the code that calls the broker’s config handler to pass the expected > default resource name for dynamic broker configs > > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the KRaft brokers, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in metadata with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. The code that > calls the handler method for config changes should be fixed to pass > "<default>" instead of empty string to the handler method if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13160) Fix the code that calls the broker’s config handler to pass the expected default resource name when using KRaft.
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn updated KAFKA-13160: -- Summary: Fix the code that calls the broker’s config handler to pass the expected default resource name when using KRaft. (was: Fix the code that calls the broker’s config handler to pass the expected default resource name for dynamic broker configs) > Fix the code that calls the broker’s config handler to pass the expected > default resource name when using KRaft. > > > Key: KAFKA-13160 > URL: https://issues.apache.org/jira/browse/KAFKA-13160 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > In a ZK cluster, dynamic default broker configs are stored in the zNode > /brokers/<default>. Without this fix, when dynamic configs from snapshots are > processed by the KRaft brokers, the BrokerConfigHandler checks if the > resource name is "<default>" to do a default update and converts the resource > name to an integer otherwise to do a per-broker config update. > In KRaft, dynamic default broker configs are serialized in metadata with > empty string instead of "<default>". This was causing the BrokerConfigHandler > to throw a NumberFormatException for dynamic default broker configs since the > resource name for them is not "<default>" or a single integer. The code that > calls the handler method for config changes should be fixed to pass > "<default>" instead of empty string to the handler method if using KRaft. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3
ableegoldman commented on pull request #10568: URL: https://github.com/apache/kafka/pull/10568#issuecomment-892279085 1) Possibly -- you can check out the [Streams upgrade guide](https://kafka.apache.org/28/documentation/streams/upgrade-guide), which describes any public API changes in each version. The section for 3.0 won't appear until it's released, but you can always poke around the [source html for 3.0 changes](https://github.com/apache/kafka/blob/4eb72add11b548e3fe8fea72856af49dc950e444/docs/streams/upgrade-guide.html#L97) if you want to be prepared. 2) Good question -- depends on how quickly the remaining open items can be addressed and if there are any new issues found during the two week testing period. The [3.0 release plan](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177046466) lists the unresolved issues targeted for 3.0, and you can subscribe to the mailing list if you want to keep up with the release progress. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
hachikuji commented on a change in pull request #11171: URL: https://github.com/apache/kafka/pull/11171#discussion_r682206623 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -313,14 +313,20 @@ class Partition(val topicPartition: TopicPartition, } def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = { -isFutureReplica match { - case true if futureLog.isEmpty => -val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId) +val logOpt = if (isFutureReplica) futureLog else log +if (logOpt.isEmpty) { + val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId) + if (isFutureReplica) this.futureLog = Option(log) - case false if log.isEmpty => -val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId) + else this.log = Option(log) - case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.") +} else { + trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.") + logOpt.foreach { log => +if (log.topicId.isEmpty) { Review comment: By the time we get here, I think we have already validated that the topicid is consistent. Nevertheless, I wonder if it makes sense to let `assignTopicId` validate the passed topicId? Currently it will just override the value. ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -313,14 +313,20 @@ class Partition(val topicPartition: TopicPartition, } def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = { -isFutureReplica match { - case true if futureLog.isEmpty => -val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId) +val logOpt = if (isFutureReplica) futureLog else log Review comment: Any better? ```scala def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = { def maybeCreate(logOpt: Option[Log]): Log = { logOpt match { case Some(log) => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.") topicId.foreach(log.assignTopicId) log case None => createLog(isNew, isFutureReplica, offsetCheckpoints, topicId) } } if (isFutureReplica) { this.futureLog = Some(maybeCreate(this.futureLog)) } else { this.log = Some(maybeCreate(this.log)) } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
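The validation suggested for assignTopicId would have roughly this shape; a hedged sketch using java.util.UUID as a stand-in for Kafka's Uuid (the real method lives in the Scala Log class and may differ):

```java
import java.util.Optional;
import java.util.UUID;

// Sketch of "validate instead of silently overwrite": accept the proposed id
// only if none is set yet or it matches the id already assigned.
final class TopicIdHolder {
    private Optional<UUID> topicId = Optional.empty();

    void assignTopicId(final UUID proposed) {
        if (topicId.isPresent() && !topicId.get().equals(proposed)) {
            throw new IllegalStateException(
                "Topic id " + proposed + " does not match existing id " + topicId.get());
        }
        topicId = Optional.of(proposed);
    }
}
```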
[jira] [Commented] (KAFKA-13145) Renaming the time interval window for better understanding
[ https://issues.apache.org/jira/browse/KAFKA-13145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392626#comment-17392626 ] Luke Chen commented on KAFKA-13145: --- I'm good. I'll close this ticket and add `SlidingWindow` class via KAFKA-12839. Thank you. > Renaming the time interval window for better understanding > -- > > Key: KAFKA-13145 > URL: https://issues.apache.org/jira/browse/KAFKA-13145 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > > I have another thought, which is to rename the time interval related windows. > Currently, we have 3 types of time interval window: > {{TimeWindow}} -> to have {{[start,end)}} time interval > {{SessionWindow}} -> to have {{[start,end]}} time interval > {{UnlimitedWindow}} -> to have {{[start, MAX_VALUE)}} time interval > I think the name {{SessionWindow}} is definitely not good here, especially we > want to use it in {{SlidingWindows}} now, although it is only used for > {{SessionWindows}} before. We should name them with time interval meaning, > not the streaming window functions meaning. Because these 3 window types > are internal use only, it is safe to rename them. > > {{TimeWindow}} --> {{InclusiveExclusiveWindow}} > {{SessionWindow}} / {{SlidingWindow}} --> {{InclusiveInclusiveWindow}} > {{UnlimitedWindow}} --> {{InclusiveUnboundedWindow}} > > See the discussion here: > [https://github.com/apache/kafka/pull/11124#issuecomment-887989639] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13145) Renaming the time interval window for better understanding
[ https://issues.apache.org/jira/browse/KAFKA-13145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-13145. --- Resolution: Won't Fix > Renaming the time interval window for better understanding > -- > > Key: KAFKA-13145 > URL: https://issues.apache.org/jira/browse/KAFKA-13145 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > > I have another thought, which is to rename the time interval related windows. > Currently, we have 3 types of time interval window: > {{TimeWindow}} -> to have {{[start,end)}} time interval > {{SessionWindow}} -> to have {{[start,end]}} time interval > {{UnlimitedWindow}} -> to have {{[start, MAX_VALUE)}} time interval > I think the name {{SessionWindow}} is definitely not good here, especially we > want to use it in {{SlidingWindows}} now, although it is only used for > {{SessionWindows}} before. We should name them with time interval meaning, > not the streaming window functions meaning. Because these 3 window types > are internal use only, it is safe to rename them. > > {{TimeWindow}} --> {{InclusiveExclusiveWindow}} > {{SessionWindow}} / {{SlidingWindow}} --> {{InclusiveInclusiveWindow}} > {{UnlimitedWindow}} --> {{InclusiveUnboundedWindow}} > > See the discussion here: > [https://github.com/apache/kafka/pull/11124#issuecomment-887989639] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r682230209 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java ## @@ -64,6 +65,9 @@ private final Map taskProducers; private final StreamThread.ProcessingMode processingMode; +// tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation +private final Map> unknownTasksToBeCreated = new HashMap<>(); Review comment: I think we discussed this already, but in case anyone else is wondering: the leader will always assign tasks based on its view of the current named topologies and topics, ie it does not check on individual subscriptions since the group is assumed to be eventually consistent in this regard. (Note this is actually no different than today; even if each instance of an app has a different input topic in their topology, they will all wind up receiving tasks for whichever topic the leader happened to have.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
ccding commented on pull request #11110: URL: https://github.com/apache/kafka/pull/11110#issuecomment-892299485 Included the motivation and updated the PR. PTAL @ijuma @junrao @kowshik @satishd -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0
showuon commented on a change in pull request #11124: URL: https://github.com/apache/kafka/pull/11124#discussion_r682231711 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -100,17 +99,79 @@ private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final String threadId = Thread.currentThread().getName(); +private final String topic = "topic"; +private final String defaultInOrderName = "InOrder"; +private final String defaultReverseName = "Reverse"; +private final long defaultWindowSize = 10L; +private final long defaultRetentionPeriod = 5000L; + +private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator, + final String inOrderName, + final String reverseName, + final long windowSize) { +return inOrderIterator +? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false) +: Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false); +} + +@SuppressWarnings("unchecked") +@Test +public void testAggregateSmallInputWithZeroTimeDifference() { +final StreamsBuilder builder = new StreamsBuilder(); + +// We use CachingWindowStore to store the aggregated values internally, and then use TimeWindow to represent the "windowed KTable" +// thus, the window size must be greater than 0 here +final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L); Review comment: @mjsax , answering your review comments here. Please check the above discussion for more info. > Why window-agnostic ? In general, I am not sure why we need to change the existing JavaDocs? What information do you think is missing or wrong? > You add (inclusive) and (exclusive) in SessionWindow but remove it here. Seems inconsistent? > Why do you remove this check? A TimeWindow should not allow this case. > Why do we need to remove this temporarily? --> The answer to the above questions is that we can't create a store supplier with window size of 0 here because we use `TimeWindow` to represent the "windowed KTable" result. @ableegoldman and I both thought it doesn't make sense to use `TimeWindow` to represent it if `WindowStore` is used for both inclusive-exclusive and also inclusive-inclusive windows. We should have a neutral time window for this case. That's why Sophie suggested that we should have a container class that does nothing but hold the start and end time for use in window-agnostic cases like the `CachingWindowStore`. And the container class can be named `TimeWindow`, and we were thinking that after all, we'll rename the `TimeWindow` into `InclusiveExclusiveWindow`, so that's why I changed the java doc/start and end time checking/test for it. So, since we agreed that we won't rename the window, I'll revert it. But still, there's a question there: _We can't create a store supplier with window size of 0 here because we use `TimeWindow` to represent the "windowed KTable" result._ Do you agree we should have a container class that does nothing but hold the start and end time for use in window-agnostic cases like the `CachingWindowStore`? Or any other suggestions? Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r682231789 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -105,31 +105,76 @@ private KafkaStreamsNamedTopologyWrapper(final Collection topolog (v1, v2) -> { throw new IllegalArgumentException("Topology names must be unique"); }, -() -> new TreeMap<>())), +() -> new ConcurrentSkipListMap<>())), config), config, clientSupplier ); -for (final NamedTopology topology : topologies) { -nameToTopology.put(topology.name(), topology); -} } -public NamedTopology getTopologyByName(final String name) { -if (nameToTopology.containsKey(name)) { -return nameToTopology.get(name); -} else { -throw new IllegalArgumentException("Unable to locate a NamedTopology called " + name); +/** + * @return the NamedTopology for the specific name, or Optional.empty() if the application has no NamedTopology of that name + */ +public Optional getTopologyByName(final String name) { +return Optional.ofNullable(topologyMetadata.getBuilderForTopologyName(name)).map(InternalTopologyBuilder::namedTopology); +} + +/** + * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running, + * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for + * it to begin processing the new topology. + * + * @throws IllegalArgumentException if this topology name is already in use + * @throws IllegalStateExceptionif streams has not been started or has already shut down + * @throws TopologyExceptionif this topology subscribes to any input topics or pattern already in use + */ +public void addNamedTopology(final NamedTopology newTopology) { +if (hasStartedOrFinishedShuttingDown()) { +throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); +} else if (getTopologyByName(newTopology.name()).isPresent()) { +throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + + " as another of the same name already exists"); } + topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); } -public void addNamedTopology(final NamedTopology topology) { -nameToTopology.put(topology.name(), topology); -throw new UnsupportedOperationException(); +/** + * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are + * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure + * it stops processing the old topology. 
+ * + * @throws IllegalArgumentException if this topology name cannot be found + * @throws IllegalStateExceptionif streams has not been started or has already shut down + * @throws TopologyExceptionif this topology subscribes to any input topics or pattern already in use + */ +public void removeNamedTopology(final String topologyToRemove) { +if (!isRunningOrRebalancing()) { +throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state); +} else if (!getTopologyByName(topologyToRemove).isPresent()) { +throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove); +} + +topologyMetadata.unregisterTopology(topologyToRemove); } -public void removeNamedTopology(final String namedTopology) { -throw new UnsupportedOperationException(); +/** + * Do a clean up of the local state directory for this NamedTopology by deleting all data with regard to the + * @link StreamsConfig#APPLICATION_ID_CONFIG application ID} in the ({@link StreamsConfig#STATE_DIR_CONFIG}) + * + * May be called while the Streams is in any state, but only on a {@link NamedTopology} that has already been + * removed via {@link #removeNamedTopology(String)}. + * + * Calling this method triggers a restore of local {@link StateStore}s for this {@link NamedTopology} if it is + * ever re-added via {@link #addNamedTopology(NamedTopology)}. + * + * @throws IllegalStateException if this {@code NamedTopology} hasn't been removed + * @throws StreamsException if cleanup failed + */ +public void cleanUpNamedTopology(final String name) { +if (getTopologyBy
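Read together, the javadocs in this diff imply the following lifecycle for replacing a topology at runtime. A hedged usage sketch; these classes are internal to the PR and subject to change, so construction of the wrapper is omitted:

```java
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;

// Lifecycle implied by the javadocs above: remove, clean up, then re-add.
final class NamedTopologySwap {
    static void swap(final KafkaStreamsNamedTopologyWrapper app,
                     final NamedTopology replacement,
                     final String oldName) {
        app.removeNamedTopology(oldName);   // requires RUNNING or REBALANCING state
        app.cleanUpNamedTopology(oldName);  // only legal once the topology is removed
        app.addNamedTopology(replacement);  // throws if the name is already in use
    }
}
```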
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r682233727 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); private final StreamsConfig config; -private final SortedMap builders; // Keep sorted by topology name for readability +private final TopologyVersion version; + +private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability private ProcessorTopology globalTopology; -private Map globalStateStores = new HashMap<>(); -final Set allInputTopics = new HashSet<>(); +private final Map globalStateStores = new HashMap<>(); +private final Set allInputTopics = new HashSet<>(); + +public static class TopologyVersion { +public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version +public Set assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned +public ReentrantLock topologyLock = new ReentrantLock(); +public Condition topologyCV = topologyLock.newCondition(); +} -public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +public TopologyMetadata(final InternalTopologyBuilder builder, +final StreamsConfig config) { +version = new TopologyVersion(); this.config = config; -builders = new TreeMap<>(); +builders = new ConcurrentSkipListMap<>(); if (builder.hasNamedTopology()) { builders.put(builder.topologyName(), builder); } else { builders.put(UNNAMED_TOPOLOGY, builder); } } -public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +public TopologyMetadata(final ConcurrentNavigableMap builders, +final StreamsConfig config) { +version = new TopologyVersion(); this.config = config; + this.builders = builders; if (builders.isEmpty()) { -log.debug("Building KafkaStreams app with no empty topology"); +log.debug("Starting up empty KafkaStreams app with no topology"); } } -public int getNumStreamThreads(final StreamsConfig config) { -final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); +public void updateCurrentAssignmentTopology(final Set assignedNamedTopologies) { +version.assignedNamedTopologies = assignedNamedTopologies; +} -// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later -if (builders.isEmpty()) { -if (configuredNumStreamThreads != 0) { -log.info("Overriding number of StreamThreads to zero for empty topology"); +/** + * @return the set of named topologies that the assignor distributed tasks for during the last rebalance + */ +public Set assignmentNamedTopologies() { +return version.assignedNamedTopologies; +} + +public long topologyVersion() { +return version.topologyVersion.get(); +} + +public void lock() { +version.topologyLock.lock(); +} + +public void unlock() { +version.topologyLock.unlock(); +} + +public InternalTopologyBuilder getBuilderForTopologyName(final String name) { +return builders.get(name); +} + +/** + * @throws IllegalStateException if the thread is not already holding the lock via TopologyMetadata#lock + */ +public void maybeWaitForNonEmptyTopology() { +if (!version.topologyLock.isHeldByCurrentThread()) { +throw new IllegalStateException("Must call lock() before attempting to wait on non-empty topology"); +} +while (isEmpty()) { +try { +log.debug("Detected that the topology is currently 
empty, going to wait for something to be added"); +version.topologyCV.await(); +} catch (final InterruptedException e) { +log.debug("StreamThread was interrupted while waiting on empty topology", e); +} +} +} + +public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) { +try { +lock(); +version.topologyVersion.incrementAndGet(); +log.info("Adding NamedTopology {}, latest topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get()); +builders.put(newTopologyBuilder.topologyName(), newTopologyBuilder); +buildAndVerifyTopology(newTopologyBuilder); +version.topologyCV.signalAll(); +} finally { +
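The lock/condition protocol in the quoted TopologyMetadata code implies this usage pattern for a thread that must wait for a topology to exist (per the javadoc, lock() must be held before waiting):

```java
import org.apache.kafka.streams.processor.internals.TopologyMetadata;

// Sketch of the caller side of the API quoted above; in the PR the caller is
// the StreamThread run loop, so this wrapper class is only illustrative.
final class TopologyWait {
    static void waitForTopology(final TopologyMetadata topologyMetadata) {
        topologyMetadata.lock();
        try {
            // Blocks on the internal Condition until registerAndBuildNewTopology()
            // adds a topology and calls signalAll().
            topologyMetadata.maybeWaitForNonEmptyTopology();
        } finally {
            topologyMetadata.unlock();
        }
    }
}
```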
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r682234726 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); private final StreamsConfig config; -private final SortedMap builders; // Keep sorted by topology name for readability +private final TopologyVersion version; + +private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability private ProcessorTopology globalTopology; -private Map globalStateStores = new HashMap<>(); -final Set allInputTopics = new HashSet<>(); +private final Map globalStateStores = new HashMap<>(); +private final Set allInputTopics = new HashSet<>(); + +public static class TopologyVersion { +public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version +public Set assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned +public ReentrantLock topologyLock = new ReentrantLock(); +public Condition topologyCV = topologyLock.newCondition(); +} -public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +public TopologyMetadata(final InternalTopologyBuilder builder, +final StreamsConfig config) { +version = new TopologyVersion(); this.config = config; -builders = new TreeMap<>(); +builders = new ConcurrentSkipListMap<>(); if (builder.hasNamedTopology()) { builders.put(builder.topologyName(), builder); } else { builders.put(UNNAMED_TOPOLOGY, builder); } } -public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +public TopologyMetadata(final ConcurrentNavigableMap builders, +final StreamsConfig config) { +version = new TopologyVersion(); this.config = config; + this.builders = builders; if (builders.isEmpty()) { -log.debug("Building KafkaStreams app with no empty topology"); +log.debug("Starting up empty KafkaStreams app with no topology"); } } -public int getNumStreamThreads(final StreamsConfig config) { -final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); +public void updateCurrentAssignmentTopology(final Set assignedNamedTopologies) { +version.assignedNamedTopologies = assignedNamedTopologies; +} -// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later -if (builders.isEmpty()) { -if (configuredNumStreamThreads != 0) { -log.info("Overriding number of StreamThreads to zero for empty topology"); +/** + * @return the set of named topologies that the assignor distributed tasks for during the last rebalance + */ +public Set assignmentNamedTopologies() { +return version.assignedNamedTopologies; +} + +public long topologyVersion() { +return version.topologyVersion.get(); +} + +public void lock() { +version.topologyLock.lock(); +} + +public void unlock() { +version.topologyLock.unlock(); +} + +public InternalTopologyBuilder getBuilderForTopologyName(final String name) { +return builders.get(name); +} + +/** + * @throws IllegalStateException if the thread is not already holding the lock via TopologyMetadata#lock + */ +public void maybeWaitForNonEmptyTopology() { +if (!version.topologyLock.isHeldByCurrentThread()) { +throw new IllegalStateException("Must call lock() before attempting to wait on non-empty topology"); +} +while (isEmpty()) { +try { +log.debug("Detected that the topology is currently 
empty, going to wait for something to be added"); +version.topologyCV.await(); +} catch (final InterruptedException e) { +log.debug("StreamThread was interrupted while waiting on empty topology", e); +} +} +} + +public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) { +try { +lock(); +version.topologyVersion.incrementAndGet(); +log.info("Adding NamedTopology {}, latest topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get()); +builders.put(newTopologyBuilder.topologyName(), newTopologyBuilder); +buildAndVerifyTopology(newTopologyBuilder); +version.topologyCV.signalAll(); +} finally { +