Hi Dong,

Great work on this proposal! Just a couple of initial comments:
My understanding is that the consumer will block on a topic until all partitions have reached a certain partition epoch. What are the implications if a partition is offline? If we observe an epoch change while a partition is offline, it seems we'd have to wait until the partition is back online before we can begin consuming the new epoch; otherwise we would violate the ordering guarantees. Many use cases involve unordered data, so this would be a kind of regression in behavior, wouldn't it? A couple of ideas:

1. Maybe we could have a topic configuration that controls whether or not ordering on the topic needs to be strictly followed? If we don't care about ordering, the consumer need not synchronize on epoch boundaries and we need not care about offline partitions.

2. Waiting on all partitions allows for any key partitioning function. It's good because it's general, but it is overly conservative when the partitioning function has finer control over key movement. For example, if the partitioner only allows splits, then there is just one partition to await before consuming a new epoch for any given partition. I am not sure what it would look like, but I'm wondering if it would be possible to leverage the custom partitioning logic on the consumer side as well to avoid unneeded waiting.

I think piggybacking the epoch exchanges onto the consumer heartbeats is a good idea. I just wanted to mention that consumers are not the only ones using the heartbeat API; for example, Kafka Connect also uses the group protocol to balance its load. Of course, other use cases could leave these fields empty, but it's a little odd to have the protocol tailored specifically to one use case. To be honest, the group management protocol is one of the messier Kafka APIs and I don't think anyone is satisfied with the current approach.
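To make idea 2 above concrete, here is a rough sketch in plain Python (not the Kafka API; the split-only scheme and all names here are my own illustration, in the style of linear hashing, where doubling from N to 2N partitions splits old partition p into p and p + N):

```python
# Illustrative sketch only -- not the Kafka API. Assumes a hypothetical
# split-only partitioner in the style of linear hashing: when the partition
# count grows from N to a multiple of N, keys from old partition p can only
# move into partitions congruent to p mod N.

def parent_partition(new_partition: int, old_count: int) -> int:
    """The single old partition whose keys can move into `new_partition`."""
    return new_partition % old_count

def partitions_to_await(new_partition: int, old_count: int, new_count: int) -> set:
    """Partitions a consumer of `new_partition` must synchronize with at the
    epoch boundary. With split-only expansion this is a single partition,
    rather than all `old_count` partitions as in the fully general case."""
    assert new_count % old_count == 0  # assume clean splitting
    return {parent_partition(new_partition, old_count)}

# Doubling from 4 to 8 partitions: a consumer of new partition 6 only needs
# to wait on old partition 2 (6 % 4), not on all 4 old partitions.
print(partitions_to_await(6, 4, 8))  # -> {2}
```

So if the consumer could see the same split structure the partitioner uses, the epoch wait-set shrinks from O(old partition count) to 1.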
We need not redesign the whole thing in this KIP, but it might be nice to consider some options so that we're sure we're either heading in a better direction or at least not making things more confusing than they already are. The challenge is that it's useful to have some coordinator logic specific to the group type. I can imagine down the road that other use cases may also have some custom metadata which they need to piggyback on the heartbeat, and they may also need the coordinator to do some facilitation. Maybe the heartbeat protocol could be left generic and we could have a separate module in the GroupCoordinator for custom consumer logic? Not too sure of the best way to go.

Thanks,
Jason

On Tue, Feb 27, 2018 at 11:49 PM, Stephane Maarek <steph...@simplemachines.com.au> wrote:

> Sounds awesome!
> Are you planning to have auto-scaling of partitions in a following KIP?
> That would be the holy grail.
>
> On 28 Feb. 2018 5:13 pm, "Dong Lin" <lindon...@gmail.com> wrote:
>
> > Hey Jan,
> >
> > I am not sure it is acceptable for the producer to be stopped for a
> > while, particularly for online applications which require low latency.
> > I am also not sure how consumers can switch to a new topic. Does the
> > user application need to explicitly specify a different topic for the
> > producer/consumer to subscribe to? It would be helpful for the
> > discussion if you could provide more detail on the interface change
> > for this solution.
> >
> > Thanks,
> > Dong
> >
> > On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <jan.filip...@trivago.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I just want to throw my thoughts in. In general the functionality is
> > > very useful; we should not, though, try too hard to settle the
> > > architecture while implementing.
> > >
> > > The manual steps would be to:
> > >
> > > - create a new topic
> > > - run MirrorMaker from the old topic to the new topic
> > > - wait for mirror making to catch up
> > > - then put the consumers onto the new topic
> > >   (having MirrorMaker emit a mapping from old offsets to new offsets:
> > >   if the partition count is increased by a factor X there is going to
> > >   be a clean mapping from 1 offset in the old topic to X offsets in
> > >   the new topic; if there is no such factor then there is no chance
> > >   to generate a mapping that can reasonably be used for continuing)
> > > - make consumers stop at appropriate points and continue consumption
> > >   with offsets from the mapping
> > > - have the producers stop for a minimal time
> > > - wait for MirrorMaker to finish
> > > - let the producers produce with the new metadata
> > >
> > > Instead of implementing the approach suggested in the KIP, which will
> > > leave log-compacted topics completely crumbled and unusable, I would
> > > much rather try to build infrastructure to support the operations
> > > mentioned above more smoothly. Especially having producers stop and
> > > use another topic is difficult, and it would be nice if one could
> > > trigger "invalid metadata" exceptions for them, and if one could give
> > > topics aliases so that messages produced with the old topic name will
> > > arrive in the new topic.
> > >
> > > The downsides are obvious, I guess (having the same data twice for
> > > the transition period, but Kafka tends to scale well with data size),
> > > but it's a nicer fit into the architecture.
> > >
> > > I further want to argue that the functionality in this KIP can be
> > > implemented completely in "userland" with a custom partitioner that
> > > handles the transition as needed. I would appreciate it if someone
> > > could point out what a custom partitioner couldn't handle in this
> > > case.
> > >
> > > With the above approach, shrinking a topic becomes the same steps,
> > > without losing keys in the discontinued partitions.
> > >
> > > Would love to hear what everyone thinks.
> > >
> > > Best,
> > > Jan
> > >
> > > On 11.02.2018 00:35, Dong Lin wrote:
> > >
> > >> Hi all,
> > >>
> > >> I have created KIP-253: Support in-order message delivery with
> > >> partition expansion. See
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> > >>
> > >> This KIP provides a way to allow messages of the same key from the
> > >> same producer to be consumed in the same order they are produced
> > >> even if we expand the partitions of the topic.
> > >>
> > >> Thanks,
> > >> Dong
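Jan's "factor X" observation can be sanity-checked with a small sketch, assuming the default modulo-style partitioning (partition = hash(key) % count); all function names here are illustrative, not MirrorMaker's actual interface. It shows why a key from old partition p can only land in one of X well-defined new partitions when the count grows by an exact factor X:

```python
# Illustrative sketch only. Assumes the default modulo-style partitioner;
# function names are hypothetical.

def old_partition(key_hash: int, old_count: int) -> int:
    return key_hash % old_count

def new_partition(key_hash: int, old_count: int, factor: int) -> int:
    return key_hash % (old_count * factor)

def candidate_new_partitions(p: int, old_count: int, factor: int) -> set:
    # h % old_count == p implies h % (old_count * factor) is congruent to
    # p mod old_count, so only these `factor` partitions are possible.
    return {p + k * old_count for k in range(factor)}

# Check the claim exhaustively for N=4 old partitions expanded by X=3:
N, X = 4, 3
for h in range(1000):
    p = old_partition(h, N)
    assert new_partition(h, N, X) in candidate_new_partitions(p, N, X)
```

With a non-integer growth factor this congruence breaks down, which matches Jan's point that no reasonable offset mapping can be generated in that case.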