Jun,

There is one more case: non-windowed aggregations. For windowed
aggregation, the changelog topic will be compact+delete. However, for
non-windowed aggregation the policy is compact only.

Even if we assume that windowed aggregations are dominant and
non-windowed aggregations are used rarely, it seems bad to not
support the feature if a non-windowed aggregation is used. Also,
the volume of a non-windowed aggregation depends on the input-stream
volume, which might be high.

Furthermore, we support stream-table joins, and these require that the
stream and the table are co-partitioned. Thus, even if the table has
lower volume, if the stream must be scaled out we also need to
scale out the table to preserve co-partitioning.
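
To make the co-partitioning constraint concrete, here is a minimal sketch.
It assumes plain modulo partitioning over a precomputed key hash; the class
and method names are made up for illustration and are not Streams APIs.

```java
// Hypothetical illustration only: not part of Kafka Streams.
final class CoPartitioningSketch {
    private CoPartitioningSketch() {}

    // Assumed partitioning rule: non-negative remainder of the key hash.
    static int partitionFor(int keyHash, int numPartitions) {
        return Math.floorMod(keyHash, numPartitions);
    }

    // A stream-table join can only be computed locally if the stream record
    // and the table record for the same key share a partition number.
    static boolean coLocated(int keyHash, int streamPartitions, int tablePartitions) {
        return partitionFor(keyHash, streamPartitions)
                == partitionFor(keyHash, tablePartitions);
    }
}
```

Under these assumptions, a key with hash 6 lands in stream partition 6 when
the stream has 8 partitions, but in table partition 2 when the table still
has 4, so the join task for that stream partition would not find the table
entry locally.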


-Matthias

On 3/8/18 6:44 PM, Jun Rao wrote:
> Hi, Matthias,
> 
> My understanding is that in KStream, the only case when a changelog topic
> needs to be compacted is when the corresponding input is a KTable. In all
> other cases, the changelog topics use compact + delete. So, if most
> KTables are not high volume, there may not be a need to expand their
> partitions and therefore the partitions of the corresponding changelog
> topics.
> 
> Thanks,
> 
> Jun
> 
> On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Jun,
>>
>> thanks for your comment. This should actually work for Streams, because
>> we don't rely on producer "hashing" but specify the partition number
>> explicitly on send().
>>
>> About not allowing changes to the number of partitions for changelog
>> topics: for Streams, this seems to imply that we need to create a second
>> changelog topic for each store with the new partition count. However, it
>> would be unclear when/if we can delete the old topic. Thus, it moves the
>> "problem" into the application layer. It's hard for me to judge atm what
>> the impact would be, but it's something we should pay attention to.
>>
>>
>> -Matthias
>>
>> On 3/6/18 3:45 PM, Jun Rao wrote:
>>> Hi, Matthias,
>>>
>>> Regarding your comment "If it would be time-delay based, it might be
>>> problematic for Kafka Streams: if we get the information that the new input
>>> partitions are available for producing, we need to enable the new changelog
>>> partitions for producing, too. If those would not be available yet, because the
>>> time-delay did not trigger yet, it would be problematic to avoid
>>> crashing.", could you just enable the changelog topic to write to its new
>>> partitions immediately? The input topic can be configured with a delay in
>>> writing to the new partitions. Initially, there won't be new data produced
>>> into the newly added partitions in the input topic. However, we could
>>> prebuild the state for the new input partition and write the state changes
>>> to the corresponding new partitions in the changelog topic.
>>>
>>> Hi, Jan,
>>>
>>> For a compacted topic, garbage collecting the old keys in the existing
>>> partitions after partition expansion can be tricky, as you pointed out. A
>>> few options here. (a) Let brokers exchange keys across brokers during
>>> compaction. This will add complexity on the broker side. (b) Build an
>>> external tool that scans the compacted topic and drops the prefix of a
>>> partition if all records in the prefix are removable. The admin can then
>>> run this tool when the unneeded space needs to be reclaimed. (c) Don't
>>> support partition change in a compacted topic. This might be ok since most
>>> compacted topics are not high volume.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin <lindon...@gmail.com> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> Thanks for all the comments! It appears that everyone prefers linear
>>>> hashing because it reduces the amount of state that needs to be moved
>>>> between consumers (for stream processing). The KIP has been updated to use
>>>> linear hashing.
>>>>
>>>> Regarding the migration endeavor: it seems that migrating the producer
>>>> library to use linear hashing should be pretty straightforward without
>>>> much operational endeavor. If we don't upgrade the client library to use
>>>> this KIP, we cannot support in-order delivery after the partition count is
>>>> changed anyway. Suppose we upgrade the client library to use this KIP: if
>>>> the partition number is not changed, the key -> partition mapping will be
>>>> exactly the same as it is now because it is still determined using
>>>> murmur_hash(key) % original_partition_num. In other words, this change is
>>>> backward compatible.
>>>>
>>>> Regarding the load distribution: if we use linear hashing, the load may be
>>>> unevenly distributed because those partitions which are not split may
>>>> receive twice as much traffic as other partitions that are split. This
>>>> issue can be mitigated by creating topics with a partition count that is
>>>> several times the number of consumers. And there will be no imbalance if the
>>>> partition number is always doubled. So this imbalance seems acceptable.
>>>>
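To illustrate the two points above (unchanged mapping when the partition
count is unchanged, and the imbalance between split and unsplit partitions),
here is a sketch of textbook linear hashing. It is an assumption about the
general technique, not the exact scheme specified in the KIP. keyHash stands
in for murmur_hash(key), and all names are hypothetical.

```java
// Hypothetical illustration of textbook linear hashing; not the KIP's code.
final class LinearHashingSketch {
    private LinearHashingSketch() {}

    // keyHash: stand-in for murmur_hash(key), computed by the caller.
    // originalPartitions: partition count when the topic was created.
    // currentPartitions: partition count now (never smaller than original).
    static int partition(int keyHash, int originalPartitions, int currentPartitions) {
        if (originalPartitions <= 0 || currentPartitions < originalPartitions) {
            throw new IllegalArgumentException("invalid partition counts");
        }
        // Largest originalPartitions * 2^k that does not exceed currentPartitions.
        int base = originalPartitions;
        while ((long) base * 2 <= currentPartitions) {
            base *= 2;
        }
        // Partitions [0, splitPointer) have already been split in this round.
        int splitPointer = currentPartitions - base;
        int p = Math.floorMod(keyHash, base);
        if (p < splitPointer) {
            // Split partitions spread their keys over p and p + base.
            p = Math.floorMod(keyHash, 2 * base);
        }
        return p;
    }
}
```

In this sketch, with 4 original and 4 current partitions the result is just
keyHash % 4, which is the backward-compatible case. With 4 original and 6
current partitions, partitions 0 and 1 have been split into 0/4 and 1/5 while
2 and 3 have not, so 2 and 3 each cover twice the hash range of the others.
With 8 current partitions the result is keyHash % 8 and the load is even again.
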
>>>> Regarding storing the partition strategy as per-topic config: It seems not
>>>> necessary since we can still use murmur_hash as the default hash function
>>>> and additionally apply the linear hashing algorithm if the partition number
>>>> has increased. Not sure if there is any use-case for producer to use a
>>>> different hash function. Jason, can you check if there is some use-case
>>>> that I missed for using the per-topic partition strategy?
>>>>
>>>> Regarding how to reduce latency (due to state store/load) in stream
>>>> processing consumer when partition number changes: I need to read the Kafka
>>>> Streams code to understand how Kafka Streams currently migrates state between
>>>> consumers when the application is added/removed for a given job. I will
>>>> reply after I finish reading the documentation and code.
>>>>
>>>>
>>>> Thanks,
>>>> Dong
>>>>
>>>>
>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io>
>>>> wrote:
>>>>
>>>>> Great discussion. I think I'm wondering whether we can continue to leave
>>>>> Kafka agnostic to the partitioning strategy. The challenge is communicating
>>>>> the partitioning logic from producers to consumers so that the dependencies
>>>>> between each epoch can be determined. For the sake of discussion, imagine
>>>>> you did something like the following:
>>>>>
>>>>> 1. The name (and perhaps version) of a partitioning strategy is stored in
>>>>> topic configuration when a topic is created.
>>>>> 2. The producer looks up the partitioning strategy before writing to a
>>>>> topic and includes it in the produce request (for fencing). If it doesn't
>>>>> have an implementation for the configured strategy, it fails.
>>>>> 3. The consumer also looks up the partitioning strategy and uses it to
>>>>> determine dependencies when reading a new epoch. It could either fail or
>>>>> make the most conservative dependency assumptions if it doesn't know how to
>>>>> implement the partitioning strategy. For the consumer, the new interface
>>>>> might look something like this:
>>>>>
>>>>> // Return the partition dependencies following an epoch bump
>>>>> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
>>>>>                                          int numPartitionsAfterEpochBump)
>>>>>
>>>>> The unordered case then is just a particular implementation which never has
>>>>> any epoch dependencies. To implement this, we would need some way for the
>>>>> consumer to find out how many partitions there were in each epoch, but
>>>>> maybe that's not too unreasonable.
>>>>>
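As a rough sketch of what two implementations of the interface above could
look like: this assumes plain modulo partitioning and a new partition count
that is a multiple of the old one. The reading of the map (new partition ->
partitions of the previous epoch that may hold earlier records for the same
keys) is an assumption, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: not an existing Kafka interface.
final class ModuloDependenciesSketch {
    private ModuloDependenciesSketch() {}

    // Ordered case under the stated assumptions: if hash % after == p and
    // after is a multiple of before, then hash % before == p % before, so
    // new partition p depends only on old partition p % before.
    static Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                                    int numPartitionsAfterEpochBump) {
        int before = numPartitionsBeforeEpochBump;
        int after = numPartitionsAfterEpochBump;
        if (before <= 0 || after < before || after % before != 0) {
            throw new IllegalArgumentException("sketch assumes after is a multiple of before");
        }
        Map<Integer, List<Integer>> deps = new TreeMap<>();
        for (int p = 0; p < after; p++) {
            deps.put(p, Collections.singletonList(p % before));
        }
        return deps;
    }

    // Unordered case: no partition ever waits on a previous epoch.
    static Map<Integer, List<Integer>> unorderedDependencies(int numPartitionsBeforeEpochBump,
                                                             int numPartitionsAfterEpochBump) {
        return Collections.emptyMap();
    }
}
```

Going from 2 to 4 partitions under these assumptions, new partitions 0 and 2
both depend on old partition 0, and new partitions 1 and 3 on old partition 1.
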
>>>>> Thanks,
>>>>> Jason
>>>>>
>>>>>
>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Dong
>>>>>>
>>>>>> thank you very much for your questions.
>>>>>>
>>>>>> regarding the time spent copying data across:
>>>>>> It is correct that copying data from a topic with one partition mapping to
>>>>>> a topic with a different partition mapping takes way longer than we can
>>>>>> stop producers. Tens of minutes is a very optimistic estimate here. Many
>>>>>> people cannot afford to copy at full steam and therefore will have some rate
>>>>>> limiting in place; this can bump the timespan into days. The good part
>>>>>> is that the vast majority of the data can be copied while the producers are
>>>>>> still going. One can then piggyback the consumers on top of this timeframe,
>>>>>> by the method mentioned (provide them a mapping from their old offsets to
>>>>>> new offsets in their repartitioned topics). In that way we separate
>>>>>> migration of consumers from migration of producers (decoupling these is
>>>>>> what Kafka is strongest at). The time to actually swap over the producers
>>>>>> should be kept minimal by ensuring that when a swap attempt is started the
>>>>>> consumer copying over is very close to the log end and is expected
>>>>>> to finish within the next fetch. The operation should have a time-out and
>>>>>> should be "reattemptable".
>>>>>>
>>>>>> Importance of log compaction:
>>>>>> If a producer produces key A to partition 0, it is going to be there forever
>>>>>> unless it gets deleted. The record might sit in there for years. A new
>>>>>> producer started with the new partitions will fail to delete the record in
>>>>>> the correct partition. The record will be there forever and one cannot
>>>>>> reliably bootstrap new consumers. I cannot see how linear hashing can solve
>>>>>> this.
>>>>>>
>>>>>> Regarding your skipping of userland copying:
>>>>>> 100%, copying the data across in userland is, as far as I can see, only a
>>>>>> use case for log-compacted topics. Even for log compaction + retention it
>>>>>> should only be opt-in. Why did I bring it up? I think log compaction is a
>>>>>> very important feature to really embrace Kafka as a "data platform". The
>>>>>> point I also want to make is that copying data this way is completely
>>>>>> in line with the Kafka architecture. It only consists of reading and writing
>>>>>> to topics.
>>>>>>
>>>>>> I hope this clarifies why I think we should aim for more than the
>>>>>> current KIP. I fear that once the KIP is done not much more effort will be
>>>>>> taken.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
>>>>>>
>>>>>>> Hey Jan,
>>>>>>>
>>>>>>> In the current proposal, the consumer will be blocked on waiting for other
>>>>>>> consumers of the group to consume up to a given offset. In most cases, all
>>>>>>> consumers should be close to the LEO of the partitions when the partition
>>>>>>> expansion happens. Thus the waiting time should not be long, e.g. on the
>>>>>>> order of seconds. On the other hand, it may take a long time to wait for
>>>>>>> the entire partition to be copied -- the amount of time is proportional to
>>>>>>> the amount of existing data in the partition, which can take tens of
>>>>>>> minutes. So the amount of time that we stop consumers may not be on the
>>>>>>> same order of magnitude.
>>>>>>>
>>>>>>> If we can implement this suggestion without copying data over in pure
>>>>>>> userland, it will be much more valuable. Do you have ideas on how this can
>>>>>>> be done?
>>>>>>>
>>>>>>> Not sure why the current KIP would not help people who depend on log
>>>>>>> compaction. Could you elaborate more on this point?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dong
>>>>>>>
>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Dong,
>>>>>>>>
>>>>>>>> I tried to focus on what steps one can currently perform to expand
>>>>>>>> or shrink a keyed topic while maintaining top-notch semantics.
>>>>>>>> I can understand that there might be confusion about "stopping the
>>>>>>>> consumer". It is exactly the same as proposed in the KIP. There needs to be
>>>>>>>> a time at which the producers agree on the new partitioning. The extra
>>>>>>>> semantics I want to put in there is that we have a possibility to wait until
>>>>>>>> all the existing data is copied over into the new partitioning scheme. When
>>>>>>>> I say stopping I think more of having a memory barrier that ensures the
>>>>>>>> ordering. I am still aiming for latencies on the scale of leader failovers.
>>>>>>>>
>>>>>>>> Consumers have to explicitly adapt to the new partitioning scheme in the
>>>>>>>> above scenario. The reason is that in these cases where you are dependent
>>>>>>>> on a particular partitioning scheme, you frequently also have other topics
>>>>>>>> that have co-partition enforcements or the like. Therefore all your other
>>>>>>>> input topics might need to grow accordingly.
>>>>>>>>
>>>>>>>>
>>>>>>>> What I was suggesting was to streamline all these operations as best as
>>>>>>>> possible to have "real" partition growth and shrinkage going on. Migrating
>>>>>>>> the producers to a new partitioning scheme can be much more streamlined
>>>>>>>> with proper broker support for this. Migrating consumers is a step that
>>>>>>>> might be made completely unnecessary if - for example - Streams takes the
>>>>>>>> gcd as partitioning scheme instead of enforcing 1 to 1. Connect consumers
>>>>>>>> and other consumers should be fine anyway.
>>>>>>>>
>>>>>>>> I hope this makes clearer what I was aiming at. The rest needs to be
>>>>>>>> figured out. The only danger I see is that when we are introducing this
>>>>>>>> feature as proposed in the KIP, it won't help any people depending on log
>>>>>>>> compaction.
>>>>>>>>
>>>>>>>> The other thing I wanted to mention is that I believe the current
>>>>>>>> suggestion (without copying data over) can be implemented in pure userland
>>>>>>>> with a custom partitioner and a small feedback loop from ProduceResponse =>
>>>>>>>> Partitioner in cooperation with a change management system.
>>>>>>>>
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>>
>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
>>>>>>>>
>>>>>>>> Hey Jan,
>>>>>>>>>
>>>>>>>>> I am not sure if it is acceptable for producers to be stopped for a while,
>>>>>>>>> particularly for online applications which require low latency. I am also
>>>>>>>>> not sure how consumers can switch to a new topic. Does the user application
>>>>>>>>> need to explicitly specify a different topic for producer/consumer to
>>>>>>>>> subscribe to? It will be helpful for discussion if you can provide more
>>>>>>>>> detail on the interface change for this solution.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dong
>>>>>>>>>
>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>>> just want to throw my thoughts in. In general the functionality is very
>>>>>>>>>> useful, though we should not try too hard to find the architecture while
>>>>>>>>>> implementing.
>>>>>>>>>>
>>>>>>>>>> The manual steps would be to
>>>>>>>>>>
>>>>>>>>>> create a new topic
>>>>>>>>>> then mirror-make from the old topic to the new topic
>>>>>>>>>> wait for mirror making to catch up.
>>>>>>>>>> then put the consumers onto the new topic
>>>>>>>>>>       (having mirrormaker spit out a mapping from old offsets to new offsets:
>>>>>>>>>>           if the topic is increased by factor X there is going to be a clean
>>>>>>>>>>           mapping from 1 offset in the old topic to X offsets in the new topic,
>>>>>>>>>>           if there is no factor then there is no chance to generate a
>>>>>>>>>>           mapping that can be reasonably used for continuing)
>>>>>>>>>>       make consumers stop at appropriate points and continue consumption
>>>>>>>>>>       with offsets from the mapping.
>>>>>>>>>> have the producers stop for a minimal time.
>>>>>>>>>> wait for mirrormaker to finish
>>>>>>>>>> let producers produce with the new metadata.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Instead of implementing the approach suggested in the KIP, which will leave
>>>>>>>>>> log compacted topics completely crumbled and unusable,
>>>>>>>>>> I would much rather try to build infrastructure to support the
>>>>>>>>>> above-mentioned operations more smoothly.
>>>>>>>>>> Especially having producers stop and use another topic is difficult, and
>>>>>>>>>> it would be nice if one could trigger "invalid metadata" exceptions for them
>>>>>>>>>> and if one could give topics aliases so that their produces to the old topic
>>>>>>>>>> will arrive in the new topic.
>>>>>>>>>>
>>>>>>>>>> The downsides are obvious I guess (having the same data twice for the
>>>>>>>>>> transition period, but Kafka tends to scale well with data size). So it's a
>>>>>>>>>> nicer fit into the architecture.
>>>>>>>>>>
>>>>>>>>>> I further want to argue that the functionality of the KIP can
>>>>>>>>>> be implemented completely in "userland" with a custom partitioner that
>>>>>>>>>> handles the transition as needed. I would appreciate it if someone could
>>>>>>>>>> point out what a custom partitioner couldn't handle in this case.
>>>>>>>>>>
>>>>>>>>>> With the above approach, shrinking a topic becomes the same steps, without
>>>>>>>>>> losing keys in the discontinued partitions.
>>>>>>>>>>
>>>>>>>>>> Would love to hear what everyone thinks.
>>>>>>>>>>
>>>>>>>>>> Best Jan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
>>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>>> I have created KIP-253: Support in-order message delivery with partition
>>>>>>>>>>> expansion. See
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
>>>>>>>>>>> .
>>>>>>>>>>>
>>>>>>>>>>> This KIP provides a way to allow messages of the same key from the same
>>>>>>>>>>> producer to be consumed in the same order they are produced even if we
>>>>>>>>>>> expand partition of the topic.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dong
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
> 
