Yan : It sounds like the checkpoint stream might help me! I would like to learn more about how New-Rules-Job can access the checkpoint stream for Old-Rules-Job. Can you please give me an example of how I would do this? Or could you please point me to some documentation or an article where I can learn how to do this?
Thank you!

On Thu, Apr 16, 2015 at 5:06 PM, jeremy p <athomewithagroove...@gmail.com> wrote:

> Ben : I think we are talking about different things here. I'm not trying to maintain ordering across a topic. I know that is not what Kafka and Samza are meant for. What I'm trying to do here is give my Old-Rules-Job a way of telling New-Rules-Job, "Once you hit this offset, start applying both old and new rules." So is that a single absolute offset that I want to pass from Old-Rules-Job to New-Rules-Job? Or a set of offsets, one for each partition?
>
> On Thu, Apr 16, 2015 at 4:58 PM, Benjamin Black <b...@b3k.us> wrote:
>
>> If you need to maintain ordering of a sequence of messages, those messages should all be written to the same partition. If you are concerned with global ordering of all messages in a topic then Kafka is likely not going to be what you want. Ordering guarantees are strictly per partition. Samza is built on this principle by having each task work from a single partition. If your jobs require global coordination between tasks, again, you might reconsider either your architecture or your use of Kafka.
>>
>> Not trying to harsh your mellow here. High scale systems like Kafka require that you match your architecture to them. To do otherwise produces bad times.
>>
>> On Thu, Apr 16, 2015 at 1:51 PM, jeremy p <athomewithagroove...@gmail.com> wrote:
>>
>> > Thank you for the response. Does this mean the Old-Rules-Job would need to maintain a Last-Processed-Old-Rules offset for each partition?
>> >
>> > On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black <b...@b3k.us> wrote:
>> >
>> > > Offsets are per partition. The alternative would have poor scaling behavior for both brokers and consumers.
>> > >
>> > > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <athomewithagroove...@gmail.com> wrote:
>> > >
>> > > > Thanks to everybody for the responses!
>> > > >
>> > > > Yi : The queue must be processed in order, which means that I cannot use Ben and Guozhang's approach.
>> > > >
>> > > > However, it is not necessary that all rules be processed at the same offset and at the same speed. This is why I considered a solution where we had a separate job for each rule. The problem with that solution is that we could have thousands of these rules, which would mean thousands of jobs. These jobs would be really lightweight and would require very few system resources. However, I don't know if having thousands of jobs would break YARN.
>> > > >
>> > > > For now, it sounds like Yan's solution would be the best. However, I have a few questions about it. For now, let's call the original job the Old-Rules-Job, and the bootstrap job the All-Rules-Job. This is the solution, as I understand it :
>> > > >
>> > > > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset. We start the All-Rules-Job. The All-Rules-Job will only apply new rules until it gets to the Last-Processed-Old-Rules offset. Once the All-Rules-Job gets to the Last-Processed-Old-Rules offset, it sends a kill signal to Old-Rules-Job along a control stream. Old-Rules-Job terminates itself. Then the All-Rules-Job applies both old and new rules to every message that comes in.
>> > > >
>> > > > My questions :
>> > > >
>> > > > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every time it processes a message? How does the Old-Rules-Job expose the Last-Processed-Old-Rules offset to the All-Rules-Job? Would the Last-Processed-Old-Rules offset be the absolute offset within a topic, and not the offset within a partition?
>> > > > Is there a way to find out a message's absolute offset within a topic?
>> > > >
>> > > > Thanks again for all the help!
>> > > >
>> > > > --Jeremy
>> > > >
>> > > > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> > > >
>> > > > > Hi, Jeremy,
>> > > > >
>> > > > > I saw the following requirements from your use case:
>> > > > >
>> > > > > 1) New rules need to be dynamically added w/o creating too many Samza jobs (e.g. 1 Samza job per new rule is too much)
>> > > > > 2) Old rules need to continue processing when new rules are added
>> > > > >
>> > > > > I want to ask a few more questions regarding your requirements:
>> > > > >
>> > > > > Q.1) Is it required that for a new rule, the bootstrap processing of messages from offset 0 to Last-Processed-Old-Rules has to be done before the new rules can be applied to messages from offset Last-Processed-Old-Rules?
>> > > > > Q.2) Is it required that after bootstrap, all rules are processing the message at the same offset w/ the same speed?
>> > > > >
>> > > > > If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we will have to slow down or stop the jobs for the old rules s.t. the jobs running both new and old rules can catch up, as Yan pointed out. If the answers to both questions above are no (which I doubt, since you need to build up a certain "history" for the new rule before you can apply it to later messages), you can take Ben/Guozhang's approach w/o coordination between the two jobs.
>> > > > >
>> > > > > Now the interesting case is that your answer to Q.1 is yes, and to Q.2 is no, which essentially poses a requirement that your job keep multiple independent consumer offsets per rule and let them move at their own speed. Or, at least one bootstrap consumer, and one normal processing consumer on the same system stream partition within a single job. I don't think that Samza supports this now. And the only workaround is Yan's solution, which requires coordination between two jobs.
>> > > > >
>> > > > > -Yi
>> > > > >
>> > > > > On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang <yanfang...@gmail.com> wrote:
>> > > > >
>> > > > > > you are able to call coordinator.shutdown to shut the job down after it reaches the offset.
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Fang, Yan
>> > > > > > yanfang...@gmail.com
>> > > > > >
>> > > > > > On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > >
>> > > > > > > I feel Ben's solution is a bit simpler: you just need to restart your current job with both rules on the checkpointed offset, and start a new job from offset 0 with only the new rule, and it will stop at the checkpointed offset. But of course it requires the second job to be able to shut itself down upon reaching some specific offset, which I am not sure is already supported.
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <yanfang...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi Jeremy,
>> > > > > > > >
>> > > > > > > > In order to reach this goal, we have to assume that the job with new rules can always catch up with the one with old rules. Otherwise, I think we have no choice but to run a lot of jobs simultaneously.
>> > > > > > > >
>> > > > > > > > Under our assumption, we have job1 running with the old rules, and we now add job2, which integrates both old and new rules. Job2 frequently checks the Last-Processed-Old-Rules offset from job1 (because job1 is still running), and it applies only the new rules to the data until it catches up with the Last-Processed-Old-Rules offset. Then it sends a signal to job1 to shut it down, and applies all rules to the stream.
>> > > > > > > >
>> > > > > > > > In terms of how to shut down job1, here is one solution <http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E> provided by Chris - e.g. you can have a control stream to get job1 to shut down. Samza will provide this kind of stream after SAMZA-348 <https://issues.apache.org/jira/browse/SAMZA-348>, which is under active development.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > >
>> > > > > > > > Fang, Yan
>> > > > > > > > yanfang...@gmail.com
>> > > > > > > >
>> > > > > > > > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <athomewithagroove...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hello Yan,
>> > > > > > > > >
>> > > > > > > > > Thank you for the suggestion! I think your solution would work, however, I am afraid it would create a performance problem for our users.
>> > > > > > > > >
>> > > > > > > > > Let's say we kill the Classifier task, and create a new Classifier task with both the existing rules and new rules. We get the offset of the latest-processed message for the old rules. Let's call this offset Last-Processed-Old-Rules. We ignore messages before Last-Processed-Old-Rules for the old rules. We configure the new Classifier task to be a bootstrap task.
>> > > > > > > > >
>> > > > > > > > > Let's say we have users who are watching the output topics, and they are expecting near-realtime updates. They won't see any updates for the old rules until our task has passed the Last-Processed-Old-Rules offset. If we have a lot of messages in that topic, that could take a long time. This is why I was hoping there would be a way to bootstrap the new rules while we're still processing the old rules. Do you think there is a way to do that?
>> > > > > > > > >
>> > > > > > > > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jeremy,
>> > > > > > > > > >
>> > > > > > > > > > If my understanding is correct, whenever you add a new rule, you want to apply this rule to the historical data. Right?
>> > > > > > > > > >
>> > > > > > > > > > If you do not care about duplication, you can create a new task that contains existing rules and new rules. Configure bootstrap. This will apply all the rules from the beginning of the input stream. The shortcoming is you will get duplicated results for old rules.
>> > > > > > > > > >
>> > > > > > > > > > If you can not tolerate the shortcoming, 1) get the offset of the latest-processed message of old rules. 2) In your new task, ignore messages before that offset for the old rules. 3) bootstrap.
>> > > > > > > > > >
>> > > > > > > > > > Hope this helps. Maybe your use case is more complicated?
>> > > > > > > > > >
>> > > > > > > > > > Thanks,
>> > > > > > > > > >
>> > > > > > > > > > Fang, Yan
>> > > > > > > > > > yanfang...@gmail.com
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <athomewithagroove...@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > So, I'm wanting to use Samza for a project I'm working on, but I keep running into a problem with bootstrapping.
>> > > > > > > > > > >
>> > > > > > > > > > > Let's say there's a Kafka topic called Numbers that I want to consume with Samza. Let's say each message has a single integer in it, and I want to classify it as even or odd. So I have two topics that I'm using for output, one called Even and one called Odd. I write a simple stream task called Classifier that consumes the Numbers topic, examines each incoming integer and writes it back out to Even or Odd.
>> > > > > > > > > > >
>> > > > > > > > > > > Now, let's say I want to be able to add classifications dynamically, like : "divisible by three", "divisible by four", or "numbers that appear in my date of birth". And let's say I have an API I can query that gives me all the assignment rules, such as "when a number is divisible by 3, write it out to a topic called 'divisible_by_three'", or "when a number appears in the string 12/12/1981, write it to the 'my_birthday' topic". So now I rewrite my stream task to query this API for assignment rules.
>> > > > > > > > > > > It reads integers from the Numbers topic and writes them back out to one or more output topics, according to the assignment rules.
>> > > > > > > > > > >
>> > > > > > > > > > > Now, let's make this even more complicated. When I add a new classification, I want to go back to the very beginning of the Numbers topic and classify them accordingly. Once we've consumed all the old "historical" integers, I want to apply this classification to new integers as they come in.
>> > > > > > > > > > >
>> > > > > > > > > > > And this is where I get stuck.
>> > > > > > > > > > >
>> > > > > > > > > > > One thing I can do : when I want to add a new classification, I can create a bootstrap job by setting the "systems.kafka.streams.numbers.samza.offset.default" property to "oldest". And that's great, but the problem is, once I've "caught up", I'd like to kill the bootstrap job and just let the Classifier handle this new assignment. So, I'd want to do some kind of handover from the bootstrap job to the Classifier job. But how to do this?
>> > > > > > > > > > >
>> > > > > > > > > > > So, the question I must ask is this : Is Samza even an appropriate way to solve this problem? Has this problem ever come up for anybody else?
>> > > > > > > > > > > How have they solved it? I would really like to use Samza because it seems like an appropriate technology, and I'd really really really really like to avoid re-inventing the wheel.
>> > > > > > > > > > >
>> > > > > > > > > > > A couple solutions I came up with :
>> > > > > > > > > > >
>> > > > > > > > > > > 1) The simple solution. Have a separate Samza job for each classification. If I want to add a new classification, I create a new job and set it up as a bootstrap job. This would solve the problem. However, we may want to have many, many classifications. It could be as many as 1,000,000, which would mean up to 1,000,000 simultaneously running jobs. This could create a lot of overhead for YARN and Kafka.
>> > > > > > > > > > >
>> > > > > > > > > > > 2) My overly-complicated workaround solution. Each assignment rule has an "isnew" flag. If it's a new classification that hasn't fully bootstrapped yet, the "isnew" flag is set to TRUE. When my classifier queries the API for assignment rules, it ignores any rule whose "isnew" flag is set. When I want to add a new classification, I create a new bootstrap job for that classification.
>> > > > > > > > > > > Every so often, maybe every few days or so, if all of my bootstrap jobs have "caught up", I kill all of the bootstrap jobs and classifier jobs. I set all the "isnew" flags to FALSE. Then I restart the classifier job. This is kind of an ugly solution, and I'm not even sure it would work. For one thing, I'd need some way of knowing if a bootstrap job has "caught up". Secondly, I'd essentially be restarting the classifier job periodically, which just seems like an ugly solution. I don't like it.
>> > > > > > > > > > >
>> > > > > > > > > > > 3) Some other kind of really complicated solution I haven't thought of yet, probably involving locks, transactions, concurrency, and interprocess communication.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks for reading this whole thing. Please let me know if you have any suggestions.
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
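
For reference, the per-partition handover discussed in this thread can be sketched in a few lines. This is only an illustration of the logic, not Samza code: HandoverTracker, classify, and the two rule tables are hypothetical names, offsets are modeled as plain integers, and the per-partition targets are assumed to be a fixed snapshot of the last offset Old-Rules-Job processed in each partition. How that snapshot would actually be read (for example from the checkpoint stream) is not shown.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Set

# Hypothetical rule tables: output topic name -> predicate on the integer.
OLD_RULES: Dict[str, Callable[[int], bool]] = {
    "even": lambda n: n % 2 == 0,
    "odd": lambda n: n % 2 == 1,
}
NEW_RULES: Dict[str, Callable[[int], bool]] = {
    "divisible_by_three": lambda n: n % 3 == 0,
}


@dataclass
class HandoverTracker:
    # Assumption: partition id -> last offset Old-Rules-Job processed there.
    targets: Dict[int, int]
    caught_up: Set[int] = field(default_factory=set)

    def old_rules_active(self, partition: int, offset: int) -> bool:
        """Return True once this message lies past the partition's target."""
        target = self.targets.get(partition, -1)
        if offset >= target:
            # Everything Old-Rules-Job handled in this partition is now covered.
            self.caught_up.add(partition)
        return offset > target

    def all_caught_up(self) -> bool:
        """True when every tracked partition has reached its target offset."""
        return set(self.targets) <= self.caught_up


def classify(tracker: HandoverTracker, partition: int, offset: int,
             value: int) -> List[str]:
    """Return the output topics for one message from the input topic."""
    rules = dict(NEW_RULES)
    if tracker.old_rules_active(partition, offset):
        # Past the handover point: apply old and new rules together.
        rules = {**OLD_RULES, **NEW_RULES}
    return [topic for topic, matches in rules.items() if matches(value)]
```

In this sketch, the moment all_caught_up() first returns True would be the point at which the job running both rule sets signals Old-Rules-Job to stop, as proposed earlier in the thread. Keeping one target per partition follows Ben's point that offsets are per partition.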