Hi Jeremy,

Benjamin is right: New-Rules-Job will need to know the map of partitions to offsets. Samza's checkpoint stream has that mapping. The documentation is here: <http://samza.apache.org/learn/documentation/0.9/container/checkpointing.html>.
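Below is a minimal sketch of that partition-to-offset map, written in plain Python rather than against the Samza API. It assumes each checkpoint record is a hypothetical `(partition, offset)` pair that holds the last offset Old-Rules-Job processed on that partition. Samza's own checkpoint format is richer than this. `latest_offsets` and `old_rules_done` are illustrative names.

```python
from typing import Dict, Iterable, Tuple


def latest_offsets(records: Iterable[Tuple[int, int]]) -> Dict[int, int]:
    """Fold (partition, offset) checkpoint records into the latest offset per
    partition. Records for one partition are assumed to arrive in the order
    Old-Rules-Job wrote them, so a later record overwrites an earlier one."""
    latest: Dict[int, int] = {}
    for partition, offset in records:
        latest[partition] = offset
    return latest


def old_rules_done(latest: Dict[int, int], partition: int, offset: int) -> bool:
    """True if Old-Rules-Job already applied the old rules to the message at
    (partition, offset). Offsets are compared only within one partition; a
    partition with no record means nothing was processed there yet."""
    return offset <= latest.get(partition, -1)


checkpoints = [(0, 3), (1, 10), (0, 7), (1, 12)]
latest = latest_offsets(checkpoints)  # {0: 7, 1: 12}
old_rules_done(latest, 0, 8)          # False: partition 0 has only reached offset 7
old_rules_done(latest, 1, 8)          # True: partition 1 is already past offset 8
```

Offset 8 is already handled on partition 1 but not yet on partition 0. For this reason, a single number cannot stand in for the map.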
However, on second thought, I do not recommend using the default checkpoint stream, for two reasons:

1) It was designed for restarting a job and is not very friendly for other uses, because reading it involves a lot of mapping and config work. You can look at CheckpointTool <https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala> to get a feel for how to read that stream.
2) More importantly, your code will be incompatible with newer versions of Samza. After SAMZA-465 <https://issues.apache.org/jira/browse/SAMZA-465>, there will be a coordinator stream and no checkpoint stream.

You can still do without the system's checkpoint stream: create your own simple-checkpoint stream and send the latest offset to it. The mapping problem Benjamin raised is still solvable this way. For example, if your input stream Numbers has 10 partitions, write to a simple-checkpoint stream that also has 10 partitions. The simple-checkpoint stream must always have the same number of partitions as Numbers. When you process partition #1 of Numbers, you write to partition #1 of the simple-checkpoint stream. When you bring up New-Rules-Job, it consumes two streams: Numbers and the simple-checkpoint stream. The simple-checkpoint stream holds the latest offsets of Old-Rules-Job. You just need to guarantee that the same partition number of Numbers and of the simple-checkpoint stream goes to the same container. By default, it does.

Thanks,
Fang, Yan
yanfang...@gmail.com

On Thu, Apr 16, 2015 at 2:16 PM, Benjamin Black <b...@b3k.us> wrote:

> New-Rules-Job will need to know the complete map of partitions to offsets.

On Thu, Apr 16, 2015 at 2:06 PM, jeremy p <athomewithagroove...@gmail.com> wrote:

> Ben: I think we are talking about different things here. I'm not trying to maintain ordering across a topic. I know that is not what Kafka and Samza are meant for.
> What I'm trying to do here is give my Old-Rules-Job a way of telling New-Rules-Job, "Once you hit this offset, start applying both old and new rules." So is that a single absolute offset that I want to pass from Old-Rules-Job to New-Rules-Job? Or a set of offsets, one for each partition?

On Thu, Apr 16, 2015 at 4:58 PM, Benjamin Black <b...@b3k.us> wrote:

> If you need to maintain ordering of a sequence of messages, those messages should all be written to the same partition. If you are concerned with global ordering of all messages in a topic, then Kafka is likely not going to be what you want. Ordering guarantees are strictly per partition. Samza is built on this principle by having each task work from a single partition. If your jobs require global coordination between tasks, again, you might reconsider either your architecture or your use of Kafka.
>
> Not trying to harsh your mellow here. High-scale systems like Kafka require you to match your architecture to them. To do otherwise produces bad times.

On Thu, Apr 16, 2015 at 1:51 PM, jeremy p <athomewithagroove...@gmail.com> wrote:

> Thank you for the response. Does this mean the Old-Rules-Job would need to maintain a Last-Processed-Old-Rules offset for each partition?

On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black <b...@b3k.us> wrote:

> Offsets are per partition. The alternative would have poor scaling behavior for both brokers and consumers.

On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <athomewithagroove...@gmail.com> wrote:

> Thanks to everybody for the responses!
>
> Yi: The queue must be processed in order, which means that I cannot use Ben and Guozhang's approach.
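Here is a toy model of Benjamin's point that offsets are per partition, in plain Python. Each partition is an append-only list, and an offset is just an index into that list. As a result, every partition has its own offset 0 and there is no topic-wide absolute offset, so Old-Rules-Job has to pass one offset per partition. `ToyTopic` is hypothetical. The `zlib.crc32` key hash is a stand-in and is not Kafka's actual default partitioner.

```python
import zlib
from typing import List, Tuple


class ToyTopic:
    """Toy partitioned log: each partition is an append-only list, and a
    message's offset is its index within its own partition."""

    def __init__(self, num_partitions: int) -> None:
        self.partitions: List[List[str]] = [[] for _ in range(num_partitions)]

    def partition_for(self, key: str) -> int:
        # Illustrative stable hash so that one key always maps to one partition.
        return zlib.crc32(key.encode("utf-8")) % len(self.partitions)

    def append(self, key: str, value: str) -> Tuple[int, int]:
        """Append a message and return its (partition, offset)."""
        p = self.partition_for(key)
        self.partitions[p].append(value)
        return p, len(self.partitions[p]) - 1


topic = ToyTopic(num_partitions=3)
# All three messages share a key, so they land in one partition with offsets
# 0, 1, 2; their relative order is preserved only because of that.
placed = [topic.append("user-1", f"event-{i}") for i in range(3)]
```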
> However, it is not necessary that all rules be processed at the same offset and at the same speed. This is why I considered a solution where we had a separate job for each rule. The problem with that solution is that we could have thousands of these rules, which would mean thousands of jobs. These jobs would be really lightweight and would require very few system resources. However, I don't know if having thousands of jobs would break YARN.
>
> For now, it sounds like Yan's solution would be the best. However, I have a few questions about it. For now, let's call the original job the Old-Rules-Job, and the bootstrap job the All-Rules-Job. This is the solution, as I understand it:
>
> The Old-Rules-Job exposes the Last-Processed-Old-Rules offset. We start the All-Rules-Job. The All-Rules-Job will only apply new rules until it gets to the Last-Processed-Old-Rules offset. Once the All-Rules-Job gets to the Last-Processed-Old-Rules offset, it sends a kill signal to Old-Rules-Job along a control stream. Old-Rules-Job terminates itself. Then the All-Rules-Job applies both old and new rules to every message that comes in.
>
> My questions:
>
> Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every time it processes a message? How does the Old-Rules-Job expose the Last-Processed-Old-Rules offset to the All-Rules-Job? Would the Last-Processed-Old-Rules offset be the absolute offset within a topic, and not the offset within a partition?
> Is there a way to find out a message's absolute offset within a topic?
>
> Thanks again for all the help!
>
> --Jeremy

On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Jeremy,
>
> I saw the following requirements in your use case:
>
> 1) New rules need to be added dynamically without creating too many Samza jobs (e.g. one Samza job per new rule is too many).
> 2) Old rules need to continue processing when new rules are added.
>
> I want to ask a few more questions about your requirements:
>
> Q.1) Is it required that, for a new rule, the bootstrap processing of messages from offset 0 to Last-Processed-Old-Rules be done before the new rule can be applied to messages from offset Last-Processed-Old-Rules onward?
> Q.2) Is it required that, after bootstrap, all rules process the same message offset at the same speed?
>
> If the answers to both Q.1 and Q.2 are yes, we will have to slow down or stop the jobs for the old rules so that the jobs running both new and old rules can catch up, as Yan pointed out. If the answers to both questions are no (which I doubt, since you need to build up a certain "history" for the new rule before you can apply it to later messages), you can take Ben/Guozhang's approach without coordination between the two jobs.
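The handover Jeremy summarizes above (Yan's job1/job2 scheme) can be simulated for a single partition. This is a sketch under stated assumptions, not Samza code. The control stream is modeled as a flag that takes effect instantly. All-Rules-Job is assumed to read faster than Old-Rules-Job, which is the catch-up assumption Yan makes. `run_handover`, `old_head_start` and `new_speedup` are hypothetical names. With several partitions, the same logic would run independently per partition.

```python
from typing import Callable, Dict, List, Tuple

Rule = Callable[[int], bool]
Output = List[Tuple[str, int]]  # (rule name, offset) pairs sent to output topics


def run_handover(log: List[int], old_rules: Dict[str, Rule],
                 new_rules: Dict[str, Rule], old_head_start: int,
                 new_speedup: int = 2) -> Tuple[Output, int]:
    """Simulate one partition. Old-Rules-Job has already processed offsets
    [0, old_head_start) and then reads one message per tick. All-Rules-Job
    starts at offset 0 and reads `new_speedup` messages per tick, applying only
    the new rules while Old-Rules-Job runs. On reaching a message Old-Rules-Job
    has not processed, it sends "shutdown" and applies all rules from then on.
    Returns the output and the handover offset (-1 if none happened)."""
    if new_speedup < 1:
        raise ValueError("new_speedup must be at least 1")
    out: Output = []

    def apply(rules: Dict[str, Rule], offset: int) -> None:
        out.extend((name, offset) for name, rule in rules.items() if rule(log[offset]))

    for offset in range(old_head_start):        # output Old-Rules-Job produced earlier
        apply(old_rules, offset)

    old_next, new_next = old_head_start, 0
    old_running, handover = True, -1
    all_rules = {**old_rules, **new_rules}

    while new_next < len(log):
        if old_running and old_next < len(log):  # Old-Rules-Job: one message per tick
            apply(old_rules, old_next)
            old_next += 1
        for _ in range(new_speedup):             # All-Rules-Job: catches up faster
            if new_next >= len(log):
                break
            if old_running and new_next >= old_next:
                old_running = False               # control-stream "shutdown", delivered instantly
                handover = old_next - 1           # last offset Old-Rules-Job processed
            apply(new_rules if old_running else all_rules, new_next)
            new_next += 1
    return out, handover


out, handover = run_handover(list(range(20)), {"even": lambda n: n % 2 == 0},
                             {"div3": lambda n: n % 3 == 0}, old_head_start=5)
# handover == 10 for these inputs; every (rule, offset) result appears exactly once.
```

Here the shutdown signal is instantaneous, so the handover offset is exact. In a real deployment, Old-Rules-Job may process a few more messages before it sees the signal. All-Rules-Job would therefore need the last offset Old-Rules-Job reports after it stops, not the offset it saw when it sent the signal.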
> Now the interesting case is when your answer to Q.1 is yes and your answer to Q.2 is no. That essentially means your job would need to keep multiple independent consumer offsets, one per rule, and let each move at its own speed. At minimum, it would need one bootstrap consumer and one normal processing consumer on the same system stream partition within a single job. I don't think Samza supports this now, and the only workaround is Yan's solution, which requires coordination between two jobs.
>
> -Yi

On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang <yanfang...@gmail.com> wrote:

> You can call coordinator.shutdown to shut the job down after it reaches the offset.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com

On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I feel Ben's solution is a bit simpler. You just need to restart your current job with both rules from the checkpointed offset, and start a new job from offset 0 with only the new rule, which stops at the checkpointed offset. Of course, this requires the second job to be able to shut itself down at a specific offset, and I am not sure whether that is already supported.
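Guozhang's (and Ben's) approach splits one partition at the checkpointed offset. The sketch below is minimal plain Python. It assumes `checkpoint` is the last offset the current job processed with the old rules and that the restarted job resumes at the next offset. `apply_range` and `split_at_checkpoint` are hypothetical helpers. In Samza, the bootstrap job's stop would be the `coordinator.shutdown` call Yan mentions.

```python
from typing import Callable, Dict, List, Tuple

Rule = Callable[[int], bool]
Output = List[Tuple[str, int]]  # (rule name, offset) pairs


def apply_range(log: List[int], rules: Dict[str, Rule], start: int, stop: int) -> Output:
    """Apply `rules` to offsets [start, stop) of one partition."""
    return [(name, offset) for offset in range(start, min(stop, len(log)))
            for name, rule in rules.items() if rule(log[offset])]


def split_at_checkpoint(log: List[int], old_rules: Dict[str, Rule],
                        new_rules: Dict[str, Rule], checkpoint: int) -> Tuple[Output, Output]:
    """Return (bootstrap_output, restarted_output).

    Bootstrap job: offsets 0..checkpoint with only the new rules, then stop.
    Restarted current job: offsets after the checkpoint with all rules."""
    bootstrap = apply_range(log, new_rules, 0, checkpoint + 1)
    restarted = apply_range(log, {**old_rules, **new_rules}, checkpoint + 1, len(log))
    return bootstrap, restarted
```

In a real deployment both jobs run at the same time, so new-rule output for early offsets interleaves with all-rule output for later offsets. That interleaving is presumably why Jeremy rules this approach out when the queue must be processed in order.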
> Guozhang

On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Hi Jeremy,
>
> In order to reach this goal, we have to assume that the job with new rules can always catch up with the one with old rules. Otherwise, I think we have no choice but to run a lot of jobs simultaneously.
>
> Under this assumption, we have job1 running with the old rules, and now we add job2, which integrates the old rules and the new rules. Job2 frequently checks the Last-Processed-Old-Rules offset from job1 (because job1 is still running), and it only applies the new rules to the data until it catches up with the Last-Processed-Old-Rules offset. Then it sends a signal to job1 to shut it down, and applies all rules to the stream.
>
> In terms of how to shut down job1, here is one solution <http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E> provided by Chris: you can have a control stream that tells job1 to shut down.
> Samza will provide this kind of stream after SAMZA-348 <https://issues.apache.org/jira/browse/SAMZA-348>, which is under active development.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com

On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <athomewithagroove...@gmail.com> wrote:

> Hello Yan,
>
> Thank you for the suggestion! I think your solution would work. However, I am afraid it would create a performance problem for our users.
>
> Let's say we kill the Classifier task, and create a new Classifier task with both the existing rules and the new rules. We get the offset of the latest-processed message for the old rules. Let's call this offset Last-Processed-Old-Rules. We ignore messages before Last-Processed-Old-Rules for the old rules. We configure the new Classifier task to be a bootstrap task.
>
> Let's say we have users who are watching the output topics, and they are expecting near-realtime updates. They won't see any updates for the old rules until our task has passed the Last-Processed-Old-Rules offset.
> If we have a lot of messages in that topic, that could take a long time. This is why I was hoping there would be a way to bootstrap the new rules while we're still processing the old rules. Do you think there is a way to do that?

On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Hi Jeremy,
>
> If my understanding is correct, whenever you add a new rule, you want to apply this rule to the historical data. Right?
>
> If you do not care about duplication, you can create a new task that contains the existing rules and the new rules, and configure it to bootstrap. This will apply all the rules from the beginning of the input stream. The shortcoming is that you will get duplicated results for the old rules.
>
> If you cannot tolerate that shortcoming: 1) get the offset of the latest message processed by the old rules; 2) in your new task, ignore messages before that offset for the old rules; 3) bootstrap.
>
> Hope this helps. Maybe your use case is more complicated?
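Yan's two options can be compared on one partition with a small replay function. This is an illustrative sketch, not Samza's bootstrap mechanism. `bootstrap_replay` and `last_old` are hypothetical names, and `last_old` stands for the offset of the latest message the old rules processed (step 1 of the recipe).

```python
from typing import Callable, Dict, List, Optional, Tuple

Rule = Callable[[int], bool]


def bootstrap_replay(log: List[int], old_rules: Dict[str, Rule],
                     new_rules: Dict[str, Rule],
                     last_old: Optional[int] = None) -> List[Tuple[str, int]]:
    """Replay one partition from offset 0 with every rule.

    With last_old=None this is the plain bootstrap, so the old rules fire again
    on messages the old task already handled (duplicates). With last_old set,
    the old rules are skipped for offsets <= last_old."""
    out: List[Tuple[str, int]] = []
    for offset, n in enumerate(log):
        rules = dict(new_rules)
        if last_old is None or offset > last_old:
            rules.update(old_rules)
        out.extend((name, offset) for name, rule in rules.items() if rule(n))
    return out
```

Passing `last_old` implements step 2. The old rules are skipped up to and including that offset, so combining the replay with the old task's earlier output yields each result exactly once.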
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com

On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <athomewithagroove...@gmail.com> wrote:

> So, I want to use Samza for a project I'm working on, but I keep running into a problem with bootstrapping.
>
> Let's say there's a Kafka topic called Numbers that I want to consume with Samza. Let's say each message has a single integer in it, and I want to classify it as even or odd. So I have two topics that I'm using for output, one called Even and one called Odd. I write a simple stream task called Classifier that consumes the Numbers topic, examines each incoming integer, and writes it back out to Even or Odd.
>
> Now, let's say I want to be able to add classifications dynamically, like "divisible by three", "divisible by four", or "numbers that appear in my date of birth".
> And let's say I have an API I can query that gives me all the assignment rules, such as "when a number is divisible by 3, write it out to a topic called 'divisible_by_three'", or "when a number appears in the string 12/12/1981, write it to the 'my_birthday' topic". So now I rewrite my stream task to query this API for assignment rules. It reads integers from the Numbers topic and writes them back out to one or more output topics, according to the assignment rules.
>
> Now, let's make this even more complicated. When I add a new classification, I want to go back to the very beginning of the Numbers topic and classify the integers accordingly. Once we've consumed all the old "historical" integers, I want to apply this classification to new integers as they come in.
>
> And this is where I get stuck.
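This is a minimal sketch of the rule-driven Classifier described above, written in plain Python rather than as a Samza `StreamTask`. `fetch_rules` is a hypothetical stand-in for the rules API, mapping each output topic to a predicate. "Appears in 12/12/1981" is interpreted as a substring test on the number's decimal digits.

```python
from typing import Callable, Dict, List

Rule = Callable[[int], bool]


def fetch_rules() -> Dict[str, Rule]:
    """Hypothetical stand-in for the assignment-rules API: output topic -> predicate."""
    return {
        "Even": lambda n: n % 2 == 0,
        "Odd": lambda n: n % 2 != 0,
        "divisible_by_three": lambda n: n % 3 == 0,
        "my_birthday": lambda n: str(n) in "12/12/1981",
    }


def classify(n: int, rules: Dict[str, Rule]) -> List[str]:
    """Return every output topic this integer should be written to."""
    return [topic for topic, matches in rules.items() if matches(n)]


classify(12, fetch_rules())  # ['Even', 'divisible_by_three', 'my_birthday']
```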
> One thing I can do: when I want to add a new classification, I can create a bootstrap job by setting the "systems.kafka.streams.numbers.samza.offset.default" property to "oldest". And that's great, but the problem is, once I've "caught up", I'd like to kill the bootstrap job and just let the Classifier handle this new assignment. So, I'd want to do some kind of handover from the bootstrap job to the Classifier job. But how do I do this?
>
> So, the question I must ask is this: Is Samza even an appropriate way to solve this problem? Has this problem ever come up for anybody else? How have they solved it? I would really like to use Samza because it seems like an appropriate technology, and I'd really, really, really, really like to avoid reinventing the wheel.
>
> A couple of solutions I came up with:
>
> 1) The simple solution. Have a separate Samza job for each classification.
> If I want to add a new classification, I create a new job and set it up as a bootstrap job. This would solve the problem. However, we may want to have many, many classifications. It could be as many as 1,000,000, which would mean up to 1,000,000 simultaneously running jobs. This could create a lot of overhead for YARN and Kafka.
>
> 2) My overly complicated workaround solution. Each assignment rule has an "isnew" flag. If it's a new classification that hasn't fully bootstrapped yet, the "isnew" flag is set to TRUE. When my classifier queries the API for assignment rules, it ignores any rule whose "isnew" flag is TRUE. When I want to add a new classification, I create a new bootstrap job for that classification. Every so often, maybe every few days, if all of my bootstrap jobs have "caught up", I kill all of the bootstrap jobs and classifier jobs. I set all the "isnew" flags to FALSE. Then I restart the classifier job.
> This is kind of an ugly solution, and I'm not even sure it would work. For one thing, I'd need some way of knowing whether a bootstrap job has "caught up". Secondly, I'd essentially be restarting the classifier job periodically, which just seems like an ugly solution. I don't like it.
>
> 3) Some other kind of really complicated solution I haven't thought of yet, probably involving locks, transactions, concurrency, and interprocess communication.
>
> Thanks for reading this whole thing. Please let me know if you have any suggestions.
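The following sketch covers the bookkeeping behind the "isnew" workaround, including one way to decide whether a bootstrap job has "caught up". It assumes positions are tracked as the next offset to read on each partition. `AssignmentRule`, `caught_up` and `maybe_promote` are hypothetical names. Stopping and restarting the jobs themselves is left out.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class AssignmentRule:
    topic: str
    matches: Callable[[int], bool]
    isnew: bool = False  # True until the rule's bootstrap job has caught up


def active_rules(rules: List[AssignmentRule]) -> List[AssignmentRule]:
    """The rules the Classifier applies: every rule whose isnew flag is not set."""
    return [r for r in rules if not r.isnew]


def caught_up(bootstrap_next: Dict[int, int], classifier_next: Dict[int, int]) -> bool:
    """A bootstrap job has caught up once, on every partition, the next offset it
    will read is at or past the next offset the Classifier will read."""
    return all(bootstrap_next.get(p, 0) >= pos for p, pos in classifier_next.items())


def maybe_promote(rules: List[AssignmentRule],
                  bootstrap_next: Dict[str, Dict[int, int]],
                  classifier_next: Dict[int, int]) -> bool:
    """The periodic step: only if every new rule's bootstrap job has caught up,
    clear all isnew flags (the jobs would then be stopped and the Classifier
    restarted). Returns True if the flags were cleared."""
    pending = [r for r in rules if r.isnew]
    if not all(caught_up(bootstrap_next[r.topic], classifier_next) for r in pending):
        return False
    for r in pending:
        r.isnew = False
    return True
```

Suppose a bootstrap job is ahead of the Classifier when both are stopped. The restarted Classifier then re-applies the new rule to the messages in between. As a result, this scheme can still emit some duplicates unless the bootstrap job stops exactly at the Classifier's position.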