Thank you Guozhang for the questions, inline answers are below.

On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> Hey all,
>
> I have done a fundamental polish of KIP-447
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
> and written a design doc
> <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
> depicting the internal changes. We stripped many implementation details out of the KIP and simplified the public changes by a lot. For reviewers, it is highly recommended to fully understand the EOS design in KIP-98 and read its corresponding design doc
> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
> if you haven't done so already.
>
> Let me know if you find anything confusing about the KIP or the design. Would be happy to discuss in depth.
>
> Best,
> Boyang
>
> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> 2. The reason we did not expose generation.id from the KafkaConsumer public APIs directly is to abstract this notion from users (it is an implementation detail of the rebalance protocol itself; e.g. if a user calls consumer.assign() they do not need to involve the ConsumerCoordinator and need not be aware of generation.id at all).
>>
>> On the other hand, with the current proposal the txn.coordinator does not know about the latest generation from the source-of-truth group.coordinator; instead, it only bumps up the generation from the producer's InitProducerIdRequest.
>>
>> The key here is that the GroupCoordinator, when handling `InitProducerIdRequest
>
> In the new design, we just pass the entire consumer instance into the producer through #initTransaction, so no new public API will be created.
>
>> 3. I agree that if we rely on the group coordinator to block on returning the offset-fetch response when read-committed is enabled, then we do not need to store the partition assignment on the txn coordinator, and therefore it's better to still decouple them. For that case we still need to update the KIP wiki page to include:
>>
>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
>> 3.b. Add a new error code in OffsetFetchResponse to let the client back off and retry if there are pending txns involving the requested partitions.
>> 3.c. Also, in the worst case we would let the client be blocked for the txn.timeout period, and for that reason we may need to consider reducing our default txn.timeout value as well.
>
> Addressed 3.b and 3.c, will do 3.a.
>
>> 4. According to Colin it seems we do not need to create another KIP and we can just complete it as part of KIP-117 / KAFKA-5214; and we need to do some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin please let us know if you have any concerns about exposing it).
>
> I think we no longer need to rely on the api version for initialization, since we will be using the upgrade.from config anyway.
>
>> Guozhang
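To make the flow above more concrete, here is a minimal sketch of a consume-process-produce loop in the spirit of the new design. The initTransactions(consumer) overload is hypothetical and only mentioned in a comment, since its exact shape is still under discussion; the rest uses existing producer and consumer APIs, with "my-group" and the topic names as placeholders.

// Sketch only: the group-aware initTransactions(consumer) call discussed in this
// thread does not exist yet and is referenced purely as a comment below.
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class GroupAwareEosLoop {
    public static void run(KafkaConsumer<String, String> consumer,
                           KafkaProducer<String, String> producer) {
        consumer.subscribe(Collections.singletonList("input"));

        // Today this requires a transactional.id config; under the proposal above it
        // would instead be seeded from the consumer, e.g. a hypothetical
        // producer.initTransactions(consumer) overload.
        producer.initTransactions();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty())
                continue;

            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                producer.send(new ProducerRecord<>("output", record.key(), record.value()));
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
            }
            // Zombie fencing happens when these offsets are later fetched from the
            // group coordinator under read-committed, which is why the txn
            // coordinator no longer needs the partition assignment up front.
            producer.sendOffsetsToTransaction(offsets, "my-group");
            producer.commitTransaction();
        }
    }
}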
>>
>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <ja...@confluent.io> wrote:
>>
>>> For reference, we already have BrokerApiVersionCommand as a public interface. We have a bit of tech debt at the moment because it uses a custom AdminClient. It would be nice to clean that up. In general, I think it is reasonable to expose it from AdminClient. It can be used by management tools to inspect running Kafka versions, for example.
>>>
>>> -Jason
>>>
>>> On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>
>>>> Thank you for the context Colin. The groupId was indeed a copy-paste error.
>>>> Our use case here for 447 is (quoted from Guozhang):
>>>> '''
>>>> I think if we can do something else to avoid this config though, for example we can use the embedded AdminClient to send the ApiVersion request upon starting up, and based on the returned value decide whether to go to the old code path or the new behavior.
>>>> '''
>>>> The benefit we get is to avoid adding a new configuration just to make a decision based on the broker version. If you have concerns with exposing ApiVersion to clients, we could try to think of alternative solutions too.
>>>>
>>>> Boyang
>>>>
>>>> On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmcc...@apache.org> wrote:
>>>>
>>>>> kafka.api.ApiVersion is an internal class, not suitable for exposing through AdminClient. That class is not even accessible without having the broker jars on your CLASSPATH.
>>>>>
>>>>> Another question is, what is the groupId parameter doing in the call? The API versions are the same no matter what consumer group we use, right? Perhaps this was a copy-and-paste error?
>>>>>
>>>>> This is not the first time we have discussed having a method in AdminClient to retrieve API version information. In fact, the original KIP which created KafkaAdminClient specified an API for fetching version information. It was called apiVersions and it is still there on the wiki. See
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
>>>>>
>>>>> However, this API wasn't ready in time for 0.11.0, so we shipped without it. There was a JIRA to implement it for later versions, https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR, https://github.com/apache/kafka/pull/3012 . However, we started to rethink whether this AdminClient function was even necessary. Most of the use-cases we could think of seemed like horrible hacks. So it has never really been implemented (yet?).
>>>>>
>>>>> best,
>>>>> Colin
>>>>>
>>>>> On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
>>>>>> Actually, after a second thought, I think it actually makes sense to support auto upgrade through the admin client, to help us get the api version from the broker.
>>>>>> A draft KIP is here:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
>>>>>>
>>>>>> Boyang
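Since the ApiVersion probing idea keeps coming up, here is a compile-level sketch of the decision it would drive. The BrokerApiVersionLookup interface is a made-up stand-in for whatever a KIP-117 / KIP-483 style call would eventually return (no such AdminClient method exists today), and the version constant is a placeholder; only the fallback logic is the point.

// Illustration of a startup version check; the lookup handle is hypothetical.
public class TransactionalStartupCheck {

    /** Hypothetical stand-in for a broker ApiVersions lookup against the coordinator. */
    interface BrokerApiVersionLookup {
        short maxSupportedVersion(String apiName); // e.g. "InitProducerId"
    }

    // Placeholder: the InitProducerId version that would carry consumer group metadata.
    private static final short GROUP_AWARE_INIT_PRODUCER_ID_VERSION = 3;

    static boolean useGroupAwareTransactions(BrokerApiVersionLookup coordinatorVersions) {
        // An old broker simply reports a lower max version, so the client can fall
        // back to the existing one-producer-per-input-partition model without a new config.
        return coordinatorVersions.maxSupportedVersion("InitProducerId")
                >= GROUP_AWARE_INIT_PRODUCER_ID_VERSION;
    }
}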
>>>>>>
>>>>>> On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you Guozhang, some of my understandings are inline below.
>>>>>>>
>>>>>>> On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <ja...@confluent.io> wrote:
>>>>>>>
>>>>>>>>> I think co-locating does have some merits here, i.e. letting the ConsumerCoordinator, which has the source-of-truth of the assignment, act as the TxnCoordinator as well; but I agree there are also some cons to coupling them together. I'm still a bit inclined towards colocation, but if there are good rationales not to do so I can be convinced as well.
>>>>>>>>
>>>>>>>> The good rationale is that we have no mechanism to colocate partitions ;). Are you suggesting we store the group and transaction state in the same log? Can you be more concrete about the benefit?
>>>>>>>>
>>>>>>>> -Jason
>>>>>>>>
>>>>>>>> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Boyang,
>>>>>>>>>
>>>>>>>>> 1. One advantage of retry over on-hold is that it will not tie up a handler thread (of course the latter could do the same, but that involves using a purgatory, which is more complicated), and it is also less likely to violate the request timeout. So I think there are some rationales to prefer retries.
>>>>>>>
>>>>>>> That sounds fair to me; we are also avoiding the use of another purgatory instance. Usually for one back-off we are only delaying 50ms during startup, which is a trivial cost. This behavior shouldn't be changed.
>>>>>>>
>>>>>>>>> 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener and PartitionAssignor are user-customizable modules, and the only difference is that the former is specified via code and the latter is specified via config.
>>>>>>>>>
>>>>>>>>> Regarding Jason's proposal of ConsumerAssignment, one thing to note though: with KIP-429, onPartitionsAssigned may not be called if the assignment does not change, whereas onAssignment would always be called at the end of the sync-group response. My proposed semantics is that `RebalanceListener#onPartitionsXXX` are used for notifications to the user, and hence if there are no changes these will not be called, whereas `PartitionAssignor` is used for assignor logic, whose callback would always be called no matter whether the partitions have changed or not.
>>>>>>>
>>>>>>> I think a third option is to gracefully expose the generation id as part of the consumer API, so that we don't need to bother overloading various callbacks. Of course, this builds upon the assumption that topic partitions will not be included in the new initTransaction API.
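To illustrate the retry-over-on-hold behavior from point 1, together with the new retriable error proposed in 3.b earlier, here is a minimal client-side sketch. The fetcher interface and exception name are placeholders invented for the example; the real error code and plumbing are still to be defined in the KIP.

// Minimal sketch: back off and retry the offset fetch while the group coordinator
// reports pending transactions on the requested partitions. All names here are
// placeholders, not the final protocol or client classes.
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class OffsetFetchBackoff {

    /** Placeholder for the coordinator round trip under read-committed. */
    interface CommittedOffsetFetcher {
        Map<TopicPartition, Long> fetchCommitted(Set<TopicPartition> partitions)
                throws PendingTransactionException;
    }

    /** Placeholder for the new retriable error from 3.b. */
    static class PendingTransactionException extends Exception { }

    static Map<TopicPartition, Long> fetchWithBackoff(CommittedOffsetFetcher fetcher,
                                                      Set<TopicPartition> partitions)
            throws InterruptedException {
        final long backoffMs = 50; // roughly the one-back-off startup delay mentioned above
        while (true) {
            try {
                return fetcher.fetchCommitted(partitions);
            } catch (PendingTransactionException e) {
                // No handler thread or purgatory is tied up on the broker; in the worst
                // case the loop is bounded by the pending transaction's txn.timeout.
                Thread.sleep(backoffMs);
            }
        }
    }
}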
>>>>>>>
>>>>>>>>> 3. I feel it is a bit awkward to let the TxnCoordinator keep partition assignments, since it is sort of taking over the job of the ConsumerCoordinator, and it may well cause a split-brain problem as two coordinators keep copies of this assignment which may differ.
>>>>>>>>>
>>>>>>>>> I think co-locating does have some merits here, i.e. letting the ConsumerCoordinator, which has the source-of-truth of the assignment, act as the TxnCoordinator as well; but I agree there are also some cons to coupling them together. I'm still a bit inclined towards colocation, but if there are good rationales not to do so I can be convinced as well.
>>>>>>>
>>>>>>> The purpose of co-location is to let the txn coordinator see the group assignment. That priority is weakened now that we already have a defense on the consumer offset fetch, so I guess it's not super important anymore.
>>>>>>>
>>>>>>>>> 4. I guess I'm preferring the philosophy of "only add configs if there's no other way", since more and more configs make it less and less intuitive to use out of the box.
>>>>>>>>>
>>>>>>>>> It's a valid point that a check upon starting up does not cope with brokers downgrading, but even with a config it is still hard for users to determine when they can be sure the broker will never downgrade again and hence can safely switch the config. So my feeling is that this config would still not help much. If we want to be on the safer side, then I'd suggest we modify the Coordinator -> NetworkClient hierarchy to allow the NetworkClient to pass the ApiVersion metadata to the Coordinator, so that the Coordinator can rely on that to change its behavior dynamically.
>>>>>>>
>>>>>>> The stream thread init could not be supported by an on-the-fly client coordinator behavior change; we only lose possibilities after we have initialized (the main thread exits and no thread has the global picture anymore). If we do want to support auto version detection, an admin client request is easier in this sense.
>>>>>>>
>>>>>>>>> 5. I do not have a concrete idea about the impact this would have on Connect; maybe Randall or Konstantine can help here?
>>>>>>>
>>>>>>> Sounds good, let's see their thoughts.
>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Jason,
>>>>>>>>>>
>>>>>>>>>> thank you for the proposal here. Some of my thoughts below.
>> > > > > >> > > >> > > > > >> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson < >> > > > ja...@confluent.io> >> > > > > >> > > wrote: >> > > > > >> > > >> > > > > >> > > > Hi Boyang, >> > > > > >> > > > >> > > > > >> > > > Thanks for picking this up! Still reading through the >> > updates, >> > > > but >> > > > > >> here >> > > > > >> > > are >> > > > > >> > > > a couple initial comments on the APIs: >> > > > > >> > > > >> > > > > >> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I >> think >> > > we >> > > > are >> > > > > >> > > trying >> > > > > >> > > > to encapsulate state from the current group assignment. >> > Maybe >> > > > > >> something >> > > > > >> > > > like `ConsumerAssignment` would be clearer? If we make >> the >> > > usage >> > > > > >> > > consistent >> > > > > >> > > > across the consumer and producer, then we can avoid >> exposing >> > > > > >> internal >> > > > > >> > > state >> > > > > >> > > > like the generationId. >> > > > > >> > > > >> > > > > >> > > > For example: >> > > > > >> > > > >> > > > > >> > > > // Public API >> > > > > >> > > > interface ConsumerAssignment { >> > > > > >> > > > Set<TopicPartition> partittions(); >> > > > > >> > > > } >> > > > > >> > > > >> > > > > >> > > > // Not a public API >> > > > > >> > > > class InternalConsumerAssignment implements >> > > ConsumerAssignment { >> > > > > >> > > > Set<TopicPartition> partittions; >> > > > > >> > > > int generationId; >> > > > > >> > > > } >> > > > > >> > > > >> > > > > >> > > > Then we can change the rebalance listener to something >> like >> > > > this: >> > > > > >> > > > onPartitionsAssigned(ConsumerAssignment assignment) >> > > > > >> > > > >> > > > > >> > > > And on the producer: >> > > > > >> > > > void initTransactions(String groupId, ConsumerAssignment >> > > > > >> assignment); >> > > > > >> > > > >> > > > > >> > > > 2. Another bit of awkwardness is the fact that we have to >> > pass >> > > > the >> > > > > >> > > groupId >> > > > > >> > > > through both initTransactions() and >> > > sendOffsetsToTransaction(). >> > > > We >> > > > > >> > could >> > > > > >> > > > consider a config instead. Maybe something like ` >> > > > > >> > transactional.group.id >> > > > > >> > > `? >> > > > > >> > > > Then we could simplify the producer APIs, potentially >> even >> > > > > >> deprecating >> > > > > >> > > the >> > > > > >> > > > current sendOffsetsToTransaction. In fact, for this new >> > usage, >> > > > the ` >> > > > > >> > > > transational.id` config is not needed. It would be nice >> if >> > we >> > > > don't >> > > > > >> > have >> > > > > >> > > > to >> > > > > >> > > > provide it. >> > > > > >> > > > >> > > > > >> > > > I like the idea of combining 1 and 2. We could definitely >> > pass >> > > > in a >> > > > > >> > > group.id config >> > > > > >> > > so that we could avoid exposing that information in a >> public >> > > API. >> > > > The >> > > > > >> > > question I have >> > > > > >> > > is that whether we should name the interface >> `GroupAssignment` >> > > > > >> instead, >> > > > > >> > so >> > > > > >> > > that Connect later >> > > > > >> > > could also extend on the same interface, just to echo >> > Guozhang's >> > > > point >> > > > > >> > > here, Also the base interface >> > > > > >> > > is better to be defined empty for easy extension, or >> define an >> > > > > >> abstract >> > > > > >> > > type called `Resource` to be shareable >> > > > > >> > > later IMHO. 
>> > > > > >> > > >> > > > > >> > > >> > > > > >> > > > By the way, I'm a bit confused about discussion above >> about >> > > > > >> colocating >> > > > > >> > > the >> > > > > >> > > > txn and group coordinators. That is not actually >> necessary, >> > is >> > > > it? >> > > > > >> > > > >> > > > > >> > > > Yes, this is not a requirement for this KIP, because it >> is >> > > > > >> inherently >> > > > > >> > > impossible to >> > > > > >> > > achieve co-locating topic partition of transaction log and >> > > > consumed >> > > > > >> > offset >> > > > > >> > > topics. >> > > > > >> > > >> > > > > >> > > >> > > > > >> > > > Thanks, >> > > > > >> > > > Jason >> > > > > >> > > > >> > > > > >> > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen < >> > > > > >> reluctanthero...@gmail.com >> > > > > >> > > >> > > > > >> > > > wrote: >> > > > > >> > > > >> > > > > >> > > > > Thank you Ismael for the suggestion. We will attempt to >> > > > address >> > > > > >> it by >> > > > > >> > > > > giving more details to rejected alternative section. >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > Thank you for the comment Guozhang! Answers are inline >> > > below. >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang < >> > > > wangg...@gmail.com >> > > > > >> > >> > > > > >> > > > wrote: >> > > > > >> > > > > >> > > > > >> > > > > > Hello Boyang, >> > > > > >> > > > > > >> > > > > >> > > > > > Thanks for the KIP, I have some comments below: >> > > > > >> > > > > > >> > > > > >> > > > > > 1. "Once transactions are complete, the call will >> > return." >> > > > This >> > > > > >> > seems >> > > > > >> > > > > > different from the existing behavior, in which we >> would >> > > > return a >> > > > > >> > > > > retriable >> > > > > >> > > > > > CONCURRENT_TRANSACTIONS and let the client to retry, >> is >> > > this >> > > > > >> > > > intentional? >> > > > > >> > > > > > >> > > > > >> > > > > >> > > > > >> > > > > I don’t think it is intentional, and I will defer this >> > > > question to >> > > > > >> > > Jason >> > > > > >> > > > > when he got time to answer since from what I understood >> > > retry >> > > > and >> > > > > >> on >> > > > > >> > > hold >> > > > > >> > > > > seem both valid approaches. >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > > 2. "an overload to onPartitionsAssigned in the >> > consumer's >> > > > > >> rebalance >> > > > > >> > > > > > listener interface": as part of KIP-341 we've already >> > add >> > > > this >> > > > > >> > > > > information >> > > > > >> > > > > > to the onAssignment callback. Would this be >> sufficient? >> > Or >> > > > more >> > > > > >> > > > generally >> > > > > >> > > > > > speaking, which information have to be passed around >> in >> > > > > >> rebalance >> > > > > >> > > > > callback >> > > > > >> > > > > > while others can be passed around in >> PartitionAssignor >> > > > > >> callback? In >> > > > > >> > > > > Streams >> > > > > >> > > > > > for example both callbacks are used but most critical >> > > > > >> information >> > > > > >> > is >> > > > > >> > > > > passed >> > > > > >> > > > > > via onAssignment. >> > > > > >> > > > > > >> > > > > >> > > > > >> > > > > >> > > > > We still need to extend ConsumerRebalanceListener >> because >> > > > it’s the >> > > > > >> > > > > interface we could have public access to. 
>>>>>>>>>>>>
>>>>>>>>>>>>> 3. "We propose to use a separate record type in order to store the group assignment.": hmm, I thought that with the third FindCoordinator type, the same broker that acts as the consumer coordinator would always be selected as the txn coordinator, in which case it can already access its local metadata cache / offset topic to get this information? We just need to think about how to make these two modules exchange information directly without messing up the code hierarchy.
>>>>>>>>>>>>
>>>>>>>>>>>> These two coordinators will be on the same broker only when the number of partitions of the transaction state topic and the consumer offset topic are the same. This normally holds true, but I'm afraid we couldn't make this assumption?
>>>>>>>>>>>>
>>>>>>>>>>>>> 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of this config is just to keep an old-versioned broker from failing to recognize a newer-versioned client. I think we can do something else to avoid this config though, for example we can use the embedded AdminClient to send the ApiVersion request upon starting up, and based on the returned value decide whether to go to the old code path or the new behavior. Admittedly, asking a random broker about ApiVersion does not guarantee the whole cluster's versions, but what we can do is first 1) find the coordinator (and if the random broker does not even recognize the new discovery type, fall back to the old path directly), and then 2) ask the discovered coordinator about its supported ApiVersion.
>>>>>>>>>>>>
>>>>>>>>>>>> The caveat here is that we have to make sure both the group coordinator and the transaction coordinator are on the latest version during the init stage. This is potentially doable, as we only need a consumer group.id to check that. In the meantime, a hard-coded config is still a favorable backup in case the server has been downgraded and you want to use a new-version client without `consumer group` transactional support.
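To spell out why the same-broker assumption is shaky: each coordinator lives with a partition of its own internal topic, chosen by hashing the lookup key modulo that topic's partition count. The sketch below is a simplified stand-in for that placement logic, not broker code; even with the same key, different partition counts give different indices, and matching indices still would not guarantee the same partition leader.

// Simplified illustration of coordinator placement, not actual broker code.
// The group coordinator leads a __consumer_offsets partition and the txn
// coordinator leads a __transaction_state partition, each picked by hashing
// the lookup key modulo that topic's partition count.
public class CoordinatorPlacement {

    private static int toPositive(int hash) {
        return hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
    }

    static int coordinatorPartition(String key, int internalTopicPartitionCount) {
        return toPositive(key.hashCode()) % internalTopicPartitionCount;
    }

    public static void main(String[] args) {
        String groupId = "my-group";
        // Hypothetical partition counts for the two internal topics.
        int offsetsTopicPartitions = 50;
        int txnLogPartitions = 32;

        int groupCoordinatorPartition = coordinatorPartition(groupId, offsetsTopicPartitions);
        // Even if the txn coordinator lookup were keyed by the group id (the "third
        // typed FindCoordinator" idea), a different partition count already breaks the
        // alignment, and matching indices still do not pin both leaders to one broker.
        int txnCoordinatorPartition = coordinatorPartition(groupId, txnLogPartitions);

        System.out.println("group coordinator partition: " + groupCoordinatorPartition);
        System.out.println("txn coordinator partition:   " + txnCoordinatorPartition);
    }
}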
>>>>>>>>>>>>
>>>>>>>>>>>>> 5. This is a meta question: have you considered how this can be applied to Kafka Connect as well? For example, for source connectors the assignment is not by "partitions" but by some other sort of "resources" based on the source systems; how would KIP-447 affect Kafka Connectors that have implemented EOS as well?
>>>>>>>>>>>>
>>>>>>>>>>>> No, it's not currently included in the scope. Could you point me to a sample source connector that uses EOS? We could always piggy-back more information, such as tasks, into the TxnProducerIdentity struct. If this is something to support in the near term, an abstract type called "Resource" could be provided, and topic partitions and connect tasks could both implement it.
>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Boyang,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the KIP. It's good that we listed a number of rejected alternatives. It would be helpful to have an explanation of why they were rejected.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ismael
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would like to start a discussion for KIP-447:
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is a work originated by Jason Gustafson and we would like to proceed into the discussion stage.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let me know your thoughts, thanks!
>> > > > > >> > > > > > > > >> > > > > >> > > > > > > > Boyang >> > > > > >> > > > > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > > >> > > > > >> > > > > > >> > > > > >> > > > > > -- >> > > > > >> > > > > > -- Guozhang >> > > > > >> > > > > > >> > > > > >> > > > > >> > > > > >> > > > >> > > > > >> > > >> > > > > >> > >> > > > > >> > >> > > > > >> > -- >> > > > > >> > -- Guozhang >> > > > > >> > >> > > > > >> >> > > > > > >> > > > > >> > > > >> > > >> > >> >> >> -- >> -- Guozhang >> >