Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-09-04 Thread Boyang Chen
From offline discussion, the eventual conclusion is to use a top-level Consumer#getMetadata() API to fetch the latest group metadata for offset fencing, so that we only call initTransactions once in the producer's lifetime. Since no further questions have been raised on this thread, I will start the vote today.

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-25 Thread Boyang Chen
Hey Guozhang and Jason, I'm OK with either way. With Guozhang's approach, it is simpler to implement a consumer-producer if we avoid the callback pattern and only do the group metadata initialization once; however, the access pattern of consumer rebalance state is scattered, which means we get b

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-14 Thread Guozhang Wang
My main concern is requiring the overloaded `initTransactions` to be called repeatedly while the original `initTransactions` is still called only once throughout the lifetime, which is a bit confusing. Looking into the current POC PR, we actually only need the latest generation id when fetching offsets,

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-14 Thread Jason Gustafson
Yeah, my reasoning is that the group metadata is only relevant to the subscription API. So it makes sense to only expose it to the rebalance listener. One option we could consider is to bring back the `initTransactions` overload. Then usage looks something like this: consumer.subscribe(topics, new R

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-13 Thread Boyang Chen
Hey Guozhang, thanks for the suggestion. Could you elaborate more on why defining a direct consumer API would be easier? The benefit of reusing consumer rebalance listener is to consolidate the entry point of consumer internal states. Compared with letting consumer generate a deep-copy of metadata

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-13 Thread Guozhang Wang
Hi Boyang, Jason, If we are going to expose the generation id / group.instance id etc. anyway, I think it's slightly better to just add a new API on KafkaConsumer returning the ConsumerGroupMetadata (option 3) than to pass it in on an additional callback of ConsumerRebalanceListener. It feels easier

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-12 Thread Boyang Chen
Thanks Jason, the intuition behind defining a separate callback function is that, with KIP-429, we no longer guarantee that onPartitionsAssigned() or onPartitionsRevoked() is called on each rebalance. Our requirement is to stay up to date with group metadata such as generation information, so callback like

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-12 Thread Jason Gustafson
Hey Boyang, I favor option 4 as well. It's a little more cumbersome than 3 for this use case, but it seems like a cleaner separation of concerns. The rebalance listener is already concerned with events affecting the assignment lifecycle and group membership. I think the only thing I'm wondering is

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-08 Thread Boyang Chen
Thank you Jason. We had some offline discussion on properly keeping group metadata up to date, and here are some of our options brainstormed: 1. Let the caller of `sendOffsetsToTransaction(offset, metadata)` maintain the ever-changing group metadata. This could be done on stream side, but for non-s

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-07 Thread Jason Gustafson
Hi Boyang, > We already persist member.id, instance.id and generation.id in the offset topic, what extra fields we need to store? Yeah, you're right. I was a little confused and thought this information was needed by the transaction coordinator. > This should be easily done on the stream side as

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-06 Thread Boyang Chen
Thank you for the suggestions Jason. And a side note for Guozhang, I updated the KIP to reflect the dependency on 447. On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson wrote: > Hi Boyang, thanks for the updates. I have a few more comments: > > 1. We are adding some new fields to TxnOffsetCommit t

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-06 Thread Jason Gustafson
Hi Boyang, thanks for the updates. I have a few more comments: 1. We are adding some new fields to TxnOffsetCommit to support group-based fencing. Do we need these fields to be persisted in the offsets topic to ensure that the fencing still works after a coordinator failover? 2. Since you are pro

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-05 Thread Boyang Chen
Yep, Guozhang I think that would be best as passing in an entire consumer instance is indeed cumbersome. Just saw you updated KIP-429, I will follow-up to change 447 as well. Best, Boyang On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang wrote: > okay I think I understand your concerns about Consum

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-05 Thread Guozhang Wang
okay I think I understand your concerns about ConsumerGroupMetadata now: if we still want to only call initTxns once, then we should allow whatever parameter is passed in to reflect the latest value of generation id whenever sending the offset fetch request. Whereas the current ConsumerGroupMetad
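The concern above can be illustrated with a small model, offered only as a sketch. None of these classes are the real Kafka client types: `ModelConsumer`, `ModelProducer`, the local `GroupMetadata` class, and the `Supplier`-taking `initTransactions` are hypothetical stand-ins. It contrasts a one-time snapshot of the group metadata, which goes stale after a rebalance, with a live view that is read again for every request while init is still called only once.

```java
import java.util.function.Supplier;

class LatestGenerationSketch {
    // Hypothetical immutable snapshot of group state (not the real Kafka class).
    static final class GroupMetadata {
        final String groupId;
        final int generationId;
        GroupMetadata(String groupId, int generationId) {
            this.groupId = groupId;
            this.generationId = generationId;
        }
    }

    // Hypothetical consumer whose generation advances on each rebalance.
    static final class ModelConsumer {
        private final String groupId;
        private int generationId = 1;
        ModelConsumer(String groupId) { this.groupId = groupId; }
        void rebalance() { generationId++; }
        GroupMetadata groupMetadata() { return new GroupMetadata(groupId, generationId); }
    }

    // Hypothetical producer: initialized once, but reads the metadata per request.
    static final class ModelProducer {
        private Supplier<GroupMetadata> metadataView;
        private int initCalls = 0;
        void initTransactions(Supplier<GroupMetadata> view) {
            this.metadataView = view;
            initCalls++;
        }
        // Called when building each request, so it always sees the latest generation.
        GroupMetadata metadataForNextRequest() { return metadataView.get(); }
        int initCalls() { return initCalls; }
    }

    public static void main(String[] args) {
        ModelConsumer consumer = new ModelConsumer("example-group");
        ModelProducer producer = new ModelProducer();
        producer.initTransactions(consumer::groupMetadata); // called once

        GroupMetadata snapshot = consumer.groupMetadata();  // copy taken before the rebalance
        consumer.rebalance();

        System.out.println("snapshot generation: " + snapshot.generationId);
        System.out.println("live generation:     " + producer.metadataForNextRequest().generationId);
        System.out.println("init calls:          " + producer.initCalls());
    }
}
```

In this model the snapshot keeps the pre-rebalance generation while the live view reports the new one, which is the gap a one-time parameter would leave open.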

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-05 Thread Boyang Chen
Thank you Guozhang for the reply! I'm curious whether KIP-429 has reflected the latest change on ConsumerGroupMetadata? Also regarding question one, the group metadata needs to be accessed via callback, does that mean we need a separate producer API such as "producer.refreshMetadata(groupMetadata

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-02 Thread Guozhang Wang
Thanks Boyang, I've made another pass on KIP-447 as well as https://github.com/apache/kafka/pull/7078, and have some minor comments about the proposed API: 1. it seems instead of needing the whole KafkaConsumer object, you'd only need the "ConsumerGroupMetadata", in that case can we just pass in

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-30 Thread Boyang Chen
Thank you Guozhang for the reply. We will consider the interface change from 429 as a backup plan for 447. And bumping this thread for more discussion. On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang wrote: > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen > wrote: > > > Thank you Guozhang for the s

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-22 Thread Guozhang Wang
On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen wrote: > Thank you Guozhang for the suggestion! I would normally prefer naming a > flag corresponding to its functionality. Seems to me `isolation_level` > makes us another hop on information track. > > Fair enough, let's use a separate flag name then :

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-20 Thread Boyang Chen
Thank you Guozhang for the suggestion! I would normally prefer naming a flag corresponding to its functionality. Seems to me `isolation_level` makes us another hop on information track. As for the generation.id exposure, I'm fine leveraging the new API from 429, but is that design finalize

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-19 Thread Guozhang Wang
Boyang, thanks for the updated proposal! 3.a. As Jason mentioned, with EOS enabled we still need to augment the offset fetch request with a boolean to indicate "give me a retriable error code if there's a pending offset, rather than sending me the committed offset immediately". Personally I still f
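The boolean described in 3.a can be modeled roughly as follows. This is a sketch under stated assumptions, not the broker's implementation: `ModelOffsetStore`, `RetriableFetchException`, the `waitForPending` parameter, and `fetchWithRetry` are all hypothetical names, and the `betweenAttempts` hook merely stands in for a client-side backoff during which the pending transaction may complete.

```java
import java.util.HashMap;
import java.util.Map;

class PendingOffsetFetchSketch {
    // Hypothetical retriable error returned while a transactional offset is pending.
    static final class RetriableFetchException extends RuntimeException {
        RetriableFetchException(String partition) {
            super("pending transactional offset for " + partition);
        }
    }

    // Hypothetical model of committed and pending (in-transaction) offsets.
    static final class ModelOffsetStore {
        private final Map<String, Long> committed = new HashMap<>();
        private final Map<String, Long> pending = new HashMap<>();

        void commit(String partition, long offset) { committed.put(partition, offset); }
        void addPending(String partition, long offset) { pending.put(partition, offset); }

        // Completing the transaction promotes the pending offset to committed.
        void completeTransaction(String partition) {
            Long offset = pending.remove(partition);
            if (offset != null) committed.put(partition, offset);
        }

        // With the flag set, a pending offset yields a retriable error
        // instead of the last committed offset.
        long fetch(String partition, boolean waitForPending) {
            if (waitForPending && pending.containsKey(partition)) {
                throw new RetriableFetchException(partition);
            }
            return committed.getOrDefault(partition, -1L);
        }
    }

    // Client-side retry loop; betweenAttempts stands in for a backoff.
    static long fetchWithRetry(ModelOffsetStore store, String partition,
                               int maxAttempts, Runnable betweenAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return store.fetch(partition, true);
            } catch (RetriableFetchException e) {
                betweenAttempts.run();
            }
        }
        throw new IllegalStateException("pending offsets did not resolve after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        ModelOffsetStore store = new ModelOffsetStore();
        store.commit("t-0", 10L);
        store.addPending("t-0", 20L);

        System.out.println("flag off: " + store.fetch("t-0", false));
        long offset = fetchWithRetry(store, "t-0", 3, () -> store.completeTransaction("t-0"));
        System.out.println("flag on, after retry: " + offset);
    }
}
```

Without the flag, the fetch returns the older committed offset even though a newer one is in flight; with it, the caller is told to retry and only proceeds once the pending offset has been resolved.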

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-18 Thread Boyang Chen
Thank you Jason for the ideas. On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson wrote: > Hi Boyang, > > Thanks for the updates. A few comments below: > > 1. The KIP mentions that `transaction.timeout.ms` should be reduced to > 10s. > I think this makes sense for Kafka Streams which is tied to the

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-15 Thread Jason Gustafson
Hi Boyang, Thanks for the updates. A few comments below: 1. The KIP mentions that `transaction.timeout.ms` should be reduced to 10s. I think this makes sense for Kafka Streams which is tied to the consumer group semantics and uses a default 10s session timeout. However, it seems a bit dangerous t

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-08 Thread Boyang Chen
Hey Guozhang, I will correct my statement from last email. I don't think read_committed (3.a) needs to be added to the OffsetFetch request: if we are using an EOS application, the underlying consumers within the group should always back off when there are pending offsets. Let me know if

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-02 Thread Boyang Chen
Thank you Guozhang for the questions, inline answers are below. On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen wrote: > Hey all, > > I have done a fundamental polish of KIP-447 > > and > w

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-02 Thread Boyang Chen
Hey all, I have done a fundamental polish of KIP-447 and written a design doc depicting internal ch

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-26 Thread Guozhang Wang
2. The reason we did not expose generation.id from KafkaConsumer public APIs directly is to abstract this notion from users (since it is an implementation detail of the rebalance protocol itself, e.g. if user calls consumer.assign() they do not need to invoke ConsumerCoordinator and no need to be a

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Jason Gustafson
For reference, we have BrokerApiVersionCommand already as a public interface. We have a bit of tech debt at the moment because it uses a custom AdminClient. It would be nice to clean that up. In general, I think it is reasonable to expose from AdminClient. It can be used by management tools to insp

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Boyang Chen
Thank you for the context Colin. The groupId was indeed a copy-paste error. Our use case here for 447 is (Quoted from Guozhang): ''' I think if we can do something else to avoid this config though, for example we can use the embedded AdminClient to send the APIVersion request upon starting up, and

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Colin McCabe
kafka.api.ApiVersion is an internal class, not suitable for exposing through AdminClient. That class is not even accessible without having the broker jars on your CLASSPATH. Another question is, what is the groupId parameter doing in the call? The API versions are the same no matter what consu

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Boyang Chen
Actually, on second thought, I think it makes sense to support auto upgrade through the admin client to help us get the API version from the broker. A draft KIP is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client Boyang On Tue, Jun 25,

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Boyang Chen
Thank you Guozhang, some of my understandings are inline below. On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson wrote: > > > > I think co-locating does have some merits here, i.e. letting the > > ConsumerCoordinator which has the source-of-truth of assignment to act as > > the TxnCoordinator as

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Jason Gustafson
> > I think co-locating does have some merits here, i.e. letting the > ConsumerCoordinator which has the source-of-truth of assignment to act as > the TxnCoordinator as well; but I agree there's also some cons of coupling > them together. I'm still a bit inclining towards colocation but if there >

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Guozhang Wang
Hi Boyang, 1. One advantage of retry against on-hold is that it will not tie-up a handler thread (of course the latter could do the same but that involves using a purgatory which is more complicated), and also it is less likely to violate request timeout. So I think there are some rationales to pr

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Jason Gustafson
> > The question I have is that whether we should name the interface > `GroupAssignment` instead, so > that Connect later could also extend on the same interface, just to echo > Guozhang's point here, Are you referring to the API used for initTransactions? There would be no reason to use a more g

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-24 Thread Boyang Chen
Hey Jason, thank you for the proposal here. Some of my thoughts below. On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson wrote: > Hi Boyang, > > Thanks for picking this up! Still reading through the updates, but here are > a couple initial comments on the APIs: > > 1. The `TxnProducerIdentity` cl

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-24 Thread Jason Gustafson
Hi Boyang, Thanks for picking this up! Still reading through the updates, but here are a couple initial comments on the APIs: 1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying to encapsulate state from the current group assignment. Maybe something like `ConsumerAssignment

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-24 Thread Boyang Chen
Thank you Ismael for the suggestion. We will attempt to address it by adding more detail to the rejected alternatives section. Thank you for the comment Guozhang! Answers are inline below. On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang wrote: > Hello Boyang, > > Thanks for the KIP, I have some co

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-23 Thread Guozhang Wang
Hello Boyang, Thanks for the KIP, I have some comments below: 1. "Once transactions are complete, the call will return." This seems different from the existing behavior, in which we would return a retriable CONCURRENT_TRANSACTIONS and let the client retry. Is this intentional? 2. "an overload

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-22 Thread Ismael Juma
Hi Boyang, Thanks for the KIP. It's good that we listed a number of rejected alternatives. It would be helpful to have an explanation of why they were rejected. Ismael On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen wrote: > Hey all, > > I would like to start a discussion for KIP-447: > > https://

[DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-22 Thread Boyang Chen
Hey all, I would like to start a discussion for KIP-447: https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics This work was originated by Jason Gustafson, and we would like to move it into the discussion stage. Let me know your thoughts, thank