As I just mentioned joins:

For Kafka Streams, it might also be required to change the partition
count for multiple topics in a coordinated way, so that the
co-partitioning property that Kafka Streams relies on to compute joins
is maintained.
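
To make the co-partitioning requirement concrete, here is a minimal
sketch of a pre-check that a tool could run before a coordinated
partition change. The class, the method names and the topic names in the
usage note are hypothetical and not part of Kafka or the KIP. It only
compares partition counts; real co-partitioning also assumes that all
topics are written with the same key -> partition mapping.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a Kafka API: checks and plans partition counts
// for a group of topics that take part in the same join.
final class CoPartitioningSketch {

    // True if every topic in the group has the same partition count.
    static boolean sameCount(Map<String, Integer> partitionCounts) {
        return new HashSet<>(partitionCounts.values()).size() <= 1;
    }

    // Plans a coordinated expansion: every topic gets the same target count.
    static Map<String, Integer> planExpansion(Map<String, Integer> current, int target) {
        if (!sameCount(current)) {
            throw new IllegalStateException("topics are not co-partitioned");
        }
        Map<String, Integer> plan = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : current.entrySet()) {
            if (target < e.getValue()) {
                throw new IllegalArgumentException("target is below the current count");
            }
            plan.put(e.getKey(), target);
        }
        return plan;
    }
}
```

For example, a stream topic "orders" and a table topic "customers" with 4
partitions each would both be planned for 8 partitions, even if only
"orders" needs the extra capacity.
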

Any thoughts on how this could be handled?


-Matthias

On 3/8/18 10:08 PM, Matthias J. Sax wrote:
> Jun,
> 
> There is one more case: non-windowed aggregations. For windowed
> aggregation, the changelog topic will be compact+delete. However, for
> non-windowed aggregation the policy is compact only.
> 
> Even if we assume that windowed aggregations are dominant and
> non-windowed aggregations are used rarely, it seems bad not to
> support the feature if a non-windowed aggregation is used. Also,
> non-windowed aggregation volume depends on input-stream volume, which
> might be high.
> 
> Furthermore, we support stream-table joins, and this requires that the
> stream and the table are co-partitioned. Thus, even if the table has
> lower volume but the stream must be scaled out, we also need to
> scale out the table to preserve co-partitioning.
> 
> 
> -Matthias
> 
> On 3/8/18 6:44 PM, Jun Rao wrote:
>> Hi, Matthias,
>>
>> My understanding is that in KStream, the only case when a changelog topic
>> needs to be compacted is when the corresponding input is a KTable. In all
>> other cases, the changelog topics use compact + delete. So, if most
>> KTables are not high volume, there may not be a need to expand their
>> partitions and therefore the partitions of the corresponding changelog
>> topics.
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Jun,
>>>
>>> thanks for your comment. This should actually work for Streams, because
>>> we don't rely on producer "hashing" but specify the partition number
>>> explicitly on send().
>>>
>>> About not allowing changes to the number of partitions for changelog
>>> topics: for Streams, this seems to imply that we need to create a second
>>> changelog topic for each store with the new partition count. However, it
>>> would be unclear when/if we can delete the old topic. Thus, it moves the
>>> "problem" into the application layer. It's hard for me to judge at the
>>> moment what the impact would be, but it's something we should pay
>>> attention to.
>>>
>>>
>>> -Matthias
>>>
>>> On 3/6/18 3:45 PM, Jun Rao wrote:
>>>> Hi, Matthias,
>>>>
>>>> Regarding your comment "If it would be time-delay based, it might be
>>>> problematic for Kafka Streams: if we get the information that the new
>>>> input partitions are available for producing, we need to enable the new
>>>> changelog partitions for producing, too. If those would not be available
>>>> yet, because the time-delay did not trigger yet, it would be problematic
>>>> to avoid crashing.", could you just enable the changelog topic to write
>>>> to its new partitions immediately? The input topic can be configured
>>>> with a delay in writing to the new partitions. Initially, there won't be
>>>> new data produced into the newly added partitions in the input topic.
>>>> However, we could prebuild the state for the new input partition and
>>>> write the state changes to the corresponding new partitions in the
>>>> changelog topic.
>>>>
>>>> Hi, Jan,
>>>>
>>>> For a compacted topic, garbage collecting the old keys in the existing
>>>> partitions after partition expansion can be tricky, as you pointed out.
>>>> There are a few options here. (a) Let brokers exchange keys with each
>>>> other during compaction. This will add complexity on the broker side.
>>>> (b) Build an external tool that scans the compacted topic and drops the
>>>> prefix of a partition if all records in the prefix are removable. The
>>>> admin can then run this tool when the unneeded space needs to be
>>>> reclaimed. (c) Don't support partition change in a compacted topic. This
>>>> might be ok since most compacted topics are not high volume.
>>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>>
>>>> On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> Thanks for all the comments! It appears that everyone prefers linear
>>>>> hashing because it reduces the amount of state that needs to be moved
>>>>> between consumers (for stream processing). The KIP has been updated to
>>>>> use linear hashing.
>>>>>
>>>>> Regarding the migration effort: it seems that migrating the producer
>>>>> library to use linear hashing should be pretty straightforward, without
>>>>> much operational effort. If we don't upgrade the client library to use
>>>>> this KIP, we cannot support in-order delivery after the partition count
>>>>> is changed anyway. Suppose we upgrade the client library to use this
>>>>> KIP: if the partition number is not changed, the key -> partition
>>>>> mapping will be exactly the same as it is now, because it is still
>>>>> determined using murmur_hash(key) % original_partition_num. In other
>>>>> words, this change is backward compatible.
>>>>>
>>>>> Regarding the load distribution: if we use linear hashing, the load may
>>>>> be unevenly distributed because those partitions which are not split may
>>>>> receive twice as much traffic as other partitions that are split. This
>>>>> issue can be mitigated by creating the topic with a partition count that
>>>>> is several times the number of consumers. And there will be no imbalance
>>>>> if the partition number is always doubled. So this imbalance seems
>>>>> acceptable.
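
The key -> partition mapping discussed above can be sketched as follows.
This is a minimal illustration that assumes the textbook linear hashing
scheme; the algorithm actually specified in the KIP may differ in detail.
The class and method names are hypothetical, and the caller is assumed to
pass in an already computed key hash (for example a murmur hash).

```java
// Hypothetical sketch of textbook linear hashing, not the KIP's exact code.
final class LinearHashingSketch {

    // Maps a key hash to a partition, given the partition count at topic
    // creation and the current partition count.
    static int partition(int keyHash, int originalPartitions, int currentPartitions) {
        if (originalPartitions <= 0 || currentPartitions < originalPartitions) {
            throw new IllegalArgumentException("expected 0 < original <= current");
        }
        long hash = keyHash & 0x7fffffffL; // force a non-negative value

        // Largest original * 2^k that does not exceed the current count.
        long base = originalPartitions;
        while (base * 2 <= currentPartitions) {
            base *= 2;
        }
        // Partitions [0, splitPointer) have already been split in this round.
        long splitPointer = currentPartitions - base;

        long p = hash % base;
        if (p < splitPointer) {
            // A split partition spreads its keys over p and p + base.
            p = hash % (base * 2);
        }
        return (int) p;
    }
}
```

With 4 original partitions expanded to 6, partitions 0 and 1 are split
into (0, 4) and (1, 5), while partitions 2 and 3 stay unsplit and keep
their full share of keys, which is the imbalance described above. If the
count is unchanged, the result is simply hash % original_partition_num.
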
>>>>>
>>>>> Regarding storing the partition strategy as a per-topic config: it seems
>>>>> unnecessary, since we can still use murmur_hash as the default hash
>>>>> function and additionally apply the linear hashing algorithm if the
>>>>> partition number has increased. Not sure if there is any use-case for
>>>>> the producer to use a different hash function. Jason, can you check if
>>>>> there is some use-case that I missed for using the per-topic partition
>>>>> strategy?
>>>>>
>>>>> Regarding how to reduce latency (due to state store/load) in the stream
>>>>> processing consumer when the partition number changes: I need to read
>>>>> the Kafka Streams code to understand how Kafka Streams currently
>>>>> migrates state between consumers when an application is added/removed
>>>>> for a given job. I will reply after I finish reading the documentation
>>>>> and code.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Dong
>>>>>
>>>>>
>>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Great discussion. I think I'm wondering whether we can continue to
>>>>>> leave Kafka agnostic to the partitioning strategy. The challenge is
>>>>>> communicating the partitioning logic from producers to consumers so
>>>>>> that the dependencies between each epoch can be determined. For the
>>>>>> sake of discussion, imagine you did something like the following:
>>>>>>
>>>>>> 1. The name (and perhaps version) of a partitioning strategy is stored
>>>>>> in topic configuration when a topic is created.
>>>>>> 2. The producer looks up the partitioning strategy before writing to a
>>>>>> topic and includes it in the produce request (for fencing). If it
>>>>>> doesn't have an implementation for the configured strategy, it fails.
>>>>>> 3. The consumer also looks up the partitioning strategy and uses it to
>>>>>> determine dependencies when reading a new epoch. It could either fail
>>>>>> or make the most conservative dependency assumptions if it doesn't know
>>>>>> how to implement the partitioning strategy. For the consumer, the new
>>>>>> interface might look something like this:
>>>>>>
>>>>>> // Return the partition dependencies following an epoch bump
>>>>>> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
>>>>>>                                          int numPartitionsAfterEpochBump)
>>>>>>
>>>>>> The unordered case then is just a particular implementation which never
>>>>>> has any epoch dependencies. To implement this, we would need some way
>>>>>> for the consumer to find out how many partitions there were in each
>>>>>> epoch, but maybe that's not too unreasonable.
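
The two extremes mentioned above (no dependencies for the unordered case,
and the most conservative assumption when the strategy is unknown) could
look roughly like this. It is only a sketch: the interface and class
names are hypothetical, only the method signature comes from the message
above, and it assumes the returned map goes from a partition after the
epoch bump to the pre-bump partitions it depends on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface wrapping the method signature proposed above.
interface PartitionDependencyResolver {
    // Assumed meaning: partition after the bump -> partitions before the
    // bump that must be fully consumed first.
    Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                             int numPartitionsAfterEpochBump);
}

// Unordered case: no partition ever waits for another one.
final class UnorderedResolver implements PartitionDependencyResolver {
    @Override
    public Map<Integer, List<Integer>> dependencies(int before, int after) {
        return Collections.emptyMap();
    }
}

// Most conservative case: every partition after the bump depends on
// every partition that existed before the bump.
final class ConservativeResolver implements PartitionDependencyResolver {
    @Override
    public Map<Integer, List<Integer>> dependencies(int before, int after) {
        List<Integer> allOld = new ArrayList<>();
        for (int p = 0; p < before; p++) {
            allOld.add(p);
        }
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = 0; p < after; p++) {
            deps.put(p, Collections.unmodifiableList(allOld));
        }
        return deps;
    }
}
```

A strategy-aware implementation would sit between these two and return
only the partitions whose keys can actually move into each new partition.
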
>>>>>>
>>>>>> Thanks,
>>>>>> Jason
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Dong
>>>>>>>
>>>>>>> thank you very much for your questions.
>>>>>>>
>>>>>>> Regarding the time spent copying data across:
>>>>>>> It is correct that copying data from a topic with one partition
>>>>>>> mapping to a topic with a different partition mapping takes far longer
>>>>>>> than we can stop producers. Tens of minutes is a very optimistic
>>>>>>> estimate here. Many people cannot afford to copy at full speed and
>>>>>>> therefore will have some rate limiting in place, which can bump the
>>>>>>> timespan into days. The good part is that the vast majority of the
>>>>>>> data can be copied while the producers are still going. One can then
>>>>>>> piggyback the consumers on top of this timeframe, by the method
>>>>>>> mentioned (provide them a mapping from their old offsets to new
>>>>>>> offsets in their repartitioned topics). In that way we separate
>>>>>>> migration of consumers from migration of producers (decoupling these
>>>>>>> is what Kafka is strongest at). The time to actually swap over the
>>>>>>> producers should be kept minimal by ensuring that, when a swap attempt
>>>>>>> is started, the consumer copying over is very close to the log end
>>>>>>> and is expected to finish within the next fetch. The operation should
>>>>>>> have a time-out and should be "reattemptable".
>>>>>>>
>>>>>>> Importance of log compaction:
>>>>>>> If a producer produces key A to partition 0, it is going to be there
>>>>>>> forever unless it gets deleted. The record might sit in there for
>>>>>>> years. A new producer started with the new partitions will fail to
>>>>>>> delete the record in the correct partition. The record will be there
>>>>>>> forever and one cannot reliably bootstrap new consumers. I cannot see
>>>>>>> how linear hashing can solve this.
>>>>>>>
>>>>>>> Regarding your skipping of userland copying:
>>>>>>> 100%, copying the data across in userland is, as far as I can see,
>>>>>>> only a use case for log compacted topics. Even for log compaction +
>>>>>>> retention it should only be opt-in. Why did I bring it up? I think log
>>>>>>> compaction is a very important feature to really embrace Kafka as a
>>>>>>> "data platform". The point I also want to make is that copying data
>>>>>>> this way is completely in line with the Kafka architecture. It only
>>>>>>> consists of reading and writing to topics.
>>>>>>>
>>>>>>> I hope this clarifies why I think we should aim for more than the
>>>>>>> current KIP. I fear that once the KIP is done, not much more effort
>>>>>>> will be made.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
>>>>>>>
>>>>>>>> Hey Jan,
>>>>>>>>
>>>>>>>> In the current proposal, the consumer will be blocked waiting for
>>>>>>>> other consumers of the group to consume up to a given offset. In most
>>>>>>>> cases, all consumers should be close to the LEO of the partitions
>>>>>>>> when the partition expansion happens. Thus the waiting time should
>>>>>>>> not be long, e.g. on the order of seconds. On the other hand, it may
>>>>>>>> take a long time to wait for the entire partition to be copied -- the
>>>>>>>> amount of time is proportional to the amount of existing data in the
>>>>>>>> partition, which can take tens of minutes. So the amount of time that
>>>>>>>> we stop consumers may not be on the same order of magnitude.
>>>>>>>>
>>>>>>>> If we can implement this suggestion without copying data over in pure
>>>>>>>> userland, it will be much more valuable. Do you have ideas on how this
>>>>>>>> can be done?
>>>>>>>>
>>>>>>>> Not sure why the current KIP does not help people who depend on log
>>>>>>>> compaction. Could you elaborate more on this point?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>>
>>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Dong,
>>>>>>>>>
>>>>>>>>> I tried to focus on the steps one can currently perform to expand
>>>>>>>>> or shrink a keyed topic while maintaining top-notch semantics.
>>>>>>>>> I can understand that there might be confusion about "stopping the
>>>>>>>>> consumer". It is exactly the same as proposed in the KIP. There needs
>>>>>>>>> to be a time at which the producers agree on the new partitioning.
>>>>>>>>> The extra semantics I want to put in there is that we have the
>>>>>>>>> possibility to wait until all the existing data is copied over into
>>>>>>>>> the new partitioning scheme. When I say stopping, I think more of
>>>>>>>>> having a memory barrier that ensures the ordering. I am still aiming
>>>>>>>>> for latencies on the scale of leader failovers.
>>>>>>>>>
>>>>>>>>> Consumers have to explicitly adopt the new partitioning scheme in the
>>>>>>>>> above scenario. The reason is that in cases where you depend on a
>>>>>>>>> particular partitioning scheme, you frequently also have other topics
>>>>>>>>> with co-partitioning requirements or the like. Therefore all your
>>>>>>>>> other input topics might need to grow accordingly.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What I was suggesting was to streamline all these operations as best
>>>>>>>>> as possible to have "real" partition growth and shrinkage going on.
>>>>>>>>> Migrating the producers to a new partitioning scheme can be much more
>>>>>>>>> streamlined with proper broker support for this. Migrating consumers
>>>>>>>>> is a step that might be made completely unnecessary if - for example
>>>>>>>>> Streams - takes the gcd as partitioning scheme instead of enforcing
>>>>>>>>> 1 to 1. Connect consumers and other consumers should be fine anyway.
>>>>>>>>>
>>>>>>>>> I hope this makes clearer what I was aiming at. The rest needs to be
>>>>>>>>> figured out. The only danger I see is that when we introduce this
>>>>>>>>> feature as proposed in the KIP, it won't help anyone who depends on
>>>>>>>>> log compaction.
>>>>>>>>>
>>>>>>>>> The other thing I wanted to mention is that I believe the current
>>>>>>>>> suggestion (without copying data over) can be implemented in pure
>>>>>>>>> userland with a custom partitioner and a small feedback loop from
>>>>>>>>> ProduceResponse => Partitioner, in cooperation with a change
>>>>>>>>> management system.
>>>>>>>>>
>>>>>>>>> Best Jan
>>>>>>>>>
>>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
>>>>>>>>>
>>>>>>>>>> Hey Jan,
>>>>>>>>>>
>>>>>>>>>> I am not sure if it is acceptable for the producer to be stopped for
>>>>>>>>>> a while, particularly for online applications which require low
>>>>>>>>>> latency. I am also not sure how consumers can switch to a new topic.
>>>>>>>>>> Does the user application need to explicitly specify a different
>>>>>>>>>> topic for the producer/consumer to subscribe to? It will be helpful
>>>>>>>>>> for discussion if you can provide more detail on the interface change
>>>>>>>>>> for this solution.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Dong
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>>> just want to throw my thoughts in. In general the functionality is
>>>>>>>>>>> very useful; we should, though, not try too hard to find the
>>>>>>>>>>> architecture while implementing.
>>>>>>>>>>>
>>>>>>>>>>> The manual steps would be to
>>>>>>>>>>>
>>>>>>>>>>> create a new topic
>>>>>>>>>>> then mirror-make from the old topic to the new topic
>>>>>>>>>>> wait for mirror making to catch up.
>>>>>>>>>>> then put the consumers onto the new topic
>>>>>>>>>>>       (having mirrormaker spit out a mapping from old offsets to
>>>>>>>>>>>       new offsets:
>>>>>>>>>>>           if the topic is increased by a factor X, there is going to
>>>>>>>>>>>           be a clean mapping from 1 offset in the old topic to X
>>>>>>>>>>>           offsets in the new topic,
>>>>>>>>>>>           if there is no factor, then there is no chance to generate
>>>>>>>>>>>           a mapping that can reasonably be used for continuing)
>>>>>>>>>>>       make consumers stop at appropriate points and continue
>>>>>>>>>>>       consumption with offsets from the mapping.
>>>>>>>>>>> have the producers stop for a minimal time.
>>>>>>>>>>> wait for mirrormaker to finish
>>>>>>>>>>> let producers produce with the new metadata.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Instead of implementing the approach suggested in the KIP, which
>>>>>>>>>>> will leave log compacted topics completely crumbled and unusable,
>>>>>>>>>>> I would much rather try to build infrastructure to support the
>>>>>>>>>>> above-mentioned operations more smoothly.
>>>>>>>>>>> Especially having producers stop and use another topic is difficult,
>>>>>>>>>>> and it would be nice if one could trigger "invalid metadata"
>>>>>>>>>>> exceptions for them and if one could give topics aliases so that
>>>>>>>>>>> their produces to the old topic will arrive in the new topic.
>>>>>>>>>>>
>>>>>>>>>>> The downsides are obvious I guess (having the same data twice for
>>>>>>>>>>> the transition period, but Kafka tends to scale well with data
>>>>>>>>>>> size). So it's a nicer fit into the architecture.
>>>>>>>>>>>
>>>>>>>>>>> I further want to argue that the functionality of the KIP can be
>>>>>>>>>>> implemented completely in "userland" with a custom partitioner that
>>>>>>>>>>> handles the transition as needed. I would appreciate it if someone
>>>>>>>>>>> could point out what a custom partitioner couldn't handle in this
>>>>>>>>>>> case.
>>>>>>>>>>>
>>>>>>>>>>> With the above approach, shrinking a topic involves the same steps,
>>>>>>>>>>> without losing keys in the discontinued partitions.
>>>>>>>>>>>
>>>>>>>>>>> Would love to hear what everyone thinks.
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I have created KIP-253: Support in-order message delivery with
>>>>>>>>>>>> partition expansion. See
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
>>>>>>>>>>>> .
>>>>>>>>>>>>
>>>>>>>>>>>> This KIP provides a way to allow messages of the same key from the
>>>>>>>>>>>> same producer to be consumed in the same order they are produced,
>>>>>>>>>>>> even if we expand the partitions of the topic.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Dong
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
