Hi Jeremy,
Samza already has a checkpoint stream, which records the latest-processed offset. The new job can reuse the old job's checkpoint stream.
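To make the checkpoint-reuse idea concrete, here is a minimal plain-Java sketch of the per-message decision the new job would make. It assumes the old job's last-processed offsets have already been read from its checkpoint stream into a map keyed by partition. Offsets are per partition, as Ben notes further down, so there is no single topic-wide offset to compare against. `CheckpointGate` and `oldRulesAlreadyApplied` are hypothetical names, and loading the map is not shown. Samza hands offsets to a task as strings (for example via `IncomingMessageEnvelope.getOffset()`), so a real job would parse them first.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: decides whether the old rules already handled a message,
// using the old job's checkpointed offsets (partition -> last-processed offset).
final class CheckpointGate {
    private final Map<Integer, Long> lastProcessedByPartition;

    CheckpointGate(Map<Integer, Long> lastProcessedByPartition) {
        // Defensive copy: the checkpoint snapshot should not change underneath us.
        this.lastProcessedByPartition = new HashMap<>(lastProcessedByPartition);
    }

    // True if the old job already applied the old rules to (partition, offset),
    // so the new job should apply only the new rules to this message.
    boolean oldRulesAlreadyApplied(int partition, long offset) {
        Long last = lastProcessedByPartition.get(partition);
        // In this sketch, no checkpoint for a partition means the old job never processed it.
        return last != null && offset <= last;
    }

    public static void main(String[] args) {
        CheckpointGate gate = new CheckpointGate(Map.of(0, 41L, 1, 7L));
        System.out.println(gate.oldRulesAlreadyApplied(0, 41L)); // true
        System.out.println(gate.oldRulesAlreadyApplied(0, 42L)); // false
        System.out.println(gate.oldRulesAlreadyApplied(2, 0L));  // false: no checkpoint
    }
}
```

Because the checkpoint stores the last *processed* offset, the comparison is `<=`: the checkpointed message itself has already been handled by the old rules.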
Thanks,
Fang, Yan
yanfang...@gmail.com

On Thu, Apr 16, 2015 at 1:51 PM, jeremy p <athomewithagroove...@gmail.com> wrote:

Thank you for the response. Does this mean the Old-Rules-Job would need to maintain a Last-Processed-Old-Rules offset for each partition?

On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black <b...@b3k.us> wrote:

Offsets are per partition. The alternative would have poor scaling behavior for both brokers and consumers.

On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <athomewithagroove...@gmail.com> wrote:

Thanks to everybody for the responses!

Yi: The queue must be processed in order, which means that I cannot use Ben and Guozhang's approach.

However, it is not necessary that all rules be processed at the same offset and at the same speed. This is why I considered a solution where we had a separate job for each rule. The problem with that solution is that we could have thousands of these rules, which would mean thousands of jobs. These jobs would be really lightweight and would require very few system resources. However, I don't know if having thousands of jobs would break YARN.

For now, it sounds like Yan's solution would be the best. However, I have a few questions about it. For now, let's call the original job the Old-Rules-Job, and the bootstrap job the All-Rules-Job. This is the solution, as I understand it:

The Old-Rules-Job exposes the Last-Processed-Old-Rules offset. We start the All-Rules-Job. The All-Rules-Job will only apply new rules until it gets to the Last-Processed-Old-Rules offset. Once the All-Rules-Job gets to the Last-Processed-Old-Rules offset, it sends a kill signal to the Old-Rules-Job along a control stream. The Old-Rules-Job terminates itself.
Then the All-Rules-Job applies both old and new rules to every message that comes in.

My questions:

Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every time it processes a message? How does the Old-Rules-Job expose the Last-Processed-Old-Rules offset to the All-Rules-Job? Would the Last-Processed-Old-Rules offset be the absolute offset within a topic, and not the offset within a partition? Is there a way to find out a message's absolute offset within a topic?

Thanks again for all the help!

--Jeremy

On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan <nickpa...@gmail.com> wrote:

Hi, Jeremy,

I saw the following requirements from your use case:

1) New rules need to be added dynamically without creating too many Samza jobs (e.g. one Samza job per new rule is too many).
2) Old rules need to continue processing when new rules are added.

I want to ask a few more questions regarding your requirements:

Q.1) Is it required that, for a new rule, the bootstrap processing of messages from offset 0 to Last-Processed-Old-Rules has to be done before the new rule can be applied to messages from offset Last-Processed-Old-Rules onward?
Q.2) Is it required that, after the bootstrap, all rules process messages at the same offset and at the same speed?

If the answers to both questions (i.e. Q.1 and Q.2) are yes, we will have to slow down or stop the jobs for the old rules so that the jobs running both new and old rules can catch up, as Yan pointed out.
If the answers to both questions are no (which I doubt, since you need to build up a certain "history" for the new rule before you can apply it to later messages), you can take Ben/Guozhang's approach without coordination between the two jobs.

Now, the interesting case is when your answer to Q.1 is yes and to Q.2 is no. That essentially requires your job to keep multiple independent consumer offsets, one per rule, and let them move at their own speeds; or, at least, one bootstrap consumer and one normal processing consumer on the same system stream partition within a single job. I don't think Samza supports this now, and the only workaround is Yan's solution, which requires coordination between two jobs.

-Yi

On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang <yanfang...@gmail.com> wrote:

You can call coordinator.shutdown to shut the job down after it reaches the offset.

Thanks,
Fang, Yan
yanfang...@gmail.com

On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <wangg...@gmail.com> wrote:

I feel Ben's solution is a bit simpler: you just need to restart your current job with both rules from the checkpointed offset, and start a new job from offset 0 with only the new rule, which will stop at the checkpointed offset. But of course it requires the second job to be able to shut itself down upon reaching a specific offset, which I am not sure is already supported.
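Guozhang's variant can be sketched in plain Java as well. Below, a hypothetical `NewRuleBootstrap` applies only the new rule from offset 0 and reports when every partition has reached the offset the main job was restarted from; in a real Samza task, that is roughly the point where Yan suggests calling `coordinator.shutdown`. Messages are plain longs and the output topic is an in-memory list, purely for illustration, and the sketch assumes a stop offset is known for every input partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongPredicate;

// Hypothetical bootstrap job: applies ONLY the new rule, starting from offset 0,
// and signals completion once every partition reaches its stop offset (the
// checkpointed offset the main job, now running both rules, restarted from).
final class NewRuleBootstrap {
    private final Map<Integer, Long> stopAt;          // partition -> last offset to handle
    private final Set<Integer> finished = new HashSet<>();
    private final LongPredicate newRule;
    final List<Long> emitted = new ArrayList<>();     // stands in for the new rule's output topic

    NewRuleBootstrap(Map<Integer, Long> stopAt, LongPredicate newRule) {
        this.stopAt = new HashMap<>(stopAt);
        this.newRule = newRule;
    }

    // Called once per message, in offset order within each partition.
    // Returns true when all partitions are done, i.e. the job may shut itself down.
    boolean process(int partition, long offset, long value) {
        Long last = stopAt.get(partition);
        if (last != null && offset <= last) {
            if (newRule.test(value)) emitted.add(value);
            if (offset == last) finished.add(partition);
        } else if (last != null) {
            finished.add(partition);                   // already past the stop offset
        }
        return finished.containsAll(stopAt.keySet());
    }
}
```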
Guozhang

On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <yanfang...@gmail.com> wrote:

Hi Jeremy,

In order to reach this goal, we have to assume that the job with the new rules can always catch up with the one with the old rules. Otherwise, I think we have no choice but to run a lot of jobs simultaneously.

Under this assumption, we have job1 running with the old rules, and now add job2, which runs both the old rules and the new rules. Job2 frequently checks the Last-Processed-Old-Rules offset of job1 (because job1 is still running), and it applies only the new rules to the data until it catches up with the Last-Processed-Old-Rules offset. Then it sends a signal to job1 to shut it down, and applies all rules to the stream.

In terms of how to shut down job1, here is one solution <http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E> provided by Chris: you can have a control stream to get job1 to shut down. Samza will provide this kind of stream after SAMZA-348 <https://issues.apache.org/jira/browse/SAMZA-348>, which is under active development.
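The handover described above can be simulated in a single thread to check that no offset is missed or processed twice. In this hypothetical sketch, two queues stand in for the control stream, strings like "old@5" stand in for output messages, and job2 reads job1's position directly where a real job would poll job1's checkpoint. One design choice worth noting: job1 reports the offset at which it actually stopped, and job2 only starts applying the old rules after that offset, because in a real deployment job1 may process a few more messages between the signal being sent and being received.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded simulation of the job1 -> job2 handover on ONE partition.
// Output strings "old@k" / "new@k" mean "old/new rules applied to offset k".
final class Handover {
    final int logLength;                                   // offsets 0 .. logLength-1
    final Queue<Long> stopRequests = new ArrayDeque<>();   // control stream: job2 -> job1
    final Queue<Long> stoppedAt = new ArrayDeque<>();      // control stream: job1 -> job2
    final List<String> output = new ArrayList<>();

    long oldNext;              // job1 (old rules): next offset to process
    boolean oldStopped = false;

    long newNext = 0;          // job2 (all rules): bootstraps from offset 0
    Long handoverAt = null;    // last offset job1 processed, once job1 reports it
    boolean stopSent = false;

    Handover(int logLength, long job1Position) {
        this.logLength = logLength;
        this.oldNext = job1Position;
        for (long k = 0; k < job1Position; k++) output.add("old@" + k); // job1's history
    }

    void stepJob1() {
        if (oldStopped) return;
        if (stopRequests.poll() != null) {          // shutdown signal received
            oldStopped = true;
            stoppedAt.add(oldNext - 1);             // report where job1 actually stopped
        } else if (oldNext < logLength) {
            output.add("old@" + oldNext);
            oldNext++;
        }
    }

    void stepJob2() {
        Long reported = stoppedAt.poll();
        if (reported != null) handoverAt = reported;
        if (handoverAt == null) {
            // Reading oldNext stands in for polling job1's checkpoint; a stale
            // (smaller) value is still safe because it is only used as a lower bound.
            if (newNext < oldNext) {
                output.add("new@" + newNext);        // old rules already applied by job1
                newNext++;
            } else if (!stopSent) {
                stopRequests.add(newNext);           // caught up: ask job1 to stop
                stopSent = true;
            }                                        // otherwise wait for job1's report
        } else if (newNext < logLength) {
            output.add("new@" + newNext);
            if (newNext > handoverAt) output.add("old@" + newNext); // job1 never saw it
            newNext++;
        }
    }

    boolean done() {
        return oldStopped && handoverAt != null && newNext >= logLength;
    }
}
```

Driving it with, say, one `stepJob1()` per two `stepJob2()` calls (job2 being faster, per the catch-up assumption above) should leave exactly one "old@k" and one "new@k" record for every offset.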
Thanks,
Fang, Yan
yanfang...@gmail.com

On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <athomewithagroove...@gmail.com> wrote:

Hello Yan,

Thank you for the suggestion! I think your solution would work; however, I am afraid it would create a performance problem for our users.

Let's say we kill the Classifier task and create a new Classifier task with both the existing rules and the new rules. We get the offset of the latest-processed message for the old rules; let's call this offset Last-Processed-Old-Rules. We ignore messages before Last-Processed-Old-Rules for the old rules. We configure the new Classifier task to be a bootstrap task.

Let's say we have users who are watching the output topics, and they are expecting near-realtime updates. They won't see any updates for the old rules until our task has passed the Last-Processed-Old-Rules offset. If we have a lot of messages in that topic, that could take a long time. This is why I was hoping there would be a way to bootstrap the new rules while we're still processing the old rules. Do you think there is a way to do that?
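What is being asked for here amounts to one read position per rule over the same partition, which Yi, in a later reply, calls multiple independent consumer offsets per rule and doubts Samza supports. The following plain-Java sketch only illustrates that idea over an in-memory list; `PerRuleCursors` and its methods are hypothetical and are not a Samza API. A new rule starts at offset 0 while existing rules keep their own positions, and each call to `step` lets every rule advance by at most `budget` messages, so existing rules are not held back while the new rule replays history.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

// Hypothetical sketch: one read position per rule over the same partition,
// so a new rule can replay history while existing rules stay near the head.
final class PerRuleCursors {
    private final List<Long> partition;                                   // in-memory stand-in for a log
    private final Map<String, LongPredicate> rules = new LinkedHashMap<>();
    private final Map<String, Integer> cursor = new LinkedHashMap<>();    // next index per rule
    final Map<String, List<Long>> outputs = new LinkedHashMap<>();        // rule -> its output topic

    PerRuleCursors(List<Long> partition) {
        this.partition = partition;
    }

    // A new rule would start at 0; an existing rule keeps its current position.
    void addRule(String name, LongPredicate rule, int startOffset) {
        rules.put(name, rule);
        cursor.put(name, startOffset);
        outputs.put(name, new ArrayList<>());
    }

    // Each rule advances by at most `budget` messages, independently of the others.
    void step(int budget) {
        for (Map.Entry<String, LongPredicate> e : rules.entrySet()) {
            String name = e.getKey();
            int pos = cursor.get(name);
            int end = Math.min(partition.size(), pos + budget);
            for (; pos < end; pos++) {
                long value = partition.get(pos);
                if (e.getValue().test(value)) outputs.get(name).add(value);
            }
            cursor.put(name, pos);
        }
    }

    int position(String rule) {
        return cursor.get(rule);
    }
}
```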
On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com> wrote:

Hi Jeremy,

If my understanding is correct, whenever you add a new rule, you want to apply this rule to the historical data. Right?

If you do not care about duplication, you can create a new task that contains the existing rules and the new rules, and configure it as a bootstrap. This will apply all the rules from the beginning of the input stream. The shortcoming is that you will get duplicate results for the old rules.

If you cannot tolerate that shortcoming:
1) Get the offset of the latest-processed message for the old rules.
2) In your new task, ignore messages before that offset for the old rules.
3) Bootstrap.

Hope this helps. Maybe your use case is more complicated?

Thanks,
Fang, Yan
yanfang...@gmail.com

On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <athomewithagroove...@gmail.com> wrote:

So, I want to use Samza for a project I'm working on, but I keep running into a problem with bootstrapping.

Let's say there's a Kafka topic called Numbers that I want to consume with Samza.
Let's say each message has a single integer in it, and I want to classify it as even or odd. So I have two topics that I'm using for output, one called Even and one called Odd. I write a simple stream task called Classifier that consumes the Numbers topic, examines each incoming integer, and writes it back out to Even or Odd.

Now, let's say I want to be able to add classifications dynamically, like "divisible by three", "divisible by four", or "numbers that appear in my date of birth". And let's say I have an API I can query that gives me all the assignment rules, such as "when a number is divisible by 3, write it out to a topic called 'divisible_by_three'", or "when a number appears in the string 12/12/1981, write it to the 'my_birthday' topic". So now I rewrite my stream task to query this API for assignment rules. It reads integers from the Numbers topic and writes them back out to one or more output topics, according to the assignment rules.

Now, let's make this even more complicated. When I add a new classification, I want to go back to the very beginning of the Numbers topic and classify those integers accordingly.
Once we've consumed all the old "historical" integers, I want to apply this classification to new integers as they come in.

And this is where I get stuck.

One thing I can do: when I want to add a new classification, I can create a bootstrap job by setting the "systems.kafka.streams.numbers.samza.offset.default" property to "oldest". And that's great, but the problem is, once I've "caught up", I'd like to kill the bootstrap job and just let the Classifier handle this new assignment. So, I'd want to do some kind of handover from the bootstrap job to the Classifier job. But how do I do this?

So, the question I must ask is this: Is Samza even an appropriate way to solve this problem? Has this problem ever come up for anybody else? How have they solved it? I would really like to use Samza because it seems like an appropriate technology, and I'd really, really, really, really like to avoid reinventing the wheel.

A couple of solutions I came up with:
1) The simple solution. Have a separate Samza job for each classification. If I want to add a new classification, I create a new job and set it up as a bootstrap job.
This would solve the problem. However, we may want to have many, many classifications. It could be as many as 1,000,000, which would mean up to 1,000,000 simultaneously running jobs. This could create a lot of overhead for YARN and Kafka.
2) My overly complicated workaround solution. Each assignment rule has an "isnew" flag. If it's a new classification that hasn't fully bootstrapped yet, the "isnew" flag is set to TRUE. When my classifier queries the API for assignment rules, it ignores any rule whose "isnew" flag is TRUE. When I want to add a new classification, I create a new bootstrap job for that classification. Every so often, maybe every few days, if all of my bootstrap jobs have "caught up", I kill all of the bootstrap jobs and classifier jobs, set all the "isnew" flags to FALSE, and restart the classifier job. This is kind of an ugly solution, and I'm not even sure it would work. For one thing, I'd need some way of knowing if a bootstrap job has "caught up". Secondly, I'd essentially be restarting the classifier job periodically, which just seems ugly. I don't like it.
3) Some other kind of really complicated solution I haven't thought of yet, probably involving locks, transactions, concurrency, and interprocess communication.

Thanks for reading this whole thing. Please let me know if you have any suggestions.
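For reference, the rule-driven Classifier from the original question, including the "isnew" filter from workaround 2, might look roughly like this in plain Java. The `AssignmentRule` record, the example rules, and reading "appears in the string 12/12/1981" as a substring match on the number's decimal digits are assumptions for illustration; querying the rules API and the Kafka input/output are left out. The sketch uses a Java record, so it needs Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical model of the Classifier: rules come from an external API, and rules
// still flagged "isnew" are skipped until their bootstrap job has caught up.
final class Classifier {
    record AssignmentRule(String outputTopic, LongPredicate matches, boolean isNew) {}

    private final List<AssignmentRule> rules;

    Classifier(List<AssignmentRule> rulesFromApi) {
        this.rules = List.copyOf(rulesFromApi);
    }

    // Output topics a number should be written to (possibly several, possibly none).
    List<String> classify(long n) {
        List<String> topics = new ArrayList<>();
        for (AssignmentRule r : rules) {
            if (!r.isNew() && r.matches().test(n)) topics.add(r.outputTopic());
        }
        return topics;
    }

    static List<AssignmentRule> exampleRules() {
        return List.of(
            new AssignmentRule("Even", n -> n % 2 == 0, false),
            new AssignmentRule("Odd", n -> n % 2 != 0, false),
            new AssignmentRule("divisible_by_three", n -> n % 3 == 0, false),
            // Still bootstrapping, so the live Classifier ignores it for now.
            new AssignmentRule("my_birthday", n -> "12/12/1981".contains(Long.toString(n)), true));
    }
}
```

With these example rules, `classify(6)` yields Even and divisible_by_three, while 19 goes only to Odd even though it appears in "1981", because the my_birthday rule is still flagged as new.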