Hi Pan,

I really appreciate your thorough review of the design doc. Your
suggestions on each aspect brought back a lot of the thinking behind it. My
responses are inline below.

- YARN-based host-affinity is available in Samza 0.10 now. Does that mean
that option-2 is available and would meet the upgrade application
requirements? Or, is there something that is still missing that motivated
option 3?

Definitely, host affinity plus container-by-container rolling bounce will
work most of the time (when the resource utilization of the YARN cluster is
not that high). The only thing we need to be careful about is the
exceptional case where the physical machine in the YARN cluster that hosts
the target Samza container A has no free container slot left. If so, we
need to start container B on another physical machine. I think a simple
handoff protocol will help to resolve this small problem.

   - We are also working on a plan to allow container-by-container rolling
bounce in LinkedIn. In LinkedIn, we believe that host-affinity +
container-by-container rolling bounce would be good enough to ensure the
non-disruptive upgrade. There would only be the applications that have
really strong latency SLA on each individual event process that would not
be satisfied by this model.

Yes, host-affinity + container-by-container rolling bounce is a much cleaner
solution. If we can always keep the utilization of the YARN cluster below a
threshold in production, I would prefer this solution. The handoff protocol
we designed will be useful only when container B cannot be assigned to the
same location as container A.

- The communication media between the container and job coordinator: we
have noticed a herding problem in various ways when many containers started
calling JobCoordinator’s http port (SAMZA-843). Similar thundering herd
issue could exist in ZooKeeper based solution when many clients are
watching on the same znode and would take actions (i.e. read/write) on the
znode all at the same time. That leads us to think that actually using
Kafka Coordinator Stream may be the best option since the pub-sub messaging
system is designed to support many subscribers and the messages are
delivered in a server-originated push model, which totally avoids the
thundering herd issue in the other two (which are all client-driven pull
models). There are some initial hurdles before we can rely on the
coordinator stream: a) we need to make it broadcast stream; b) we need to
make sure all containers are now consuming coordinator streams as well; c)
there would need to be some hand-shake protocols between containers and job
coordinator to establish barriers in the CoordinatorStream s.t. message
loss  / undelivered JobModel would be detected. But in the long run, it
would standardize a scalable solution for communication between containers
and JobCoordinator, which only relies on a reliable pub-sub messaging
system. The CoordinatorStream with the barrier protocol also may help to
implement a leader-election protocol as well, which would be super useful
for standalone Samza.

- Checkpoints and CoordinatorStream: when releasing 0.10, we also observed
that AppMaster becomes the bottleneck when checkpoints are written to
CoordinatorStream as well. The problem is that the checkpoints from all
containers would be a high-traffic volume to the CoordinatorStream, where
the current AppMaster implementation is not efficient in dealing with it.
Also, if we see CoordinatorStream as the control channel for the job, we
should only allow control signals to go into this channel to avoid: a)
unnecessarily consume tons of checkpoints in all containers; b) keep the
traffic volume in CoordinatorStream low such that important changes in job
configuration can be consumed and taken actions upon immediately.
Essentially, checkpoints are more of runtime state of the job, not the
metadata of the job (i.e. input topics, number of containers, container
configurations, etc.) which is relatively more static.

Yes, Chinmay gave me similar feedback about choosing HTTP or ZooKeeper as
the medium. Actually, we finally chose the coordinator stream in the
implementation. During the implementation, I went through steps a) and b)
and got to the middle of c). The most challenging part of the implementation
was the reliability and delay of the coordinator stream on Kafka. Kafka
doesn't guarantee that no message is missed, so I worked on a temporary
solution: a retry mechanism for missing messages. As you said, checkpoints
will also block high-priority messages for job coordination. Therefore, if
there is a reliable broadcast coordination channel between the job
coordinator and the containers, the protocol implementation will be much
cleaner and easier.
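
One possible shape for such a retry mechanism, as a minimal sketch rather
than the code we actually wrote: it assumes every coordination message
carries a monotonically increasing sequence number (a hypothetical field),
so the receiver can detect gaps and ask for the missing messages again.
The class and method names are made up for illustration.

```python
class GapDetector:
    """Delivers messages in sequence order and reports gaps to re-request."""

    def __init__(self):
        self.next_expected = 0   # next sequence number to deliver
        self.pending = {}        # out-of-order messages, keyed by sequence number

    def receive(self, seq, payload):
        """Return (delivered_payloads, missing_sequence_numbers)."""
        if seq >= self.next_expected:
            self.pending[seq] = payload  # older duplicates are dropped
        delivered = []
        while self.next_expected in self.pending:
            delivered.append(self.pending.pop(self.next_expected))
            self.next_expected += 1
        # Everything between next_expected and the highest buffered message
        # that has not arrived yet is a gap the sender should retry.
        highest = max(self.pending, default=self.next_expected)
        missing = [s for s in range(self.next_expected, highest)
                   if s not in self.pending]
        return delivered, missing
```

The receiver would send the `missing` list back to the sender, which
re-publishes those messages; a real version would also need a timeout for
the case where the last message itself is lost.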


- Some questions to the design of handoff protocol:

   * using CoordinatorStream, we can allow old and new containers to
communicate directly, while JobCoordinator would be the supervisor that
initiates the handoff and monitors the progress.

Yes, if it is a broadcast stream, we don't need the job coordinator to act
as a mediator in the protocol. As you said, it only has to monitor the
messages exchanged between container A and container B during the handoff,
and start conflict resolution only when it really has to.


   * between t3 and t4, how does the new container knows that it has
consumed all processed messages from the old container and need to enter
the RunLoop?

As container A is still running, container B can't guarantee that it has
already caught up before container A is stopped. Therefore, in the design we
let container A record its current change-log offset O1 and send it to
container B. Once container B catches up to O1, it notifies container A to
stop. In this way, we want to minimize the lag caused by the rolling start.
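
The catch-up step can be sketched as follows. This is only an illustration
under simplifying assumptions: the change-log is modelled as an iterable of
(offset, key, value) records, the state store as a plain dict, and
`notify_stop` is a hypothetical callback standing in for the message that
tells container A to stop.

```python
from typing import Callable, Dict, Iterable, Tuple


def catch_up_to_offset(
    changelog: Iterable[Tuple[int, str, str]],
    target_offset: int,
    store: Dict[str, str],
    notify_stop: Callable[[], None],
) -> int:
    """Replay change-log records into `store` until `target_offset` (O1)
    has been applied, then notify container A. Returns the last offset
    applied, or -1 if nothing was applied."""
    last_applied = -1
    for offset, key, value in changelog:
        store[key] = value          # restore state from the change-log
        last_applied = offset
        if offset >= target_offset:
            notify_stop()           # B has reached O1: ask A to stop
            break
    return last_applied
```

Container A may still append a few records between O1 and the moment it
actually stops, so B would still have to drain that short tail before it
starts processing input.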

   * The swim lane design only talks about changelog offset, how about the
input stream offsets?

As I remember, the input stream offsets are stored in ZooKeeper (at least in
the Samza version of last summer). Therefore, they should be synced after
container B consumes all of the changelog and before it goes into RunLoop.

   * The description of failure recovery is not super clear to me:

      * “If A die, we just ask container B to continue consume the stream
from the time we detect A is dead…” Questions: till when and from which
offset B should start consuming from input topics and start entering
RunLoop?

During execution of the protocol, container B will go through several
states:

1 wait for offset O1 from A (once it has received O1, it starts to consume
the change-log stream until O1)

2 block while consuming the change-log stream until O1

3 block in RunLoop, after B has finished consuming all the change-logs and
has also synced the input stream offsets

4 start to consume the input stream, right after receiving the signal that
A is terminated

As A can die when B is in any of the given states, to accelerate the
recovery, we just release all of the blocking conditions along the path. In
this way, B will function just like a normal new container created by YARN
to take the place of container A. But the difficulty here is how to disable
the fault-tolerance mechanism of YARN in the application master in this
case.
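
The four states and the "release everything when A dies" rule can be
sketched as a small state machine. The state and event names below are
hypothetical labels chosen for this illustration, not identifiers from the
design doc or from Samza.

```python
from enum import Enum, auto


class BState(Enum):
    WAIT_FOR_O1 = auto()        # state 1: waiting for offset O1 from A
    CONSUME_CHANGELOG = auto()  # state 2: replaying the change-log
    BLOCK_IN_RUNLOOP = auto()   # state 3: caught up, waiting for A to stop
    CONSUME_INPUT = auto()      # state 4: processing the input stream


class ContainerB:
    def __init__(self):
        self.state = BState.WAIT_FOR_O1
        self.a_is_dead = False

    def on_event(self, event: str) -> BState:
        if event == "A_DIED":
            # Release every wait that depends on A.
            self.a_is_dead = True
            if self.state is BState.WAIT_FOR_O1:
                self.state = BState.CONSUME_CHANGELOG
            elif self.state is BState.BLOCK_IN_RUNLOOP:
                self.state = BState.CONSUME_INPUT
        elif event == "O1_RECEIVED" and self.state is BState.WAIT_FOR_O1:
            self.state = BState.CONSUME_CHANGELOG
        elif event == "CAUGHT_UP" and self.state is BState.CONSUME_CHANGELOG:
            # With A dead there is nobody to wait for, so skip state 3.
            self.state = (BState.CONSUME_INPUT if self.a_is_dead
                          else BState.BLOCK_IN_RUNLOOP)
        elif event == "A_TERMINATED" and self.state is BState.BLOCK_IN_RUNLOOP:
            self.state = BState.CONSUME_INPUT
        return self.state
```

In the failure path B simply replays the whole change-log and then starts
processing, which is what a normal replacement container would do.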

   * Question regarding to the statement in 5.3.3: “To make a container to
consume change-log stream until particular offset and standby”. Which
container will need to stop consuming change-log at a particular offset?
The new container or the old one? I would assume that here we are talking
about the new container, since the old container will not consume from
change-log.

Yes, you are right. We are talking about container B.


I do not understand why the new container needs to “stop at a particular
offset”? Wouldn’t the new container needs to consume *all* changelog
messages, just like any recovery of stateful container today? I think that
the protocol can be much simpler the following way:

    AppMaster/JobCoordinator ==> start container B
                   \|/
    container B immediately starts consuming
    changelog
                   \|/
    container B detects that it has less than
    N messages left in changelog, or has not
    been able to reduce the backlog for X min
    (i.e. the old container is generating
    changelog too fast)                        ====>  tell container A to block
                   \|/                                           \|/
    container B continues consuming changelog         container A blocks and
                                                      checkpoints
                   \|/                                           \|/
    container B continues w/o going into       <====  current changelog and input
    RunLoop until the checkpoints / offsets           topic checkpoints to B
    from A
                   \|/                                           \|/
    container B enters RunLoop and lets A      ====>  shutdown container A
    know


      * Hence, the states of container B are only: {consuming backlog,
processing event} and container A’s states are only: {processing event,
block} during the handoff protocol. The AppMaster/JobCoordinator can
monitor the handoff process by listening to the coordinator stream messages
between A and B.

I think the only difference is that in the simpler protocol container B
tries to determine by itself whether it has already consumed up to a good
enough offset, but in the design doc container B needs to wait for A to
tell it, and it starts to consume the input stream after receiving the
notification of A's termination. I guess the two extra states of B will be
useful for fault handling in the application master. But anyway, if there is
a reliable broadcast channel, both protocols are good.
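
The self-check that container B performs in the simpler protocol (fewer
than N messages left, or no backlog reduction for X minutes) can be
sketched as below. The function name, the sampling format and the use of
seconds for the window are assumptions made for this illustration.

```python
from typing import List, Tuple


def should_ask_a_to_block(
    backlog_samples: List[Tuple[float, int]],
    n_threshold: int,
    x_seconds: float,
) -> bool:
    """`backlog_samples` holds (timestamp_seconds, backlog) pairs, oldest
    first. Returns True when B should tell container A to block."""
    now, current = backlog_samples[-1]
    if current < n_threshold:
        return True  # close enough to the head of the change-log
    # Latest sample taken at least x_seconds ago, if we have one.
    older = [backlog for ts, backlog in backlog_samples
             if ts <= now - x_seconds]
    if older and current >= older[-1]:
        return True  # backlog has not shrunk over the whole window
    return False
```

For example, with N = 100 and X = 5 minutes, a backlog that goes from 1000
to 1005 over 300 seconds triggers the block request, while one that drops
from 1000 to 600 does not.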

I am still new to stream processing. This was my second project at Uber
last summer. I am in the last stage of graduation, so I am a little busy
right now.

But I am very willing to join the discussion and implementation of this
feature soon, together with the Samza team that spans companies.

Best Regards

Peter Huang



On Mon, Jan 25, 2016 at 11:46 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Peter,
>
> I have read through the design doc and put down some of my thoughts below.
> Please let me know whether it makes sense:
>
> - YARN-based host-affinity is available in Samza 0.10 now. Does that mean
> that option-2 is available and would meet the upgrade application
> requirements? Or, is there something that is still missing that motivated
> option 3?
>
>    - We are also working on a plan to allow container-by-container rolling
> bounce in LinkedIn. In LinkedIn, we believe that host-affinity +
> container-by-container rolling bounce would be good enough to ensure the
> non-disruptive upgrade. There would only be the applications that have
> really strong latency SLA on each individual event process that would not
> be satisfied by this model.
>
> - The communication media between the container and job coordinator: we
> have noticed a herding problem in various ways when many containers started
> calling JobCoordinator’s http port (SAMZA-843). Similar thundering herd
> issue could exist in ZooKeeper based solution when many clients are
> watching on the same znode and would take actions (i.e. read/write) on the
> znode all at the same time. That leads us to think that actually using
> Kafka Coordinator Stream may be the best option since the pub-sub messaging
> system is designed to support many subscribers and the messages are
> delivered in a server-originated push model, which totally avoids the
> thundering herd issue in the other two (which are all client-driven pull
> models). There are some initial hurdles before we can rely on the
> coordinator stream: a) we need to make it broadcast stream; b) we need to
> make sure all containers are now consuming coordinator streams as well; c)
> there would need to be some hand-shake protocols between containers and job
> coordinator to establish barriers in the CoordinatorStream s.t. message
> loss  / undelivered JobModel would be detected. But in the long run, it
> would standardize a scalable solution for communication between containers
> and JobCoordinator, which only relies on a reliable pub-sub messaging
> system. The CoordinatorStream with the barrier protocol also may help to
> implement a leader-election protocol as well, which would be super useful
> for standalone Samza.
>
> - Checkpoints and CoordinatorStream: when releasing 0.10, we also observed
> that AppMaster becomes the bottleneck when checkpoints are written to
> CoordinatorStream as well. The problem is that the checkpoints from all
> containers would be a high-traffic volume to the CoordinatorStream, where
> the current AppMaster implementation is not efficient in dealing with it.
> Also, if we see CoordinatorStream as the control channel for the job, we
> should only allow control signals to go into this channel to avoid: a)
> unnecessarily consume tons of checkpoints in all containers; b) keep the
> traffic volume in CoordinatorStream low such that important changes in job
> configuration can be consumed and taken actions upon immediately.
> Essentially, checkpoints are more of runtime state of the job, not the
> metadata of the job (i.e. input topics, number of containers, container
> configurations, etc.) which is relatively more static.
>
>
> - Some questions to the design of handoff protocol:
>
>    * using CoordinatorStream, we can allow old and new containers to
> communicate directly, while JobCoordinator would be the supervisor that
> initiates the handoff and monitors the progress.
>
>    * between t3 and t4, how does the new container knows that it has
> consumed all processed messages from the old container and need to enter
> the RunLoop?
>
>    * The swim lane design only talks about changelog offset, how about the
> input stream offsets?
>
>    * The description of failure recovery is not super clear to me:
>
>       * “If A die, we just ask container B to continue consume the stream
> from the time we detect A is dead…” Questions: till when and from which
> offset B should start consuming from input topics and start entering
> RunLoop?
>
>    * Question regarding to the statement in 5.3.3: “To make a container to
> consume change-log stream until particular offset and standby”. Which
> container will need to stop consuming change-log at a particular offset?
> The new container or the old one? I would assume that here we are talking
> about the new container, since the old container will not consume from
> change-log. I do not understand why the new container needs to “stop at a
> particular offset”? Wouldn’t the new container needs to consume *all*
> changelog messages, just like any recovery of stateful container today? I
> think that the protocol can be much simpler the following way:
>
>     AppMaster/JobCoordinator ==> start container B
>                    \|/
>     container B immediately starts consuming
>     changelog
>                    \|/
>     container B detects that it has less than
>     N messages left in changelog, or has not
>     been able to reduce the backlog for X min
>     (i.e. the old container is generating
>     changelog too fast)                        ====>  tell container A to block
>                    \|/                                           \|/
>     container B continues consuming changelog         container A blocks and
>                                                       checkpoints
>                    \|/                                           \|/
>     container B continues w/o going into       <====  current changelog and input
>     RunLoop until the checkpoints / offsets           topic checkpoints to B
>     from A
>                    \|/                                           \|/
>     container B enters RunLoop and lets A      ====>  shutdown container A
>     know
>
>
>       * Hence, the states of container B are only: {consuming backlog,
> processing event} and container A’s states are only: {processing event,
> block} during the handoff protocol. The AppMaster/JobCoordinator can
> monitor the handoff process by listening to the coordinator stream messages
> between A and B.
>
> On Thu, Jan 14, 2016 at 10:52 AM, Peter Huang <huangzhenqiu0...@gmail.com>
> wrote:
>
>> Hi Chinmay and Pan,
>>
>> Thanks Chinmay for initialize the conversation. Pan, please feel free to
>> give comments on the design, specially suggestions about improving
>> efficiency of fault handling.
>>
>>
>> Best Regards
>> Peter Huang
>>
>>
>>
>> On Thu, Jan 14, 2016 at 10:24 AM, Chinmay Soman <
>> chinmay.cere...@gmail.com> wrote:
>>
>>> FYI: I've forwarded it to Yi's personal email. Yi: please feel free to
>>> email Peter (CC'ed) or me about any questions. FYI: the real solution would
>>> be to implement standby containers. This solution is an attempt to do the
>>> same.
>>>
>>> On Thu, Jan 14, 2016 at 10:17 AM, Yi Pan <nickpa...@gmail.com> wrote:
>>>
>>>> It might be the mail list restriction. Could you try to my personal
>>>> email
>>>> (i.e. nickpa...@gmail.com)?
>>>>
>>>> On Thu, Jan 14, 2016 at 10:15 AM, Chinmay Soman <
>>>> chinmay.cere...@gmail.com>
>>>> wrote:
>>>>
>>>> > It shows as attached in my sent email. That's weird.
>>>> >
>>>> > On Thu, Jan 14, 2016 at 10:14 AM, Yi Pan <nickpa...@gmail.com> wrote:
>>>> >
>>>> > > Hi, Chinmay,
>>>> > >
>>>> > > Did you forget to attach? I did not see the attachment in your last
>>>> > email.
>>>> > > Or is it due to the mail list restriction?
>>>> > >
>>>> > > On Thu, Jan 14, 2016 at 10:12 AM, Chinmay Soman <
>>>> > chinmay.cere...@gmail.com
>>>> > > >
>>>> > > wrote:
>>>> > >
>>>> > > > Sorry for the long delay (I was trying to find the design doc we
>>>> made).
>>>> > > > Please see attachment for the design doc. I'm also CC'ing Peter
>>>> Huang
>>>> > > (the
>>>> > > > intern) who worked on this.
>>>> > > >
>>>> > > > Disclaimer: Given this was an internship project, we've cut some
>>>> > corners.
>>>> > > > We plan to come up with an improved version in some time.
>>>> > > >
>>>> > > > On Wed, Jan 6, 2016 at 11:31 AM, Yi Pan <nickpa...@gmail.com>
>>>> wrote:
>>>> > > >
>>>> > > >> Hi, Chinmay,
>>>> > > >>
>>>> > > >> That's awesome! Could you share some design doc of this feature?
>>>> We
>>>> > > would
>>>> > > >> love to have this feature in LinkedIn as well!
>>>> > > >>
>>>> > > >> -Yi
>>>> > > >>
>>>> > > >> On Wed, Jan 6, 2016 at 10:02 AM, Chinmay Soman <
>>>> > > chinmay.cere...@gmail.com
>>>> > > >> >
>>>> > > >> wrote:
>>>> > > >>
>>>> > > >> > FYI: As part of an Uber internship project, we were working on
>>>> > exactly
>>>> > > >> this
>>>> > > >> > problem. Our approach was to do a rolling restart of all the
>>>> > > containers
>>>> > > >> > wherein we start a "replica" container for each primary
>>>> container
>>>> > and
>>>> > > >> let
>>>> > > >> > it "catch up" before we do the switch. Of course this doesn't
>>>> > > guarantee
>>>> > > >> > zero downtime, but it does guarantee minimum time to upgrade
>>>> each
>>>> > such
>>>> > > >> > container.
>>>> > > >> >
>>>> > > >> > The code is still in POC, but we do plan to finish this and
>>>> make
>>>> > this
>>>> > > >> > available. Let me know if you're interested in trying it out.
>>>> > > >> >
>>>> > > >> > FYI: the sticky container deployment will also minimize the
>>>> time to
>>>> > > >> upgrade
>>>> > > >> > / deploy since majority of the upgrade time is taken up by the
>>>> > > >> container in
>>>> > > >> > reading all the changelog (if any). Upgrade / re-deploy will
>>>> also
>>>> > > take a
>>>> > > >> > long time if the checkpoint topic is not log compacted (which
>>>> is
>>>> > true
>>>> > > in
>>>> > > >> > our environment).
>>>> > > >> >
>>>> > > >> > Thanks,
>>>> > > >> > C
>>>> > > >> >
>>>> > > >> > On Wed, Jan 6, 2016 at 9:56 AM, Bae, Jae Hyeon <
>>>> metac...@gmail.com>
>>>> > > >> wrote:
>>>> > > >> >
>>>> > > >> > > Hi Samza devs and users
>>>> > > >> > >
>>>> > > >> > > I know this will be tricky in Samza because Samza Kafka
>>>> consumer
>>>> > is
>>>> > > >> not
>>>> > > >> > > coordinated externally, but do you have any idea how to
>>>> deploy
>>>> > samza
>>>> > > >> jobs
>>>> > > >> > > with zero downtime?
>>>> > > >> > >
>>>> > > >> > > Thank you
>>>> > > >> > > Best, Jae
>>>> > > >> > >
>>>> > > >> >
>>>> > > >> >
>>>> > > >> >
>>>> > > >> > --
>>>> > > >> > Thanks and regards
>>>> > > >> >
>>>> > > >> > Chinmay Soman
>>>> > > >> >
>>>> > > >>
>>>> > > >
>>>> > > >
>>>> > > >
>>>> > > > --
>>>> > > > Thanks and regards
>>>> > > >
>>>> > > > Chinmay Soman
>>>> > > >
>>>> > >
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > Thanks and regards
>>>> >
>>>> > Chinmay Soman
>>>> >
>>>>
>>>
>>>
>>>
>>> --
>>> Thanks and regards
>>>
>>> Chinmay Soman
>>>
>>
>>
>
