Hi,

the issue you describe (that on a "fresh" restart all tasks are
assigned to the first thread) is known, and the new broker config you
mentioned was introduced exactly to address it. Thus, there is no
config for 0.10.2.x brokers or the Streams API to handle this case
(that's why we introduced the new config).

However, what I do not understand is how you do your upgrade. From your
description, it seems that you stop the whole application and afterwards
restart all containers. However, Kafka Streams supports rolling
upgrades. Thus, you could stop and restart a single container, and after
this container has recreated all its state and is running, move on to
the next one, and so on, until all containers are upgraded to 0.11.0.1.
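
For example, a minimal sketch of such a readiness check (just an
illustration; "streams" is the already started KafkaStreams instance of
your app, and the timeout is arbitrary) could look like this:

    import org.apache.kafka.streams.KafkaStreams;

    public class AwaitRunning {
        // Block until this instance has finished rebalancing and state
        // restoration, i.e. until the client reaches RUNNING. Only then
        // move on and restart the next container.
        public static void awaitRunning(final KafkaStreams streams,
                                        final long timeoutMs)
                throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMs;
            while (streams.state() != KafkaStreams.State.RUNNING) {
                if (System.currentTimeMillis() > deadline) {
                    throw new IllegalStateException(
                        "did not reach RUNNING, state is " + streams.state());
                }
                Thread.sleep(500L);
            }
        }
    }

You could expose such a check via a k8s readiness probe, so Kubernetes
only considers a pod ready once its instance is actually processing.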

About the StickyAssignor: even for a fresh restart, after it has
assigned all tasks to the first thread, it should move tasks to newly
started threads. The StickyAssignor puts load balancing first and
stickiness second when deciding where to place tasks. Thus, what you
describe should not happen, and if it does happen it's a bug. We would
need DEBUG level logs to dig into this.
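
If it's easier than touching your logging config, a small sketch
(assuming your application uses the log4j 1.x binding) that raises the
Streams loggers to DEBUG programmatically at startup would be:

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    public class EnableStreamsDebugLogs {
        // Call once at application startup, before creating the
        // KafkaStreams instance, so the task assignment decisions of
        // the partition assignor end up in your application logs.
        public static void enable() {
            Logger.getLogger("org.apache.kafka.streams").setLevel(Level.DEBUG);
        }
    }

The equivalent log4j.properties entry works just as well, of course.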

Could it be that the first KafkaStreams instance was still in state
"rebalancing" when you started the second/third container? If yes, this
might explain what you observed: if the first instance is in state
"rebalancing", it would miss that new instances are joining the group.
(We fixed this in the upcoming 1.0.0 release.)
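
To check this, a small sketch (purely illustrative) that registers a
state listener and logs every state transition with a timestamp, so you
can correlate it with the startup times of the second/third container:

    // "streams" is your KafkaStreams instance; register the listener
    // before calling streams.start().
    streams.setStateListener((newState, oldState) ->
        System.out.println(System.currentTimeMillis()
            + " state change: " + oldState + " -> " + newState));

If the first instance enters REBALANCING before the other containers
come up and does not transition again when they join, that would match
the behavior described above.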

Hope this helps.


-Matthias


On 10/24/17 7:21 AM, Laven Sukumaran wrote:
> Hi all,
> 
> We're upgrading a Kafka Streams application from 0.10.2.1 to 0.11.0.1,
> and our application is running against a Kafka cluster with version
> 0.10.2.1.
> 
> When we first attempted to upgrade our application to Kafka 0.11.0.1, we
> observed that when we deleted the PVCs for the service and restarted,
> the new Kafka Streams thread task stickiness behavior
> (https://issues.apache.org/jira/browse/KAFKA-4677) had unintentional
> side-effects. Specifically, in scenarios where PVC recreation is
> necessary, it appears that the new task stickiness semantics caused the
> task assignor to prematurely decide that there was only ever going to
> be one instance of our app in the consumer group, and therefore all
> stream threads would be launched on the first instance in the consumer
> group. Later, when the second and third instances of our app were
> launched, the task assignor did not take these new instances into
> account, and none of the partitions were assigned to the second and
> third instances of our app.
> 
> We worked around this by deleting the PVCs, and when pod one was
> running (running as defined by k8s), we would repeatedly kubectl delete
> the pod until instances 2 and 3 were successfully running as well.
> Then, we would allow the reprocessing to take place. This caused the
> task assignor to take all pod instances into account before making
> assignments, which then 'stuck'.
> 
> 
> I saw that with the 0.11.0.0 release a new broker config was
> introduced, "group.initial.rebalance.delay.ms". However, from what I
> understand this would require upgrading our Kafka cluster to 0.11.0.0,
> which is not something we will be able to do in the near future.
> 
> 
> 
> *"The group.initial.rebalance.delay.ms
> <http://group.initial.rebalance.delay.ms/> config specifies the amount of
> time, in milliseconds, that the GroupCoordinator will delay the initial
> consumer rebalance. The rebalance is further delayed by the value
> of group.initial.rebalance.delay.ms
> <http://group.initial.rebalance.delay.ms/> as each new member joins the
> consumer group, up to a maximum of the value set by max.poll.interval.ms
> <http://max.poll.interval.ms/>. The net benefit is that this can reduce the
> overall startup time for Kafka Streams applications that have more than one
> thread. The default value for group.initial.rebalance.delay.ms
> <http://group.initial.rebalance.delay.ms/> is 3000 milliseconds.In practice
> this means that if you are starting up your Kafka Streams app from a cold
> start, when the first member joins the group there will be at least a 3
> second delay before it is assigned any tasks. If any other members join the
> group within the initial 3 seconds, then there will be a further 3 second
> delay. Once no new members have joined the group within the 3 second delay,
> or max.poll.interval.ms <http://max.poll.interval.ms/> is reached, then the
> group rebalance can complete and all current members will be assigned
> tasks. The benefit of this approach, particularly for Kafka Streams
> applications, is that we can now delay the assignment and re-assignment of
> potentially expensive tasks as new members join. So we can avoid the
> situation where one instance is assigned all tasks, begins
> restoring/processing, only to shortly after be rebalanced, and then have to
> start again with half of the tasks and so on."*
> 
> Similar to "group.initial.rebalance.delay.ms", are there any
> configuration values we can tweak to defer sticky task assignment and
> ensure all instances are running first?
> 
> Thanks,
> Laven
> 
