Thank you Jason for the ideas.

On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <ja...@confluent.io> wrote:
> Hi Boyang,
>
> Thanks for the updates. A few comments below:
>
> 1. The KIP mentions that `transaction.timeout.ms` should be reduced to 10s. I think this makes sense for Kafka Streams which is tied to the consumer group semantics and uses a default 10s session timeout. However, it seems a bit dangerous to make this change for the producer generally. Could we just change it for streams?

That sounds good to me.

> 2. The new `initTransactions` API takes a `Consumer` instance. I think the idea is to basically put in a backdoor to give the producer access to the group generationId. It's not clear to me how this would work given package restrictions. I wonder if it would be better to just expose the state we need from the consumer. I know we have been reluctant to do this so far because we treat the generationId as an implementation detail. However, I think we might just bite the bullet and expose it rather than coming up with a messy hack. Concepts such as memberIds have already been exposed in the AdminClient, so maybe it is not too bad. Alternatively, we could use an opaque type. For example:
>
> // public
> interface GroupMetadata {}
>
> // private
> class ConsumerGroupMetadata implements GroupMetadata {
>   final int generationId;
>   final String memberId;
>
>   ConsumerGroupMetadata(int generationId, String memberId) {
>     this.generationId = generationId;
>     this.memberId = memberId;
>   }
> }
>
> // Consumer API
> public GroupMetadata groupMetadata();
>
> I am probably leaning toward just exposing the state we need.

Yes, and it is also worth mentioning that Kafka Streams uses the generic `Consumer` API, which doesn't have rich state like a full `KafkaConsumer`, so the hack would not work as expected. Instead, just exposing the consumer generation.id seems much easier. We could consolidate the API and make it

> 3. Given that we are already providing a way to propagate group state from the consumer to the producer, I wonder if we may as well include the memberId and groupInstanceId. This would make the validation we do for TxnOffsetCommit consistent with OffsetCommit.
> If for no other benefit, at least this may help with debugging.

Yes, we could put them into the GroupMetadata struct.

> 4. I like the addition of isolation_level to the offset fetch. At the same time, its behavior is a bit inconsistent with how it is used in the consumer generally. There is no reason for the group coordinator to ever expose aborted data, so this is mostly about awaiting pending offset commits, not reading uncommitted data. Perhaps instead of calling this "isolation level," it should be more like "await_pending_transaction" or something like that?
>
> Also, just to be clear, the consumer would treat this as an optional field, right? So if the broker does not support the latest OffsetFetch API, it would silently revert to reading the old data. Basically it would be up to the streams version probing logic to ensure that the expectation on this API fits with the usage of `transactional.id`.

That sounds like a better name to me, though I think it could be shortened to `await_transaction`. I think the field should be optional, too.

> Thanks,
> Jason
>
> On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>
> > Hey Guozhang,
> >
> > I will correct my statement from the last email. I don't think read_committed (3.a) needs to be added to the OffsetFetch request, because in an EOS application the underlying consumers within the group should always back off when there are pending offsets.
> >
> > Let me know if you think this is correct.
> >
> > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> >
> > > Thank you Guozhang for the questions, inline answers are below.
> > >
> > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > >
> > >> Hey all,
> > >>
> > >> I have done a fundamental polish of KIP-447 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics> and written a design doc <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#> depicting internal changes. We stripped many implementation details from the KIP and simplified the public changes by a lot. For reviewers, it is highly recommended to fully understand the EOS design in KIP-98 and read its corresponding design doc <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit> if you haven't done so already.
> > >>
> > >> Let me know if you find anything confusing in the KIP or the design. I would be happy to discuss in depth.
> > >>
> > >> Best,
> > >> Boyang
> > >>
> > >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>
> > >>> 2. The reason we did not expose generation.id from KafkaConsumer public APIs directly is to abstract this notion from users (since it is an implementation detail of the rebalance protocol itself, e.g. if users call consumer.assign() they do not need to invoke ConsumerCoordinator and do not need to be aware of generation.id at all).
> > >>>
> > >>> On the other hand, with the current proposal the txn.coordinator does not know about the latest generation from the source-of-truth group.coordinator; instead, it will only bump up the generation from the producer's InitProducerIdRequest.
> > >>>
> > >>> The key here is that GroupCoordinator, when handling `InitProducerIdRequest
> > >>>
> > > In the new design, we just pass the entire consumer instance into the producer through #initTransaction, so no public API will be created.
> > >
> > >>> 3. I agree that if we rely on the group coordinator to block on returning offset-fetch-response if read-committed is enabled, then we do not need to store partition assignment on txn coordinator and therefore it's better to still decouple them. For that case we still need to update the KIP wiki page that includes:
> > >>>
> > >>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
> > >>> 3.b. Add a new error code in OffsetFetchResponse to let the client back off and retry if there are pending txns that include the partitions of interest.
> > >>> 3.c. Also, in the worst case we would let the client be blocked for the txn.timeout period, and for that reason we may need to consider reducing our default txn.timeout value as well.
> > >>>
> > > Addressed 3.b and 3.c, will do 3.a.
> > >
> > >>> 4. According to Colin it seems we do not need to create another KIP and we can just complete it as part of KIP-117 / KAFKA-5214; and we need to do some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin please let us know if you have any concerns exposing it).
> > >>>
> > > I think we no longer need to rely on the API version for initialization, since we will be using the upgrade.from config anyway.
> > >
> > >>> Guozhang
> > >>>
> > >>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <ja...@confluent.io> wrote:
> > >>>
> > >>> > For reference, we have BrokerApiVersionCommand already as a public interface. We have a bit of tech debt at the moment because it uses a custom AdminClient.
> > >>> > It would be nice to clean that up. In general, I think it is reasonable to expose from AdminClient. It can be used by management tools to inspect running Kafka versions for example.
> > >>> >
> > >>> > -Jason
> > >>> >
> > >>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > >>> >
> > >>> > > Thank you for the context Colin. The groupId was indeed a copy-paste error.
> > >>> > > Our use case here for 447 is (Quoted from Guozhang):
> > >>> > > '''
> > >>> > > I think if we can do something else to avoid this config though, for example we can use the embedded AdminClient to send the APIVersion request upon starting up, and based on the returned value decides whether to go to the old code path or the new behavior.
> > >>> > > '''
> > >>> > > The benefit we get is to avoid adding a new configuration to make a decision simply based on broker version. If you have concerns with exposing ApiVersion to the client, we could try to think of alternative solutions too.
> > >>> > >
> > >>> > > Boyang
> > >>> > >
> > >>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmcc...@apache.org> wrote:
> > >>> > >
> > >>> > > > kafka.api.ApiVersion is an internal class, not suitable for exposing through AdminClient. That class is not even accessible without having the broker jars on your CLASSPATH.
> > >>> > > >
> > >>> > > > Another question is, what is the groupId parameter doing in the call? The API versions are the same no matter what consumer group we use, right? Perhaps this was a copy and paste error?
> > >>> > > >
> > >>> > > > This is not the first time we have discussed having a method in AdminClient to retrieve API version information. In fact, the original KIP which created KafkaAdminClient specified an API for fetching version information. It was called apiVersions and it is still there on the wiki. See
> > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > >>> > > >
> > >>> > > > However, this API wasn't ready in time for 0.11.0 so we shipped without it. There was a JIRA to implement it for later versions, https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR, https://github.com/apache/kafka/pull/3012 . However, we started to rethink whether this AdminClient function was even necessary. Most of the use-cases we could think of seemed like horrible hacks. So it has never really been implemented (yet?).
> > >>> > > >
> > >>> > > > best,
> > >>> > > > Colin
> > >>> > > >
> > >>> > > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > >>> > > > > Actually, on second thought, I think it makes sense to support auto upgrade through the admin client to help us get the API version from the broker.
> > >>> > > > > A draft KIP is here:
> > >>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > >>> > > > >
> > >>> > > > > Boyang
> > >>> > > > >
> > >>> > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > >>> > > > >
> > >>> > > > > > Thank you Guozhang, some of my understandings are inline below.
> > >>> > > > > >
> > >>> > > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <ja...@confluent.io> wrote:
> > >>> > > > > >
> > >>> > > > > >> > I think co-locating does have some merits here, i.e. letting the ConsumerCoordinator which has the source-of-truth of assignment to act as the TxnCoordinator as well; but I agree there's also some cons of coupling them together. I'm still a bit inclining towards colocation but if there are good rationales not to do so I can be convinced as well.
> > >>> > > > > >>
> > >>> > > > > >> The good rationale is that we have no mechanism to colocate partitions ;). Are you suggesting we store the group and transaction state in the same log? Can you be more concrete about the benefit?
> > >>> > > > > >>
> > >>> > > > > >> -Jason
> > >>> > > > > >>
> > >>> > > > > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > > > > >>
> > >>> > > > > >> > Hi Boyang,
> > >>> > > > > >> >
> > >>> > > > > >> > 1. One advantage of retry against on-hold is that it will not tie-up a handler thread (of course the latter could do the same but that involves using a purgatory which is more complicated), and also it is less likely to violate request timeout. So I think there are some rationales to prefer retries.
> > >>> > > > > >> >
> > >>> > > > > > That sounds fair to me, also we are avoiding usage of another purgatory instance. Usually for one back-off we are only delaying 50ms during startup which is trivial cost. This behavior shouldn't be changed.
> > >>> > > > > >
> > >>> > > > > >> > 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener and PartitionAssignors are user-customizable modules, and only difference is that the former is specified via code and the latter is specified via config.
> > >>> > > > > >> >
> > >>> > > > > >> > Regarding Jason's proposal of ConsumerAssignment, one thing to note though with KIP-429 the onPartitionAssigned may not be called if the assignment does not change, whereas onAssignment would always be called at the end of sync-group response.
> > >>> > > > > >> > My proposed semantics is that `RebalanceListener#onPartitionsXXX` are used for notifications to user, and hence if there's no changes these will not be called, whereas `PartitionAssignor` is used for assignor logic, whose callback would always be called no matter if the partitions have changed or not.
> > >>> > > > > >> >
> > >>> > > > > > I think a third option is to gracefully expose generation id as part of consumer API, so that we don't need to bother overloading various callbacks. Of course, this builds upon the assumption that topic partitions will not be included in new initTransaction API.
> > >>> > > > > >
> > >>> > > > > >> > 3. I feel it is a bit awkward to let the TxnCoordinator keeping partition assignments since it is sort of taking over the job of the ConsumerCoordinator, and may likely cause a split-brain problem as two coordinators keep a copy of this assignment which may be different.
> > >>> > > > > >> >
> > >>> > > > > >> > I think co-locating does have some merits here, i.e. letting the ConsumerCoordinator which has the source-of-truth of assignment to act as the TxnCoordinator as well; but I agree there's also some cons of coupling them together. I'm still a bit inclining towards colocation but if there are good rationales not to do so I can be convinced as well.
> > >>> > > > > >> >
> > >>> > > > > > The purpose of co-location is to let the txn coordinator see the group assignment. This priority is weakened when we already have a defense on the consumer offset fetch, so I guess it's not super important anymore.
> > >>> > > > > >
> > >>> > > > > >> > 4. I guess I'm preferring the philosophy of "only add configs if there's no other ways", since more and more configs would make it less and less intuitive out of the box to use.
> > >>> > > > > >> >
> > >>> > > > > >> > I think it's a valid point that checks upon starting up do not cope with brokers downgrading, but even with a config it is still hard for users to determine when they can be sure the brokers will never downgrade again and hence can safely switch the config. So my feeling is that this config would still not help much. If we want to be on the safer side, then I'd suggest we modify the Coordinator -> NetworkClient hierarchy to allow the NetworkClient to pass the APIVersion metadata to the Coordinator, so that the Coordinator can rely on that logic to change its behavior dynamically.
> > >>> > > > > >> >
> > >>> > > > > > The stream thread initialization could not be supported by a client coordinator behavior change on the fly; we only lose options after initialization (the main thread exits and no thread has the global picture anymore). If we do want to support auto version detection, an admin client request would be easier in this sense.
> > >>> > > > > >
> > >>> > > > > >> > 5. I do not have a concrete idea about what the impact on Connect would be; maybe Randall or Konstantine can help here?
> > >>> > > > > >> >
> > >>> > > > > > Sounds good, let's see their thoughts.
> > >>> > > > > >
> > >>> > > > > >> > Guozhang
> > >>> > > > > >> >
> > >>> > > > > >> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > >>> > > > > >> >
> > >>> > > > > >> > > Hey Jason,
> > >>> > > > > >> > >
> > >>> > > > > >> > > thank you for the proposal here. Some of my thoughts below.
> > >>> > > > > >> > >
> > >>> > > > > >> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io> wrote:
> > >>> > > > > >> > >
> > >>> > > > > >> > > > Hi Boyang,
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > Thanks for picking this up! Still reading through the updates, but here are a couple initial comments on the APIs:
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying to encapsulate state from the current group assignment.
> > >>> > > > > >> > > > Maybe something like `ConsumerAssignment` would be clearer? If we make the usage consistent across the consumer and producer, then we can avoid exposing internal state like the generationId.
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > For example:
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > // Public API
> > >>> > > > > >> > > > interface ConsumerAssignment {
> > >>> > > > > >> > > >   Set<TopicPartition> partitions();
> > >>> > > > > >> > > > }
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > // Not a public API
> > >>> > > > > >> > > > class InternalConsumerAssignment implements ConsumerAssignment {
> > >>> > > > > >> > > >   Set<TopicPartition> partitions;
> > >>> > > > > >> > > >   int generationId;
> > >>> > > > > >> > > >
> > >>> > > > > >> > > >   public Set<TopicPartition> partitions() { return partitions; }
> > >>> > > > > >> > > > }
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > Then we can change the rebalance listener to something like this:
> > >>> > > > > >> > > > onPartitionsAssigned(ConsumerAssignment assignment)
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > And on the producer:
> > >>> > > > > >> > > > void initTransactions(String groupId, ConsumerAssignment assignment);
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > 2. Another bit of awkwardness is the fact that we have to pass the groupId through both initTransactions() and sendOffsetsToTransaction(). We could consider a config instead. Maybe something like `transactional.group.id`?
> > >>> > > > > >> > > > Then we could simplify the producer APIs, potentially even deprecating the current sendOffsetsToTransaction. In fact, for this new usage, the `transactional.id` config is not needed. It would be nice if we don't have to provide it.
> > >>> > > > > >> > > >
> > >>> > > > > >> > > I like the idea of combining 1 and 2. We could definitely pass in a group.id config so that we could avoid exposing that information in a public API. The question I have is whether we should name the interface `GroupAssignment` instead, so that Connect could later extend the same interface, just to echo Guozhang's point here. Also, the base interface is better defined empty for easy extension, or we could define an abstract type called `Resource` to be shareable later IMHO.
> > >>> > > > > >> > >
> > >>> > > > > >> > > > By the way, I'm a bit confused about discussion above about colocating the txn and group coordinators. That is not actually necessary, is it?
> > >>> > > > > >> > > >
> > >>> > > > > >> > > Yes, this is not a requirement for this KIP, because it is inherently impossible to co-locate the topic partitions of the transaction log and the consumed offset topics.
> > >>> > > > > >> > >
> > >>> > > > > >> > > > Thanks,
> > >>> > > > > >> > > > Jason
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > > Thank you Ismael for the suggestion. We will attempt to address it by giving more details in the rejected alternatives section.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > Thank you for the comment Guozhang! Answers are inline below.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > > Hello Boyang,
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > > Thanks for the KIP, I have some comments below:
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > > 1. "Once transactions are complete, the call will return." This seems different from the existing behavior, in which we would return a retriable CONCURRENT_TRANSACTIONS and let the client retry. Is this intentional?
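The retry behavior mentioned in point 1 can be illustrated with a minimal client-side sketch. Everything here is a hypothetical stand-in for illustration: `RetriableException`, `RetrySketch.callWithRetry`, and the backoff value are invented names and numbers, not the Kafka client's actual implementation.

```java
import java.util.concurrent.Callable;

// Hypothetical marker for errors the client may retry, standing in for a
// retriable broker error such as CONCURRENT_TRANSACTIONS.
class RetriableException extends Exception {
    RetriableException(String message) { super(message); }
}

final class RetrySketch {
    // Calls the request until it succeeds or maxAttempts is exhausted,
    // sleeping backoffMs between attempts. Non-retriable errors propagate.
    static <T> T callWithRetry(Callable<T> request, int maxAttempts, long backoffMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetriableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return request.call();
            } catch (RetriableException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs); // back off, then retry
                }
            }
        }
        throw last; // every attempt hit a retriable error
    }
}
```

With retries, the handler thread on the broker is released after each response, and the waiting cost moves to the client in the form of the backoff sleep.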
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > I don’t think it is intentional, and I will defer this question to Jason when he has time to answer, since from what I understand retry and on-hold both seem to be valid approaches.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance listener interface": as part of KIP-341 we've already added this information to the onAssignment callback. Would this be sufficient? Or more generally speaking, which information has to be passed around in the rebalance callback while other information can be passed around in the PartitionAssignor callback? In Streams for example both callbacks are used but most critical information is passed via onAssignment.
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > We still need to extend ConsumerRebalanceListener because it’s the interface we could have public access to. The #onAssignment call is defined on PartitionAssignor level which is not easy to work with external producers.
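As a rough illustration of why the rebalance listener is the convenient hook for an external producer, the sketch below hands an assignment object from a listener callback to a producer. All types here (`ConsumerAssignment`, `InternalConsumerAssignment`, `AssignmentListener`, `SketchProducer`) are simplified, hypothetical stand-ins modeled on the proposal in this thread, with partitions reduced to strings; none of them are real Kafka client classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical public view of an assignment: partitions only.
interface ConsumerAssignment {
    Set<String> partitions();
}

// Hypothetical internal implementation that also carries the generation.
final class InternalConsumerAssignment implements ConsumerAssignment {
    private final Set<String> partitions;
    final int generationId; // not visible through the public interface

    InternalConsumerAssignment(Set<String> partitions, int generationId) {
        this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
        this.generationId = generationId;
    }

    @Override
    public Set<String> partitions() { return partitions; }
}

// Hypothetical listener overload that receives the assignment object.
interface AssignmentListener {
    void onPartitionsAssigned(ConsumerAssignment assignment);
}

// Hypothetical producer that records the generation it was initialized with.
final class SketchProducer {
    final List<Integer> initializedGenerations = new ArrayList<>();

    void initTransactions(ConsumerAssignment assignment) {
        // Only library code would downcast to read the hidden generation.
        int generation = ((InternalConsumerAssignment) assignment).generationId;
        initializedGenerations.add(generation);
    }
}
```

In this sketch, user code only wires the callback to the producer (for example `AssignmentListener listener = producer::initTransactions;`) and never reads the generation itself.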
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > > 3. "We propose to use a separate record type in order to store the group assignment.": hmm, I thought with the third typed FindCoordinator, the same broker that acts as the consumer coordinator would always be selected as the txn coordinator, in which case it can access its local cache metadata / offset topic to get this information already? We just need to think about how to make these two modules directly exchange information without messing up the code hierarchy.
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > These two coordinators will be on the same broker only when the number of partitions for the transaction state topic and the consumer offset topic are the same. This normally holds true, but I'm afraid we couldn't make this assumption?
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of this config is just to avoid an old-versioned broker not being able to recognize a newer-versioned client. I think we can do something else to avoid this config though: for example, we can use the embedded AdminClient to send the APIVersion request upon starting up, and based on the returned value decide whether to go to the old code path or the new behavior. Admittedly asking a random broker about APIVersion does not guarantee the whole cluster's versions, but what we can do is to first 1) find the coordinator (and if the random broker does not even recognize the new discover type, fall back to old path directly), and then 2) ask the discovered coordinator about its supported APIVersion.
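The two-step probe described in point 4 might look roughly like the sketch below. `Broker`, `findCoordinator`, `supportsGroupAwareTransactions`, `CodePath`, and `VersionProbe` are hypothetical names invented for illustration; they are not AdminClient or broker APIs, and a real probe would issue network requests rather than call methods on an object.

```java
import java.util.Optional;

// Hypothetical view of a broker for probing purposes; not a Kafka API.
interface Broker {
    // Empty if this broker does not recognize the new coordinator type.
    Optional<Broker> findCoordinator(String groupId);

    // Whether this broker's supported API versions include the new path.
    boolean supportsGroupAwareTransactions();
}

enum CodePath { OLD, NEW }

final class VersionProbe {
    // Step 1: ask any broker to locate the coordinator; fall back to the old
    // path if it cannot. Step 2: ask the coordinator what it supports.
    static CodePath choose(Broker anyBroker, String groupId) {
        Optional<Broker> coordinator = anyBroker.findCoordinator(groupId);
        if (!coordinator.isPresent()) {
            return CodePath.OLD;
        }
        return coordinator.get().supportsGroupAwareTransactions()
                ? CodePath.NEW
                : CodePath.OLD;
    }
}
```

Note that this only captures the decision at startup; as discussed elsewhere in the thread, it does not by itself handle a broker being downgraded later.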
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > The caveat here is that we have to make sure both the group coordinator and transaction coordinator are on the latest version during init stage. This is potentially doable as we only need a consumer group.id to check that. In the meantime, a hard-coded config is still a favorable backup in case the server has downgraded, so you will want to use a new version client without `consumer group` transactional support.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > > 5. This is a meta question: have you considered how this can be applied to Kafka Connect as well? For example, for source connectors, the assignment is not by "partitions", but by some other sort of "resources" based on the source systems, how KIP-447 would affect Kafka Connectors that implemented EOS as well?
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > No, it's not currently included in the scope. Could you point me to a sample source connector who uses EOS?
> We could always piggy-back more information, such as tasks, into the
> TxnProducerIdentity struct. If this is something to support in the near
> term, an abstract type called "Resource" could be provided, and topic
> partition and connect task could implement it.
>
> > Guozhang
> >
> > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for the KIP. It's good that we listed a number of rejected
> > > alternatives. It would be helpful to have an explanation of why they
> > > were rejected.
> > >
> > > Ismael
> > >
> > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com> wrote:
> > >
> > > > Hey all,
> > > >
> > > > I would like to start a discussion for KIP-447:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > >
> > > > This work was originated by Jason Gustafson, and we would like to
> > > > proceed into the discussion stage.
> > > >
> > > > Let me know your thoughts, thanks!
> > > >
> > > > Boyang
> >
> > --
> > -- Guozhang