Yan : It sounds like the checkpoint stream might help me!  I would like to
learn more about how New-Rules-Job can access the checkpoint stream for
Old-Rules-Job.  Can you please give me an example of how I would do this?
Or could you please point me to some documentation or an article where I
can learn how to do this?

Thank you!

On Thu, Apr 16, 2015 at 5:06 PM, jeremy p <athomewithagroove...@gmail.com>
wrote:

> Ben : I think we are talking about different things here.  I'm not trying
> to maintain ordering across a topic.  I know that is not what Kafka and
> Samza are meant for.  What I'm trying to do here is give my Old-Rules-Job a
> way of telling New-Rules-Job, "Once you hit this offset, start applying
> both old and new rules."  So is that a single absolute offset that I want
> to pass from Old-Rules-Job to New-Rules-Job?  Or a set of offsets, one for
> each partition.
>
> On Thu, Apr 16, 2015 at 4:58 PM, Benjamin Black <b...@b3k.us> wrote:
>
>> If you need to maintain ordering of a sequence of messages, those messages
>> should all be written to the same partition. If you are concerned with
>> global ordering of all messages in a topic then kafka is likely not going
>> to be what you want. Ordering guarantees are strictly per partition. samza
>> is built on this principle by having a tasks work from a single partition.
>> If your jobs require global coordination between tasks, again, you might
>> reconsider either your architecture or your use of kafka.
>>
>> Not trying to harsh your mellow here. High scale systems like kafka
>> require
>> you match your architecture to them. To do otherwise produces bad times.
>>
>> On Thu, Apr 16, 2015 at 1:51 PM, jeremy p <athomewithagroove...@gmail.com
>> >
>> wrote:
>>
>> > Thank you for the response.  Does this mean the Old-Rules-Job would
>> need to
>> > maintain a Last-Processed-Old-Rules offset for each partition?
>> >
>> > On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black <b...@b3k.us> wrote:
>> >
>> > > Offsets are per partition. The alternative would have poor scaling
>> > behavior
>> > > for both brokers and consumers.
>> > >
>> > > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <
>> > athomewithagroove...@gmail.com>
>> > > wrote:
>> > >
>> > > > Thanks to everybody for the responses!
>> > > >
>> > > > Yi : The queue must be processed in order, which means that I cannot
>> > use
>> > > > Ben and Guozhang's approach.
>> > > >
>> > > > However, it is not necessary that all rules be processed at the same
>> > > offset
>> > > > and at the same speed.  This is why I considered a solution where we
>> > had
>> > > a
>> > > > separate job for each rule.  The problem with that solution is that
>> we
>> > > > could have thousands of these rules, which would mean thousands of
>> > jobs.
>> > > > These jobs would be really lightweight and would require very few
>> > system
>> > > > resources.  However, I don't know if having thousands of jobs would
>> > break
>> > > > YARN.
>> > > >
>> > > > For now, it sounds like Yan's solution would be the best. However, I
>> > > have a
>> > > > few questions about it.  For now, let's call the original job the
>> > > > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
>> > > > solution, as I understand it :
>> > > >
>> > > > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We
>> > start
>> > > > the All-Rules-Job.  The All-Rules-Job will only apply new rules
>> until
>> > it
>> > > > gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job
>> > gets
>> > > > to the Last-Processed-Old-Rules offset, it sends a kill signal to
>> > > > Old-Rules-Job along a control stream.  Old-Rules-Job terminates
>> itself.
>> > > > Then the All-Rules-Job applies both old and new rules to every
>> message
>> > > that
>> > > > comes in.
>> > > >
>> > > > My questions :
>> > > >
>> > > > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset
>> every
>> > > > time it processes a message?  How does the Old-Rules-Job expose the
>> > > > Last-Processed-Rules offset to the All-Rules-Job?  Would the
>> > > > Last-Processed-Rules offset be the absolute offset within a topic,
>> and
>> > > not
>> > > > the offset within a partition?  Is there a way to find out a
>> message's
>> > > > absolute offset within a topic?
>> > > >
>> > > > Thanks again for all the help!
>> > > >
>> > > > --Jeremy
>> > > >
>> > > >
>> > > > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan <nickpa...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hi, Jeremy,
>> > > > >
>> > > > > I saw the following requirements from your use case:
>> > > > >
>> > > > > 1) New rules need to be dynamically added w/ creating too many
>> Samza
>> > > jobs
>> > > > > (e.g. 1 Samza job per new rule is too much)
>> > > > > 2) Old rules need to continue processing when new rules are added
>> > > > >
>> > > > > I want to ask a few more questions regarding to your requirements:
>> > > > >
>> > > > > Q.1) Is it required that for a new rule, the bootstrap processing
>> of
>> > > > > messages from offset 0 to Last-Processed-Old-Rules has to be done
>> > > before
>> > > > > the new rules can be applied to messages from offset
>> > > > > Last-Processed-Old-Rules?
>> > > > > Q.2) Is it required that after bootstrap, all rules are processing
>> > the
>> > > > > message at the same offset w/ the same speed?
>> > > > >
>> > > > > If the answers to both questions (i.e. Q.1 and Q.2) above are
>> yes, we
>> > > > will
>> > > > > have to slow down or stop the jobs for the old rules s.t. the jobs
>> > > > running
>> > > > > both new and old rules can catch up, as Yan pointed out. If
>> answers
>> > to
>> > > > both
>> > > > > questions above are no (which I doubt since you need to build-up
>> > > certain
>> > > > > "history" for the new rule before you can apply it to later
>> > messages),
>> > > > you
>> > > > > can take Ben/Guozhang's approach w/o coordination between the two
>> > jobs.
>> > > > >
>> > > > > Now the interesting case is that your answer to Q.1 is yes, and to
>> > Q.2
>> > > is
>> > > > > no, which essentially post a request that your job will need to
>> keep
>> > > > > multiple independent consumer offsets per rule and let them move
>> w/
>> > > their
>> > > > > own speed. Or, at least one bootstrap consumer, and one normal
>> > > processing
>> > > > > consumer on the same system stream partition within a single job.
>> I
>> > > don't
>> > > > > think that Samza support this now. And the only work around is
>> Yan's
>> > > > > solution which requires coordination between two jobs.
>> > > > >
>> > > > > -Yi
>> > > > >
>> > > > > On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang <yanfang...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > > you are able to call coordinator.shutdown to shut the job down
>> > after
>> > > it
>> > > > > > reaches the offset.
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Fang, Yan
>> > > > > > yanfang...@gmail.com
>> > > > > >
>> > > > > > On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <
>> wangg...@gmail.com
>> > >
>> > > > > wrote:
>> > > > > >
>> > > > > > > I feel Ben's solution a bit simpler that you just need to
>> restart
>> > > > your
>> > > > > > > current job with both rules on the check pointed offset, and
>> > start
>> > > a
>> > > > > new
>> > > > > > > job from offset 0 with only the new rule and it will stop at
>> the
>> > > > > checkout
>> > > > > > > pointed offset. But of course it requires the second job to be
>> > able
>> > > > to
>> > > > > > > shutdown itself upon some specific offset which I am not sure
>> if
>> > it
>> > > > is
>> > > > > > > already supported.
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <
>> yanfang...@gmail.com>
>> > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Jeremy,
>> > > > > > > >
>> > > > > > > > In order to reach this goal, we have to assume that the job
>> > with
>> > > > new
>> > > > > > > rules
>> > > > > > > > can always catch up with the one with old rules. Otherwise,
>> I
>> > > think
>> > > > > we
>> > > > > > do
>> > > > > > > > not have the choice but running a lot of jobs
>> simultaneously.
>> > > > > > > >
>> > > > > > > > Under our assumption, we have job1 with old rules running,
>> and
>> > > now
>> > > > > add
>> > > > > > > job2
>> > > > > > > > which integrates old rules and new rules to run. Job2
>> > frequently
>> > > > > > > > checks the Last-Processed-Old-Rules
>> > > > > > > > offset from job1 (because job1 is running too), and it only
>> > > applies
>> > > > > new
>> > > > > > > > rule to the data until catch up with the
>> > Last-Processed-Old-Rules
>> > > > > > offset.
>> > > > > > > > Then it sends signal to the job1 and shutdown job1, and
>> applies
>> > > all
>> > > > > > rules
>> > > > > > > > to the stream.
>> > > > > > > >
>> > > > > > > > In terms of how to shutdown the job1, here is one solution
>> > > > > > > > <
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E
>> > > > > > > > >
>> > > > > > > > provided by Chris - e.g. you can have a control stream to
>> get
>> > > job1
>> > > > > > > > shutdown. Samza will provide this kind of stream after
>> > SAMZA-348
>> > > > > > > > <https://issues.apache.org/jira/browse/SAMZA-348>, which is
>> > > under
>> > > > > > active
>> > > > > > > > development.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > >
>> > > > > > > > Fang, Yan
>> > > > > > > > yanfang...@gmail.com
>> > > > > > > >
>> > > > > > > > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <
>> > > > > > > athomewithagroove...@gmail.com
>> > > > > > > > >
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hello Yan,
>> > > > > > > > >
>> > > > > > > > > Thank you for the suggestion!  I think your solution would
>> > > work,
>> > > > > > > > however, I
>> > > > > > > > > am afraid it would create a performance problem for our
>> > users.
>> > > > > > > > >
>> > > > > > > > > Let's say we kill the Classifier task, and create a new
>> > > > Classifier
>> > > > > > task
>> > > > > > > > > with both the existing rules and new rules. We get the
>> offset
>> > > of
>> > > > > the
>> > > > > > > > > latest-processed message for the old rules.  Let's call
>> this
>> > > > offset
>> > > > > > > > > Last-Processed-Old-Rules.  We ignore messages
>> > > > > > > > > before Last-Processed-Old-Rules for the old rules.  We
>> > > configure
>> > > > > the
>> > > > > > > new
>> > > > > > > > > Classifier task to be a bootstrap task.
>> > > > > > > > >
>> > > > > > > > > Let's say we have users who are watching the output
>> topics,
>> > and
>> > > > > they
>> > > > > > > are
>> > > > > > > > > expecting near-realtime updates.  They won't see any
>> updates
>> > > for
>> > > > > the
>> > > > > > > old
>> > > > > > > > > rules until our task has passed the
>> Last-Processed-Old-Rules
>> > > > > offset.
>> > > > > > > If
>> > > > > > > > we
>> > > > > > > > > have a lot of messages in that topic, that could take a
>> long
>> > > > time.
>> > > > > > > This
>> > > > > > > > is
>> > > > > > > > > why I was hoping there would be a way to bootstrap the new
>> > > rules
>> > > > > > while
>> > > > > > > > > we're still processing the old rules.  Do you think there
>> is
>> > a
>> > > > way
>> > > > > to
>> > > > > > > do
>> > > > > > > > > that?
>> > > > > > > > >
>> > > > > > > > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <
>> > > yanfang...@gmail.com>
>> > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jeremy,
>> > > > > > > > > >
>> > > > > > > > > > If my understanding is correct, whenever you add a new
>> > rule,
>> > > > you
>> > > > > > want
>> > > > > > > > to
>> > > > > > > > > > apply this rule to the historical data. Right?
>> > > > > > > > > >
>> > > > > > > > > > If you do not care about duplication, you can create a
>> new
>> > > task
>> > > > > > that
>> > > > > > > > > > contains existing rules and new rules. Configure
>> bootstrap.
>> > > > This
>> > > > > > will
>> > > > > > > > > apply
>> > > > > > > > > > all the rules from the beginning of the input stream.
>> The
>> > > > > > shortcoming
>> > > > > > > > is
>> > > > > > > > > > you will get duplicated results for old rules.
>> > > > > > > > > >
>> > > > > > > > > > If you can not tolerate the shortcoming, 1) get the
>> offset
>> > of
>> > > > the
>> > > > > > > > > > latest-processed message of old rules. 2) In your new
>> task,
>> > > > > ignore
>> > > > > > > > > messages
>> > > > > > > > > > before that offset for the old rules. 3) bootstrap.
>> > > > > > > > > >
>> > > > > > > > > > Hope this helps. Maybe your use case is more
>> complicated?
>> > > > > > > > > >
>> > > > > > > > > > Thanks,
>> > > > > > > > > >
>> > > > > > > > > > Fang, Yan
>> > > > > > > > > > yanfang...@gmail.com
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <
>> > > > > > > > > athomewithagroove...@gmail.com
>> > > > > > > > > > >
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > So, I'm wanting to use Samza for a project I'm working
>> > on,
>> > > > but
>> > > > > I
>> > > > > > > keep
>> > > > > > > > > > > running into a problem with bootstrapping.
>> > > > > > > > > > >
>> > > > > > > > > > > Let's say there's a Kafka topic called Numbers that I
>> > want
>> > > to
>> > > > > > > consume
>> > > > > > > > > > with
>> > > > > > > > > > > Samza.  Let's say each message has a single integer in
>> > it,
>> > > > and
>> > > > > I
>> > > > > > > want
>> > > > > > > > > to
>> > > > > > > > > > > classify it as even or odd.  So I have two topics that
>> > I'm
>> > > > > using
>> > > > > > > for
>> > > > > > > > > > > output, one called Even and one called Odd.  I write a
>> > > simple
>> > > > > > > stream
>> > > > > > > > > task
>> > > > > > > > > > > called Classifier that consumes the Numbers topic,
>> > examines
>> > > > > each
>> > > > > > > > > incoming
>> > > > > > > > > > > integer and writes it back out to Even or Odd.
>> > > > > > > > > > >
>> > > > > > > > > > > Now, let's say I want to be able to add
>> classifications
>> > > > > > > dynamically,
>> > > > > > > > > > like :
>> > > > > > > > > > > "divisible by three", "divisible by four", or "numbers
>> > that
>> > > > > > appear
>> > > > > > > in
>> > > > > > > > > my
>> > > > > > > > > > > date of birth".  And let's say I have an API I can
>> query
>> > > that
>> > > > > > gives
>> > > > > > > > me
>> > > > > > > > > > all
>> > > > > > > > > > > the assignment rules, such as "when a number is
>> divisble
>> > by
>> > > > 3,
>> > > > > > > write
>> > > > > > > > it
>> > > > > > > > > > out
>> > > > > > > > > > > to a topic called 'divisible_by_three'", or "when a
>> > number
>> > > > > > appears
>> > > > > > > in
>> > > > > > > > > the
>> > > > > > > > > > > string 12/12/1981, write it to the 'my_birthday'
>> topic".
>> > > So
>> > > > > now
>> > > > > > I
>> > > > > > > > > > rewrite
>> > > > > > > > > > > my stream task to query this API for assignment rules.
>> > It
>> > > > > reads
>> > > > > > > > > integers
>> > > > > > > > > > > from the Numbers topic and writes them back out to
>> one or
>> > > > more
>> > > > > > > output
>> > > > > > > > > > > topics, according to the assignment rules.
>> > > > > > > > > > >
>> > > > > > > > > > > Now, let's make this even more complicated.  When I
>> add a
>> > > new
>> > > > > > > > > > > classification, I want to go back to the very
>> beginning
>> > of
>> > > > the
>> > > > > > > > Numbers
>> > > > > > > > > > > topic and classify them accordingly.  Once we've
>> consumed
>> > > all
>> > > > > the
>> > > > > > > old
>> > > > > > > > > > > "historical" integers, I want to apply this
>> > classification
>> > > > new
>> > > > > > > > integers
>> > > > > > > > > > as
>> > > > > > > > > > > they come in.
>> > > > > > > > > > >
>> > > > > > > > > > > And this is where I get stuck.
>> > > > > > > > > > >
>> > > > > > > > > > > One thing I can do : when I want to add a new
>> > > > classification, I
>> > > > > > can
>> > > > > > > > > > create
>> > > > > > > > > > > a bootstrap job by setting the
>> > > > > > > > > > > "systems.kafka.streams.numbers.samza.offset.default"
>> > > property
>> > > > > to
>> > > > > > > > > > "oldest".
>> > > > > > > > > > > And that's great, but the problem is, once I've
>> "caught
>> > > up",
>> > > > > I'd
>> > > > > > > like
>> > > > > > > > > to
>> > > > > > > > > > > kill the bootstrap job and just let the Classifier
>> handle
>> > > > this
>> > > > > > new
>> > > > > > > > > > > assignment.  So, I'd want to do some kind of handover
>> > from
>> > > > the
>> > > > > > > > > bootstrap
>> > > > > > > > > > > job to the Classifier job.  But how to do this?
>> > > > > > > > > > >
>> > > > > > > > > > > So, the question I must ask is this : Is Samza even an
>> > > > > appopriate
>> > > > > > > way
>> > > > > > > > > to
>> > > > > > > > > > > solve this problem?  Has this problem ever come up for
>> > > > anybody
>> > > > > > > else?
>> > > > > > > > > How
>> > > > > > > > > > > have they solved it?  I would really like to use Samza
>> > > > because
>> > > > > it
>> > > > > > > > seems
>> > > > > > > > > > > like an appopriate technology, and I'd really really
>> > really
>> > > > > > really
>> > > > > > > > like
>> > > > > > > > > > to
>> > > > > > > > > > > avoid re-inventing the wheel.
>> > > > > > > > > > >
>> > > > > > > > > > > A couple solutions I came up with :
>> > > > > > > > > > >
>> > > > > > > > > > > 1) The simple solution.  Have a separate Samza job for
>> > each
>> > > > > > > > > > > classification.  If I want to add a new
>> classification, I
>> > > > > create
>> > > > > > a
>> > > > > > > > new
>> > > > > > > > > > job
>> > > > > > > > > > > and set it up as a bootstrap job.  This would solve
>> the
>> > > > > problem.
>> > > > > > > > > > However,
>> > > > > > > > > > > we may want to have many, many classifications.  It
>> could
>> > > be
>> > > > as
>> > > > > > > many
>> > > > > > > > as
>> > > > > > > > > > > 1,000,000, which would mean up to 1,000,000
>> > simultaneously
>> > > > > > running
>> > > > > > > > > jobs.
>> > > > > > > > > > > This could create a lot of overhead for YARN and
>> Kafka.
>> > > > > > > > > > >
>> > > > > > > > > > > 2) My overly-complicated workaround solution.  Each
>> > > > assignment
>> > > > > > rule
>> > > > > > > > has
>> > > > > > > > > > an
>> > > > > > > > > > > "isnew" flag.  If it's a new classification that
>> hasn't
>> > > fully
>> > > > > > > > > > bootstrapped
>> > > > > > > > > > > yet, the "isnew" flag is set to TRUE.  When my
>> classifier
>> > > > > queries
>> > > > > > > the
>> > > > > > > > > API
>> > > > > > > > > > > for assignment rules, it ignores any rule with an
>> "isnew"
>> > > > flag.
>> > > > > > > > When I
>> > > > > > > > > > > want to add a new classification, I create a new
>> > bootstrap
>> > > > job
>> > > > > > for
>> > > > > > > > that
>> > > > > > > > > > > classification.  Every so often, maybe every few days
>> or
>> > > so,
>> > > > if
>> > > > > > all
>> > > > > > > > of
>> > > > > > > > > my
>> > > > > > > > > > > bootstrap jobs have "caught up", I kill all of the
>> > > bootstrap
>> > > > > jobs
>> > > > > > > and
>> > > > > > > > > > > classifier jobs.  I set all the "isnew" flags to
>> FALSE.
>> > > > Then I
>> > > > > > > > restart
>> > > > > > > > > > the
>> > > > > > > > > > > classifier job.  This is kind of an ugly solution, and
>> > I'm
>> > > > not
>> > > > > > even
>> > > > > > > > > sure
>> > > > > > > > > > it
>> > > > > > > > > > > would work.  For one thing, I'd need some way of
>> knowing
>> > > if a
>> > > > > > > > boostrap
>> > > > > > > > > > job
>> > > > > > > > > > > has "caught up".  Secondly, I'd essentially be
>> restarting
>> > > the
>> > > > > > > > > classifier
>> > > > > > > > > > > job periodically, which just seems like an ugly
>> solution.
>> > > I
>> > > > > > don't
>> > > > > > > > like
>> > > > > > > > > > it.
>> > > > > > > > > > >
>> > > > > > > > > > > 3) Some other kind of really complicated solution I
>> > haven't
>> > > > > > thought
>> > > > > > > > of
>> > > > > > > > > > yet,
>> > > > > > > > > > > probably involving locks, transactions, concurrancy,
>> and
>> > > > > > > interprocess
>> > > > > > > > > > > communication.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks for reading this whole thing.  Please let me
>> know
>> > if
>> > > > you
>> > > > > > > have
>> > > > > > > > > any
>> > > > > > > > > > > suggestions.
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Reply via email to