Re: [DISCUSS] Apache Kafka 3.2.1 release
+1, Thanks David!

On Thu, Jul 14, 2022 at 1:16 PM David Jacot wrote:
> +1. Thanks David.
>
> On Wed, Jul 13, 2022 at 11:43 PM, José Armando García Sancio wrote:
> >
> > +1. Thanks for volunteering David.
> >
> > --
> > -José
[jira] [Created] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries
Adrian Preston created KAFKA-14074:
--
Summary: Restarting a broker during re-assignment can leave log directory entries
Key: KAFKA-14074
URL: https://issues.apache.org/jira/browse/KAFKA-14074
Project: Kafka
Issue Type: Bug
Affects Versions: 3.1.0, 2.8.0
Reporter: Adrian Preston

Re-starting a broker while replicas are being assigned away from it can result in topic partition directories being left in the broker's log directory. This can trigger further problems if such a topic is deleted and re-created. These problems occur when replicas for the new topic are placed on a broker that hosts a "stale" topic partition directory of the same name, causing the on-disk topic partition state held by different brokers in the cluster to diverge.

We have been able to re-produce variants of this problem using Kafka 2.8 and 3.1, as well as Kafka built from the head of the apache/kafka repository (at the time of writing this is commit: 94d4fdeb28b3cd4d474d943448a7ef653eaa145d). We have *not* been able to re-produce this problem with Kafka running in KRaft mode.

A minimal re-create for topic directories being left on disk is as follows:
1. Start ZooKeeper and a broker (both using the sample config).
2. Create 100 topics, each with 1 partition and replication factor 1.
3. Add a second broker to the Kafka cluster (with minor edits to the sample config for broker.id, listeners, and log.dirs).
4. Issue a re-assignment that moves all of the topic partition replicas from the first broker to the second broker.
5. While this re-assignment is taking place, shut down the first broker (you need to be quick with only two brokers and 100 topics…).
6. Wait a few seconds for the re-assignment to stall.
7. Restart the first broker and wait for the re-assignment to complete and for it to remove any partially deleted topics (e.g. those with a "-delete" suffix).

Inspecting the logs directory of the first broker should show directories corresponding to topic partitions that are now owned by the second broker. These are not cleaned up when the re-assignment completes, and they remain in the logs directory even if the first broker is restarted. Deleting the topic also does not clean up the topic partition directories left behind on the first broker, which leads to a second potential problem: for topics that have more than one replica, a new topic that has the same name as a previously deleted topic might have replicas created on a broker with "stale" topic partition directories. If this happens, these topics will remain in an under-replicated state.

A minimal re-create for this is as follows:
1. Create a three node Kafka cluster (backed by ZK) based off the sample config (to avoid confusion let's call these kafka-0, kafka-1, and kafka-2).
2. Create 100 topics, each with 1 partition and replication factor 2.
3. Submit a re-assignment to move all of the topic partition replicas to kafka-0 and kafka-1, and wait for it to complete.
4. Submit a re-assignment to move all of the topic partition replicas on kafka-0 to kafka-2.
5. While this re-assignment is taking place, shut down and re-start kafka-0.
6. Wait for the re-assignment to complete, and check that there are unexpected topic partition directories in kafka-0's logs directory.
7. Delete all 100 topics, and re-create 100 new topics with the same name and configuration as the deleted topics.
In this state kafka-1 and kafka-2 continually generate log messages similar to:

[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition test-039-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread)

Topics that have had replicas created on kafka-0 are under-replicated, with kafka-0 missing from the ISR list. Performing a rolling restart of each broker in turn does not resolve the problem; in fact more partitions are listed as under-replicated, with kafka-0, as before, missing from their ISR lists.

I also tried to re-create this with Kafka running in KRaft mode, but was unable to do so. My test configuration was three brokers configured based on /config/kraft/server.properties, with all three brokers part of the controller quorum. Interestingly, I see log lines like the following when re-starting the broker that I stopped mid-reassignment:

[2022-07-14 13:44:42,705] INFO Found stray log dir Log(dir=/tmp/kraft-2/test-029-0, topicId=DMGA3zxyQqGUfeV6cmkcmg, topic=test-029, partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=0): the current replica assignment [I@530d4c70 does not contain the local brokerId 2. (kafka.server.metadata.BrokerMetadataPublisher$)

With later log lines showing the topic be
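For reference, steps 2 and 4 of the first re-create above can also be scripted with the Java AdminClient along these lines. This is only an illustrative sketch: the bootstrap address, topic naming scheme, and broker ids (0 for the first broker, 1 for the second) are assumptions, not taken from the report.

{code}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

public class ReassignmentRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address of the first broker

        try (Admin admin = Admin.create(props)) {
            // Step 2: create 100 topics, 1 partition each, replication factor 1.
            List<NewTopic> topics = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                topics.add(new NewTopic(String.format("test-%03d", i), 1, (short) 1));
            }
            admin.createTopics(topics).all().get();

            // Step 4: move every topic partition replica from broker 0 to broker 1.
            Map<TopicPartition, Optional<NewPartitionReassignment>> moves = new HashMap<>();
            for (int i = 0; i < 100; i++) {
                moves.put(new TopicPartition(String.format("test-%03d", i), 0),
                        Optional.of(new NewPartitionReassignment(List.of(1))));
            }
            admin.alterPartitionReassignments(moves).all().get();

            // Step 5 onward is manual: while the reassignment is in flight, shut down the
            // first broker, wait for the reassignment to stall, restart it, and then inspect
            // its log.dirs for leftover topic partition directories.
        }
    }
}
{code}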
Re: [DISCUSS] KIP-844: Transactional State Stores
Hi,

I updated the KIP with the following changes:
* Replaced in-memory batches with the secondary-store approach as the default implementation, to address the feedback about memory pressure as suggested by Sagar and Bruno.
* Introduced StateStore#commit and StateStore#recover methods as an extension of the rollback idea. @Guozhang, please see the comment below on why I took a slightly different approach than you suggested.
* Removed mentions of changes to IQv1 and IQv2. Transactional state stores enable reading committed data in IQ, but it is really an independent feature that deserves its own KIP. Conflating them unnecessarily increases the scope for discussion, implementation, and testing in a single unit of work.

I also published a prototype - https://github.com/apache/kafka/pull/12393 - that implements the changes described in the proposal.

Regarding explicit rollback, I think it is a powerful idea that allows other StateStore implementations to take a different path to the transactional behavior rather than keeping 2 state stores. Instead of introducing a new commit token, I suggest using the changelog offset that already corresponds 1:1 to the materialized state. This works nicely because Kafka Streams first commits the AK transaction and only then checkpoints the state store, so we can use the changelog offset to commit the state store transaction. I called the method StateStore#recover rather than StateStore#rollback because a state store might either roll back or roll forward depending on the specific point of the crash failure. The write algorithm in Kafka Streams is:
1. write stuff to the state store
2. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
3. flush
4. checkpoint

Let's consider 3 cases:
1. If the crash failure happens between #2 and #3, the state store rolls back and replays the uncommitted transaction from the changelog.
2. If the crash failure happens during #3, the state store can roll forward and finish the flush/commit.
3. If the crash failure happens between #3 and #4, the state store should do nothing during recovery and just proceed with the checkpoint.

Looking forward to your feedback,
Alexander

On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov <asorokou...@confluent.io> wrote:
> Hi,
>
> As a status update, I did the following changes to the KIP:
> * replaced configuration via the top-level config with configuration via
> Stores factory and StoreSuppliers,
> * added IQv2 and elaborated how readCommitted will work when the store is
> not transactional,
> * removed claims about ALOS.
>
> I am going to be OOO in the next couple of weeks and will resume working
> on the proposal and responding to the discussion in this thread starting
> June 27. My next top priorities are:
> 1. Prototype the rollback approach as suggested by Guozhang.
> 2. Replace in-memory batches with the secondary-store approach as the
> default implementation to address the feedback about memory pressure as
> suggested by Sagar and Bruno.
> 3. Adjust Stores methods to make transactional implementations pluggable.
> 4. Publish the POC for the first review.
>
> Best regards,
> Alex
>
> On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang wrote:
>
>> Alex,
>>
>> Thanks for your replies! That is very helpful.
>>
>> Just to broaden our discussions a bit here, I think there are some other
>> approaches in parallel to the idea of "enforce to only persist upon
>> explicit flush" and I'd like to throw one here -- not really advocating it,
>> but just for us to compare the pros and cons:
>>
>> 1) We let the StateStore's `flush` function to return a token instead of
>> returning `void`.
>> 2) We add another `rollback(token)` interface of StateStore which would
>> effectively rollback the state as indicated by the token to the snapshot
>> when the corresponding `flush` is called.
>> 3) We encode the token and commit as part of
>> `producer#sendOffsetsToTransaction`.
>>
>> Users could optionally implement the new functions, or they can just not
>> return the token at all and not implement the second function. Again, the
>> APIs are just for the sake of illustration, not feeling they are the most
>> natural :)
>>
>> Then the procedure would be:
>>
>> 1. the previous checkpointed offset is 100
>> ...
>> 3. flush store, make sure all writes are persisted; get the returned token
>> that indicates the snapshot of 200.
>> 4. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
>> 5. Update the checkpoint file (say, the new value is 200).
>>
>> Then if there's a failure, say between 3/4, we would get the token from the
>> last committed txn, and first we would do the restoration (which may get
>> the state to somewhere between 100 and 200), then call
>> `store.rollback(token)` to rollback to the snapshot of offset 100.
>>
>> The pros is that we would then not need to enforce the state stores to not
>> persist any data during the txn: for stores that may not be able to
>> implement the `rollba
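For readers following along, a minimal sketch of the commit/recover shape described in Alexander's message above might look like the following. The method names, signatures, and return type are illustrative assumptions only; the authoritative definitions are in KIP-844 and the linked prototype PR (https://github.com/apache/kafka/pull/12393).

{code}
import java.util.Optional;

/**
 * Hypothetical sketch of the commit/recover extension discussed above,
 * keyed on the changelog offset rather than on an opaque commit token.
 */
public interface TransactionalStateStoreSketch {

    /**
     * Commit the current state store transaction and associate it with the
     * changelog offset that Kafka Streams is about to checkpoint.
     */
    void commit(long changelogOffset);

    /**
     * Bring the store back to a consistent state after a crash. Depending on
     * where the failure happened relative to the checkpointed changelog offset,
     * the store may roll the in-flight transaction back (and let the changelog
     * replay it), roll it forward (finish an interrupted flush), or do nothing.
     *
     * @return the changelog offset the store recovered to, if known
     */
    Optional<Long> recover(long checkpointedChangelogOffset);
}
{code}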
[GitHub] [kafka-site] mumrah merged pull request #423: Add a new signing key for David Arthur
mumrah merged PR #423: URL: https://github.com/apache/kafka-site/pull/423 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Kudos David, Guozhang, and Jason for putting together such a great proposal.

I don't want to hijack the discussion, just wanted to mention that it would be great if the final design is made extensible enough, so other use cases (like Kafka Connect, Schema Registry, etc.) can be added later on.

I can see how the concept of different group "types" in the group coordinator can be leveraged to support such cases. On KIP-795, I wanted to add public APIs for the AbstractCoordinator with the intent of formalizing the use of the Group Membership Protocol for resource management use cases. I'll probably close this KIP and wait to see what comes out of this redesign of the protocol.

Thanks

- https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator

From: dev@kafka.apache.org At: 07/06/22 04:44:59 UTC-4:00 To: dev@kafka.apache.org
Subject: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

Hi all,

I would like to start a discussion thread on KIP-848: The Next Generation of the Consumer Rebalance Protocol. With this KIP, we aim to make the rebalance protocol (for consumers) more reliable, more scalable, easier to implement for clients, and easier to debug for operators.

The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.

Please take a look and let me know what you think.

Best,
David

PS: I will be away from July 18th to August 8th. That gives you a bit of time to read and digest this long KIP.
[jira] [Created] (KAFKA-14075) Consumer Group deletion does not delete pending transactional offset commits
Jeff Kim created KAFKA-14075:
Summary: Consumer Group deletion does not delete pending transactional offset commits
Key: KAFKA-14075
URL: https://issues.apache.org/jira/browse/KAFKA-14075
Project: Kafka
Issue Type: Bug
Reporter: Jeff Kim
Assignee: Jeff Kim

In [GroupMetadata.removeAllOffsets()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L729-L740] we pass in the offsets cache to delete pendingTransactionalOffsetCommits upon group deletion, so only transactional offset commits for topic partitions already in the offsets cache will be deleted. However, we add a transactional offset commit to the offsets cache only after the commit/abort marker is written to the log, in [GroupMetadata.completePendingTxnOffsetCommit()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L692].

So even after a group deletion we can still have pending transactional offset commits for a group that's supposed to be deleted. The group metadata manager will throw an IllegalStateException [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L740] while loading the group into memory. We will hit this exception on every load of the group as long as the hanging transaction never completes.

We should delete all pending transactional offset commits (instead of only those for topic partitions that exist in the offsets cache) when a group is deleted in GroupMetadata.removeOffsets().

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
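A simplified, Java-flavored illustration of the proposed behavior follows. The real code lives in the Scala GroupMetadata class linked above; the class, field, and method names below are stand-ins for illustration only.

{code}
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the fix proposed in the ticket: when a group is deleted, drop every
 * pending transactional offset commit outright instead of only removing entries
 * whose partitions already appear in the committed-offsets cache.
 */
class GroupOffsetsSketch {
    // committed offsets: topic partition -> offset metadata (simplified to Long)
    private final Map<String, Long> offsets = new HashMap<>();
    // pending transactional commits: producerId -> (topic partition -> offset metadata)
    private final Map<Long, Map<String, Long>> pendingTransactionalOffsetCommits = new HashMap<>();

    /**
     * Remove all offset state for the group. Clearing the pending transactional
     * commits unconditionally prevents stale pending commits from surviving the
     * deletion and tripping the IllegalStateException on the next group load.
     */
    Map<String, Long> removeAllOffsets() {
        pendingTransactionalOffsetCommits.clear();
        Map<String, Long> removed = new HashMap<>(offsets);
        offsets.clear();
        return removed;
    }
}
{code}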
[jira] [Created] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions
Jim Hughes created KAFKA-14076:
--
Summary: Fix issues with KafkaStreams.CloseOptions
Key: KAFKA-14076
URL: https://issues.apache.org/jira/browse/KAFKA-14076
Project: Kafka
Issue Type: Bug
Reporter: Jim Hughes

The new `close(CloseOptions)` function has a few bugs.
(https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)

Notably, it needs to remove consumer groups (CGs) per StreamThread.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
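For context, the affected API is used roughly as follows. This is a sketch assuming the builder-style CloseOptions setters (timeout, leaveGroup); the application id, bootstrap address, and topic names are placeholders.

{code}
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class CloseWithOptionsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "close-options-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Ask the application's consumers to leave the group on shutdown,
        // bounded by a 30 second timeout.
        streams.close(new KafkaStreams.CloseOptions()
                .timeout(Duration.ofSeconds(30))
                .leaveGroup(true));
    }
}
{code}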
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Thanks Hector! Yes, making the group "type" templated, with extensible handling logic, is part of the motivation of this rebalance protocol.

Guozhang

On Thu, Jul 14, 2022 at 10:35 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <hgerald...@bloomberg.net> wrote:
> Kudos David, Guozhang, and Jason for putting together such a great
> proposal.
>
> I don't want to hijack the discussion, just wanted to mention that it
> would be great if the final design is made extensible enough, so other use
> cases (like Kafka Connect, Schema Registry, etc.) can be added later on.
>
> I can see how the concept of different group "types" in the group
> coordinator can be leveraged to support such cases. On KIP-795, I wanted to
> add public APIs for the AbstractCoordinator with the intent of formalizing
> the use of the Group Membership Protocol for resource management use cases.
> I'll probably close this KIP and wait to see what comes out of this
> redesign of the protocol.
>
> Thanks
>
> - https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator
>
> From: dev@kafka.apache.org At: 07/06/22 04:44:59 UTC-4:00 To: dev@kafka.apache.org
> Subject: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
>
> Hi all,
>
> I would like to start a discussion thread on KIP-848: The Next
> Generation of the Consumer Rebalance Protocol. With this KIP, we aim
> to make the rebalance protocol (for consumers) more reliable, more
> scalable, easier to implement for clients, and easier to debug for
> operators.
>
> The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.
>
> Please take a look and let me know what you think.
>
> Best,
> David
>
> PS: I will be away from July 18th to August 8th. That gives you a bit
> of time to read and digest this long KIP.

--
-- Guozhang
[jira] [Resolved] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics
[ https://issues.apache.org/jira/browse/KAFKA-13846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13846.
---
Fix Version/s: 3.3.0
Resolution: Fixed

> Add an overloaded metricOrElseCreate function in Metrics
>
> Key: KAFKA-13846
> URL: https://issues.apache.org/jira/browse/KAFKA-13846
> Project: Kafka
> Issue Type: Improvement
> Components: metrics
> Reporter: Guozhang Wang
> Assignee: Sagar Rao
> Priority: Major
> Labels: newbie
> Fix For: 3.3.0
>
> The `Metrics` registry is often used by concurrent threads, however its
> get/create APIs are not well suited for it. A common pattern from the user
> today is:
> {code}
> metric = metrics.metric(metricName);
> if (metric == null) {
>     try {
>         metrics.createMetric(..)
>     } catch (IllegalArgumentException e) {
>         // another thread may create the metric in the meantime
>     }
> }
> {code}
> Otherwise the caller would need to synchronize the whole block trying to get
> the metric, even though the `createMetric` call itself already synchronizes
> internally on updating the metric map.
> So we could consider adding a metricOrElseCreate function which is similar to
> createMetric, but instead of throwing an IllegalArgumentException within the
> internal synchronization block, it would just return the already existing
> metric.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
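A minimal sketch of the get-or-create semantics the ticket asks for is below. The name metricOrElseCreate and the simplified registry are taken from the proposal text above, not from the final committed API in org.apache.kafka.common.metrics.

{code}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
 * Simplified illustration of atomic get-or-create for a metrics registry.
 * Concurrent callers all end up with the same registered instance instead of
 * one of them catching an IllegalArgumentException on the create path.
 */
class MetricsRegistrySketch<N, M> {
    private final ConcurrentMap<N, M> metrics = new ConcurrentHashMap<>();

    /** Return the existing metric for the name, or atomically create and register it. */
    M metricOrElseCreate(N name, Supplier<M> factory) {
        return metrics.computeIfAbsent(name, n -> factory.get());
    }
}
{code}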
[jira] [Created] (KAFKA-14077) KRaft should support recovery from failed disk
Jason Gustafson created KAFKA-14077:
---
Summary: KRaft should support recovery from failed disk
Key: KAFKA-14077
URL: https://issues.apache.org/jira/browse/KAFKA-14077
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Fix For: 3.3.0

If one of the nodes in the metadata quorum has a disk failure, there is currently no way to safely bring the node back into the quorum. When we lose disk state, we are at risk of losing committed data even if the failure only affects a minority of the cluster.

Here is an example. Suppose that a metadata quorum has 3 members: v1, v2, and v3. Initially, v1 is the leader and writes a record at offset 1. After v2 acknowledges replication of the record, it becomes committed. Suppose that v1 fails before v3 has a chance to replicate this record. As long as v1 remains down, the raft protocol guarantees that only v2 can become leader, so the record cannot be lost. The raft protocol expects that when v1 returns, it will still have that record, but what if there is a disk failure, the state cannot be recovered, and v1 participates in leader election? Then we would have committed data on only a minority of the voters. The main problem here concerns how we recover from this impaired state without risking the loss of this data.

Consider a naive solution which brings v1 back with an empty disk. Since the node has lost its prior knowledge of the state of the quorum, it will vote for any candidate that comes along. If v3 becomes a candidate, then it will vote for itself and it just needs the vote from v1 to become leader. If that happens, then the committed data on v2 will be lost.

This is just one scenario. In general, the invariants that the raft protocol is designed to preserve go out the window when disk state is lost. For example, it is also possible to contrive a scenario where the loss of disk state leads to multiple leaders. There is a good reason why raft requires that any vote cast by a voter is written to disk, since otherwise the voter may vote for different candidates in the same epoch.

Many systems solve this problem with a unique identifier which is generated automatically and stored on disk. This identifier is then committed to the raft log. If a disk changes, we would see a new identifier and we can prevent the node from breaking raft invariants. Then recovery from a failed disk requires a quorum reconfiguration. We need something like this in KRaft to make disk recovery possible.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
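A toy illustration of the unique-identifier technique mentioned at the end of the ticket. This is not KRaft's actual implementation; the file name, helper methods, and checks are hypothetical and only show the general shape of the idea.

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/**
 * Sketch: generate an id the first time a log directory is used, persist it,
 * and refuse to participate in leader election if the id found on disk no
 * longer matches the id the quorum has committed for this voter.
 */
class DirectoryIdSketch {

    static UUID loadOrCreateDirectoryId(Path logDir) throws IOException {
        Path idFile = logDir.resolve("directory.id"); // hypothetical file name
        if (Files.exists(idFile)) {
            return UUID.fromString(Files.readString(idFile).trim());
        }
        UUID id = UUID.randomUUID();
        Files.writeString(idFile, id.toString());
        return id;
    }

    /**
     * A voter whose on-disk id differs from the id committed in the metadata log
     * has lost its disk state and must not vote until the quorum is reconfigured.
     */
    static boolean mayParticipateInElections(UUID onDiskId, UUID committedId) {
        return onDiskId.equals(committedId);
    }
}
{code}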
KAFKA-13572 Negative preferred replica imbalance metric
Hi, Kafka.

We found that a race in the topic-deletion procedure can cause the preferred replica imbalance metric to go negative. The phenomenon can easily happen when many topics are deleted at once, and since we use the metric for monitoring, we have to restart the controller to fix the metric every time it happens.

I submitted a patch to fix it: https://github.com/apache/kafka/pull/12405

It'd be appreciated if anyone could review the PR.

Thanks,
--
Okada Haruki
ocadar...@gmail.com
Re: [DISCUSS] Apache Kafka 3.3.0 Release
Hey Jose,

Thanks for volunteering to manage the release! KIP-833 is currently slotted for 3.3. We've been getting some help from Jack Vanlightly to validate the raft implementation in TLA+ and with frameworks like Jepsen. The specification is written here if anyone is interested: https://github.com/Vanlightly/raft-tlaplus/blob/main/specifications/pull-raft/KRaft.tla. The main gap that this work uncovered in our implementation is documented here: https://issues.apache.org/jira/browse/KAFKA-14077. I do believe that KIP-833 depends on fixing this issue, so I wanted to see how you feel about giving us a little more time to address it?

Thanks,
Jason

On Wed, Jul 13, 2022 at 10:01 AM Sagar wrote:
> Hey Jose,
>
> Well actually I have 2 approved PRs from Kafka Connect:
>
> https://github.com/apache/kafka/pull/12321
> https://github.com/apache/kafka/pull/12309
>
> Not sure how to get these merged though but I think these can go into 3.3
> release.
>
> Thanks!
> Sagar.
>
> On Wed, Jul 13, 2022 at 5:03 PM Divij Vaidya wrote:
> >
> > Hey Jose
> >
> > A few of my PRs have been pending review for quite some time, which I was
> > hoping to merge into 3.3. I have already marked them with "Fix version=3.3.0"
> > so that you can track them using the JIRA filter you shared earlier
> > <https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%203.3.0%20AND%20status%20not%20in%20(resolved%2C%20closed)%20ORDER%20BY%20priority%20DESC%2C%20status%20DESC%2C%20updated%20DESC%20%20%20%20%20%20>
> > in this thread. Would you have some time to review them?
> >
> > Notable amongst them would be:
> > 1. Fix the rate window size calculation for edge cases -
> > https://github.com/apache/kafka/pull/12184
> > 2. Fix resource leaks - https://github.com/apache/kafka/pull/12228
> >
> > And the complete list would be at:
> > https://github.com/search?q=is%3Aopen+is%3Apr+author%3Adivijvaidya+is%3Apr+repo%3Aapache%2Fkafka+created%3A2022-04-01..2022-07-30&type=Issues
> >
> > --
> > Divij Vaidya
> >
> > On Mon, Jul 11, 2022 at 5:12 PM José Armando García Sancio wrote:
> > >
> > > Hi all,
> > >
> > > I created the branch for 3.3
> > > (https://github.com/apache/kafka/tree/3.3). If you have bug fixes for
> > > the 3.3.0 release please make sure to cherry pick them to that branch.
> > >
> > > Thanks
[jira] [Created] (KAFKA-14078) Replica fetches to follower should return NOT_LEADER error
Jason Gustafson created KAFKA-14078:
---
Summary: Replica fetches to follower should return NOT_LEADER error
Key: KAFKA-14078
URL: https://issues.apache.org/jira/browse/KAFKA-14078
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
Fix For: 3.3.0

After the fix for KAFKA-13837, if a follower receives a fetch request from another replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch matches. We need to do leader/epoch validation first, before we check whether we have a valid replica.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
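A rough sketch of the validation ordering described above. This is not the actual ReplicaManager code; the method shape is invented for illustration and the error mapping beyond the leadership check is simplified.

{code}
import org.apache.kafka.common.protocol.Errors;

/**
 * Check leadership and leader epoch before checking the fetching replica, so a
 * replica fetch that lands on a follower yields NOT_LEADER_OR_FOLLOWER instead
 * of a misleading UNKNOWN_LEADER_EPOCH.
 */
class FetchValidationSketch {

    static Errors validate(boolean isLeader, int requestEpoch, int currentEpoch) {
        if (!isLeader) {
            // A follower rejects replica fetches outright, even if the epoch matches.
            return Errors.NOT_LEADER_OR_FOLLOWER;
        }
        if (requestEpoch < currentEpoch) {
            return Errors.FENCED_LEADER_EPOCH;
        }
        if (requestEpoch > currentEpoch) {
            return Errors.UNKNOWN_LEADER_EPOCH;
        }
        // Only after the leader/epoch checks pass should the leader validate
        // that the fetch actually comes from a replica it knows about.
        return Errors.NONE;
    }
}
{code}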