Hi, @Martin, thanks for your comments. Maybe I'm missing some important point, but I think coupling the releases is actually a *good* thing. To give an example, would it be better if the MR and HDFS components of Hadoop had different release schedules?
Actually, keeping the discussion in a single place would make agreeing on releases (and backwards compatibility) much easier, as everybody would be responsible for the whole codebase. That said, I like the idea of absorbing samza-core as a sub-project, and leave the fancy stuff separate. It probably gives 90% of the benefits we have been discussing here. Cheers, -- Gianmarco On 7 July 2015 at 02:30, Jay Kreps <jay.kr...@gmail.com> wrote: > Hey Martin, > > I agree coupling release schedules is a downside. > > Definitely we can try to solve some of the integration problems in > Confluent Platform or in other distributions. But I think this ends up > being really shallow. I guess I feel to really get a good user experience > the two systems have to kind of feel like part of the same thing and you > can't really add that in later--you can put both in the same downloadable > tar file but it doesn't really give a very cohesive feeling. I agree that > ultimately any of the project stuff is as much social and naming as > anything else--theoretically two totally independent projects could work to > tightly align. In practice this seems to be quite difficult though. > > For the frameworks--totally agree it would be good to maintain the > framework support with the project. In some cases there may not be too much > there since the integration gets lighter but I think whatever stubs you > need should be included. So no I definitely wasn't trying to imply dropping > support for these frameworks, just making the integration lighter by > separating process management from partition management. > > You raise two good points we would have to figure out if we went down the > alignment path: > 1. With respect to the name, yeah I think the first question is whether > some "re-branding" would be worth it. If so then I think we can have a big > thread on the name. I'm definitely not set on Kafka Streaming or Kafka > Streams I was just using them to be kind of illustrative. 
I agree with your > critique of these names, though I think people would get the idea. > 2. Yeah you also raise a good point about how to "factor" it. Here are the > options I see (I could get enthusiastic about any of them): > a. One repo for both Kafka and Samza > b. Two repos, retaining the current separation > c. Two repos, the equivalent of samza-api and samza-core is absorbed > almost like a third client > > Cheers, > > -Jay > > On Mon, Jul 6, 2015 at 1:18 PM, Martin Kleppmann <mar...@kleppmann.com> > wrote: > > > Ok, thanks for the clarifications. Just a few follow-up comments. > > > > - I see the appeal of merging with Kafka or becoming a subproject: the > > reasons you mention are good. The risk I see is that release schedules > > become coupled to each other, which can slow everyone down, and large > > projects with many contributors are harder to manage. (Jakob, can you > speak > > from experience, having seen a wider range of Hadoop ecosystem projects?) > > > > Some of the goals of a better unified developer experience could also be > > solved by integrating Samza nicely into a Kafka distribution (such as > > Confluent's). I'm not against merging projects if we decide that's the > way > > to go, just pointing out the same goals can perhaps also be achieved in > > other ways. > > > > - With regard to dropping the YARN dependency: are you proposing that > > Samza doesn't give any help to people wanting to run on > YARN/Mesos/AWS/etc? > > So the docs would basically have a link to Slider and nothing else? Or > > would we maintain integrations with a bunch of popular deployment methods > > (e.g. the necessary glue and shell scripts to make Samza work with > Slider)?
> > > > I absolutely think it's a good idea to have the "as a library" and "as a > > process" (using Yi's taxonomy) options for people who want them, but I > > think there should also be a low-friction path for common "as a service" > > deployment methods, for which we probably need to maintain integrations. > > > > - Project naming: "Kafka Streams" seems odd to me, because Kafka is all > > about streams already. Perhaps "Kafka Transformers" or "Kafka Filters" > > would be more apt? > > > > One suggestion: perhaps the core of Samza (stream transformation with > > state management -- i.e. the "Samza as a library" bit) could become part > of > > Kafka, while higher-level tools such as streaming SQL and integrations > with > > deployment frameworks remain in a separate project? In other words, Kafka > > would absorb the proven, stable core of Samza, which would become the > > "third Kafka client" mentioned early in this thread. The Samza project > > would then target that third Kafka client as its base API, and the > project > > would be freed up to explore more experimental new horizons. > > > > Martin > > > > On 6 Jul 2015, at 18:51, Jay Kreps <jay.kr...@gmail.com> wrote: > > > > > Hey Martin, > > > > > > For the YARN/Mesos/etc decoupling I actually don't think it ties our > > hands > > > at all, all it does is refactor things. The division of responsibility > is > > > that Samza core is responsible for task lifecycle, state, and partition > > > management (using the Kafka co-ordinator) but it is NOT responsible for > > > packaging, configuration deployment or execution of processes. The > > problem > > > of packaging and starting these processes is > > > framework/environment-specific. This leaves individual frameworks to be > > as > > > fancy or vanilla as they like. So you can get simple stateless support > in > > > YARN, Mesos, etc using their off-the-shelf app framework (Slider, > > Marathon, > > > etc). 
These are well known by people and have nice UIs and a lot of > > > flexibility. I don't think they have node affinity as a built in option > > > (though I could be wrong). So if we want that we can either wait for them > > > to add it or do a custom framework to add that feature (as now). Obviously > > > if you manage things with old-school ops tools (puppet/chef/etc) you get > > > locality easily. The nice thing, though, is that all the samza "business > > > logic" around partition management and fault tolerance is in Samza core so > > > it is shared across frameworks and the framework specific bit is just > > > whether it is smart enough to try to get the same host when a job is > > > restarted. > > > > > > With respect to the Kafka-alignment, yeah I think the goal would be (a) > > > actually get better alignment in user experience, and (b) express this in > > > the naming and project branding. Specifically: > > > 1. Website/docs, it would be nice for the "transformation" api to be > > > discoverable in the main Kafka docs--i.e. be able to explain when to use > > > the consumer and when to use the stream processing functionality and lead > > > people into that experience. > > > 2. Align releases so if you get Kafkza 1.4.2 (or whatever) that has both > > > Kafka and the stream processing part and they actually work together. > > > 3. Unify the programming experience so the client and Samza api share > > > config/monitoring/naming/packaging/etc. > > > > > > I think sub-projects keep separate committers and can have a separate > > repo, > > > but I'm actually not really sure (I can't find a definition of a > > subproject > > > in Apache). > > > > > > Basically at a high-level you want the experience to "feel" like a single > > > system, not two relatively independent things that are kind of awkwardly > > > glued together.
> > > > > > I think if we did that then having naming or branding like "kafka > > > streaming" or "kafka streams" or something like that would actually do a > > > good job of conveying what it is. I do think this would help adoption > > quite > > > a lot as it would correctly convey that using Kafka Streaming with Kafka > > is > > > a fairly seamless experience and Kafka is pretty heavily adopted at this > > > point. > > > > > > Fwiw we actually considered this model originally when open sourcing > > Samza, > > > however at that time Kafka was relatively unknown and we decided not to > > do > > > it since we felt it would be limiting. From my point of view three > > > things have changed: (1) Kafka is now really heavily used for stream > > > processing, (2) we learned that abstracting out the stream well is > > > basically impossible, (3) we learned it is really hard to keep the two > > > things feeling like a single product. > > > > > > -Jay > > > > > > > > > On Mon, Jul 6, 2015 at 3:37 AM, Martin Kleppmann <mar...@kleppmann.com > > > > > wrote: > > > > > >> Hi all, > > >> > > >> Lots of good thoughts here. > > >> > > >> I agree with the general philosophy of tying Samza more firmly to > Kafka. > > >> After I spent a while looking at integrating other message brokers (e.g. > > >> Kinesis) with SystemConsumer, I came to the conclusion that > > SystemConsumer > > >> tacitly assumes a model so much like Kafka's that pretty much nobody but > > >> Kafka actually implements it. (Databus is perhaps an exception, but it > > >> isn't widely used outside of LinkedIn.) Thus, making Samza fully > > dependent > > >> on Kafka acknowledges that the system-independence was never as real as > > we > > >> perhaps made it out to be. The gains of code reuse are real. > > >> > > >> The idea of decoupling Samza from YARN has also always been appealing to > > >> me, for various reasons already mentioned in this thread.
Although > > making > > >> Samza jobs deployable on anything (YARN/Mesos/AWS/etc) seems laudable, > > I am > > >> a little concerned that it will restrict us to a lowest common > > denominator. > > >> For example, would host affinity (SAMZA-617) still be possible? For > jobs > > >> with large amounts of state, I think SAMZA-617 would be a big boon, > > since > > >> restoring state off the changelog on every single restart is painful, > > due > > >> to long recovery times. It would be a shame if the decoupling from > YARN > > >> made host affinity impossible. > > >> > > >> Jay, a question about the proposed API for instantiating a job in code > > >> (rather than a properties file): when submitting a job to a cluster, > is > > the > > >> idea that the instantiation code runs on a client somewhere, which > then > > >> pokes the necessary endpoints on YARN/Mesos/AWS/etc? Or does that code > > run > > >> on each container that is part of the job (in which case, how does the > > job > > >> submission to the cluster work)? > > >> > > >> I agree with Garry that it doesn't feel right to make a 1.0 release > > with a > > >> plan for it to be immediately obsolete. So if this is going to > happen, I > > >> think it would be more honest to stick with 0.* version numbers until > > the > > >> library-ified Samza has been implemented, is stable and widely used. > > >> > > >> Should the new Samza be a subproject of Kafka? There is precedent for > > >> tight coupling between different Apache projects (e.g. Curator and > > >> Zookeeper, or Slider and YARN), so I think remaining separate would be > > ok. > > >> Even if Samza is fully dependent on Kafka, there is enough substance > in > > >> Samza that it warrants being a separate project. An argument in favour > > of > > >> merging would be if we think Kafka has a much stronger "brand > presence" > > >> than Samza; I'm ambivalent on that one. 
If the Kafka project is > willing > > to > > >> endorse Samza as the "official" way of doing stateful stream > > >> transformations, that would probably have much the same effect as > > >> re-branding Samza as "Kafka Stream Processors" or suchlike. Close > > >> collaboration between the two projects will be needed in any case. > > >> > > >> From a project management perspective, I guess the "new Samza" would > > have > > >> to be developed on a branch alongside ongoing maintenance of the > current > > >> line of development? I think it would be important to continue > > supporting > > >> existing users, and provide a graceful migration path to the new > > version. > > >> Leaving the current versions unsupported and forcing people to rewrite > > >> their jobs would send a bad signal. > > >> > > >> Best, > > >> Martin > > >> > > >> On 2 Jul 2015, at 16:59, Jay Kreps <j...@confluent.io> wrote: > > >> > > >>> Hey Garry, > > >>> > > >>> Yeah that's super frustrating. I'd be happy to chat more about this > if > > >>> you'd be interested. I think Chris and I started with the idea of > "what > > >>> would it take to make Samza a kick-ass ingestion tool" but ultimately > > we > > >>> kind of came around to the idea that ingestion and transformation had > > >>> pretty different needs and coupling the two made things hard. > > >>> > > >>> For what it's worth I think copycat (KIP-26) actually will do what > you > > >> are > > >>> looking for. > > >>> > > >>> With regard to your point about slider, I don't necessarily disagree. > > >> But I > > >>> think getting good YARN support is quite doable and I think we can > make > > >>> that work well. I think the issue this proposal solves is that > > >> technically > > >>> it is pretty hard to support multiple cluster management systems the > > way > > >>> things are now, you need to write an "app master" or "framework" for > > each > > >>> and they are all a little different so testing is really hard. 
In the > > >>> absence of this we have been stuck with just YARN which has fantastic > > >>> penetration in the Hadoopy part of the org, but zero penetration > > >> elsewhere. > > >>> Given the huge amount of work being put into Slider, Marathon, AWS > > >>> tooling, not to mention the umpteen related packaging technologies people > > >>> want to use (Docker, Kubernetes, various cloud-specific deploy tools, > > >> etc) > > >>> I really think it is important to get this right. > > >>> > > >>> -Jay > > >>> > > >>> On Thu, Jul 2, 2015 at 4:17 AM, Garry Turkington < > > >>> g.turking...@improvedigital.com> wrote: > > >>> > > >>>> Hi all, > > >>>> > > >>>> I think the question below re does Samza become a sub-project of Kafka > > >>>> highlights the broader point around migration. Chris mentions Samza's > > >>>> maturity is heading towards a v1 release but I'm not sure it feels > > >> right to > > >>>> launch a v1 then immediately plan to deprecate most of it. > > >>>> > > >>>> From a selfish perspective I have some guys who have started working > > >> with > > >>>> Samza and building some new consumers/producers was next up. Sounds like > > >>>> that is absolutely not the direction to go. I need to look into the KIP > > >> in > > >>>> more detail but for me the attractiveness of adding new Samza > > >>>> consumer/producers -- even if yes all they were doing was really getting > > >>>> data into and out of Kafka -- was to avoid having to worry about the > > >>>> lifecycle management of external clients. If there is a generic Kafka > > >>>> ingress/egress layer that I can plug a new connector into and have a > > >> lot of > > >>>> the heavy lifting re scale and reliability done for me then it gives me > > >> all > > >>>> that pushing new consumers/producers would. If not then it complicates my > > >>>> operational deployments.
> > >>>> > > >>>> Which is similar to my other question with the proposal -- if we > > build a > > >>>> fully available/stand-alone Samza plus the requisite shims to > > integrate > > >>>> with Slider etc I suspect the former may be a lot more work than we > > >> think. > > >>>> We may make it much easier for a newcomer to get something running > but > > >>>> having them step up and get a reliable production deployment may > still > > >>>> dominate mailing list traffic, if for different reasons than today. > > >>>> > > >>>> Don't get me wrong -- I'm comfortable with making the Samza > dependency > > >> on > > >>>> Kafka much more explicit and I absolutely see the benefits in the > > >>>> reduction of duplication and clashing terminologies/abstractions > that > > >>>> Chris/Jay describe. Samza as a library would likely be a very nice > > tool > > >> to > > >>>> add to the Kafka ecosystem. I just have the concerns above re the > > >>>> operational side. > > >>>> > > >>>> Garry > > >>>> > > >>>> -----Original Message----- > > >>>> From: Gianmarco De Francisci Morales [mailto:g...@apache.org] > > >>>> Sent: 02 July 2015 12:56 > > >>>> To: dev@samza.apache.org > > >>>> Subject: Re: Thoughts and obesrvations on Samza > > >>>> > > >>>> Very interesting thoughts. > > >>>> From outside, I have always perceived Samza as a computing layer > over > > >>>> Kafka. > > >>>> > > >>>> The question, maybe a bit provocative, is "should Samza be a > > sub-project > > >>>> of Kafka then?" > > >>>> Or does it make sense to keep it as a separate project with a > separate > > >>>> governance? > > >>>> > > >>>> Cheers, > > >>>> > > >>>> -- > > >>>> Gianmarco > > >>>> > > >>>> On 2 July 2015 at 08:59, Yan Fang <yanfang...@gmail.com> wrote: > > >>>> > > >>>>> Overall, I agree to couple with Kafka more tightly. Because Samza > de > > >>>>> facto is based on Kafka, and it should leverage what Kafka has. At > > the > > >>>>> same time, Kafka does not need to reinvent what Samza already has. 
> I > > >>>>> also like the idea of separating the ingestion and transformation. > > >>>>> > > >>>>> But it is a little difficult for me to imagine what Samza will > > >>>> look like. > > >>>>> And I feel Chris and Jay have a little difference in terms of how > > >>>>> Samza should look. > > >>>>> > > >>>>> *** Will it look like what Jay's code shows (a client of Kafka)? And > > >>>>> user's application code calls this client? > > >>>>> > > >>>>> 1. If we make Samza be a library of Kafka (like what the code shows), > > >>>>> how do we implement auto-balance and fault-tolerance? Are they taken > > >>>>> care of by the Kafka broker or other mechanism, such as a "Samza worker" > > >>>>> (just making up the name)? > > >>>>> > > >>>>> 2. What about other features, such as auto-scaling, shared state, > > >>>>> monitoring? > > >>>>> > > >>>>> > > >>>>> *** If we have Samza standalone (is this what Chris suggests?) > > >>>>> > > >>>>> 1. we still need to ingest data from Kafka and produce to it. Then it > > >>>>> becomes the same as what Samza looks like now, except it does not > > rely > > >>>>> on YARN anymore. > > >>>>> > > >>>>> 2. if it is standalone, how can it leverage Kafka's metrics, logs, > > >>>>> etc? Use Kafka code as the dependency? > > >>>>> > > >>>>> > > >>>>> Thanks, > > >>>>> > > >>>>> Fang, Yan > > >>>>> yanfang...@gmail.com > > >>>>> > > >>>>> On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang <wangg...@gmail.com> > > >>>> wrote: > > >>>>> > > >>>>>> Read through the code example and it looks good to me. A few > > >>>>>> thoughts regarding deployment: > > >>>>>> > > >>>>>> Today Samza deploys as an executable runnable like: > > >>>>>> > > >>>>>> deploy/samza/bin/run-job.sh --config-factory=... > > >>>> --config-path=file://...
> > >>>>>> > > >>>>>> And this proposal advocates for deploying Samza more as embedded > > >>>>>> libraries in user application code (ignoring the terminology since > > >>>>>> it is not the > > >>>>> same > > >>>>>> as the prototype code): > > >>>>>> > > >>>>>> StreamTask task = new MyStreamTask(configs); > > >>>>>> Thread thread = new Thread(task); > > >>>>>> thread.start(); > > >>>>>> > > >>>>>> I think both of these deployment modes are important for different > > >>>>>> types > > >>>>> of > > >>>>>> users. That said, I think making Samza purely standalone is still > > >>>>>> sufficient for either runnable or library modes. > > >>>>>> > > >>>>>> Guozhang > > >>>>>> > > >>>>>> On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps <j...@confluent.io> > > wrote: > > >>>>>> > > >>>>>>> Looks like gmail mangled the code example, it was supposed to look > > >>>>>>> like > > >>>>>>> this: > > >>>>>>> > > >>>>>>> Properties props = new Properties(); > > >>>>>>> props.put("bootstrap.servers", "localhost:4242"); > > >>>>>>> StreamingConfig config = new StreamingConfig(props); > > >>>>>>> config.subscribe("test-topic-1", "test-topic-2"); > > >>>>>>> config.processor(ExampleStreamProcessor.class); > > >>>>>>> config.serialization(new StringSerializer(), new StringDeserializer()); > > >>>>>>> KafkaStreaming container = new KafkaStreaming(config); > > >>>>>>> container.run(); > > >>>>>>> > > >>>>>>> -Jay > > >>>>>>> > > >>>>>>> On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io> > > >>>> wrote: > > >>>>>>> > > >>>>>>>> Hey guys, > > >>>>>>>> > > >>>>>>>> This came out of some conversations Chris and I were having > > >>>>>>>> around > > >>>>>>> whether > > >>>>>>>> it would make sense to use Samza as a kind of data ingestion > > >>>>> framework > > >>>>>>> for > > >>>>>>>> Kafka (which ultimately led to KIP-26 "copycat").
This kind of > > >>>>>> combined > > >>>>>>>> with complaints around config and YARN and the discussion around > > >>>>>>>> how > > >>>>> to > > >>>>>>>> best do a standalone mode. > > >>>>>>>> > > >>>>>>>> So the thought experiment was, given that Samza was basically > > >>>>>>>> already totally Kafka specific, what if you just embraced that > > >>>>>>>> and turned it > > >>>>>> into > > >>>>>>>> something less like a heavyweight framework and more like a > > >>>>>>>> third > > >>>>> Kafka > > >>>>>>>> client--a kind of "producing consumer" with state management > > >>>>>> facilities. > > >>>>>>>> Basically a library. Instead of a complex stream processing > > >>>>>>>> framework > > >>>>>>> this > > >>>>>>>> would actually be a very simple thing, not much more complicated > > >>>>>>>> to > > >>>>> use > > >>>>>>> or > > >>>>>>>> operate than a Kafka consumer. As Chris said, as we thought about it, > > >>>>>> a lot of > > >>>>>>>> what Samza (and the other stream processing systems) were doing > > >>>>> seemed > > >>>>>>> like > > >>>>>>>> kind of a hangover from MapReduce. > > >>>>>>>> > > >>>>>>>> Of course you need to ingest/output data to and from the stream > > >>>>>>>> processing. But when we actually looked into how that would > > >>>>>>>> work, > > >>>>> Samza > > >>>>>>>> isn't really an ideal data ingestion framework for a bunch of > > >>>>> reasons. > > >>>>>> To > > >>>>>>>> really do that right you need a pretty different internal data > > >>>>>>>> model > > >>>>>> and > > >>>>>>>> set of apis. So what if you split them and had an api for Kafka > > >>>>>>>> ingress/egress (copycat AKA KIP-26) and a separate api for Kafka > > >>>>>>>> transformation (Samza). > > >>>>>>>> > > >>>>>>>> This would also allow really embracing the same terminology and > > >>>>>>>> conventions. One complaint about the current state is that the > > >>>>>>>> two > > >>>>>>> systems > > >>>>>>>> kind of feel bolted on.
Terminology like "stream" vs "topic" and > > >>>>>>> different > > >>>>>>>> config and monitoring systems means you kind of have to learn > > >>>>>>>> Kafka's > > >>>>>>> way, > > >>>>>>>> then learn Samza's slightly different way, then kind of > > >>>>>>>> understand > > >>>>> how > > >>>>>>> they > > >>>>>>>> map to each other, which having walked a few people through this > > >>>>>>>> is surprisingly tricky for folks to get. > > >>>>>>>> > > >>>>>>>> Since I have been spending a lot of time on airplanes I hacked > > >>>>>>>> up an earnest but still somewhat incomplete prototype of what > > >>>>>>>> this would > > >>>>> look > > >>>>>>>> like. This is just unceremoniously dumped into Kafka as it > > >>>>>>>> required a > > >>>>>> few > > >>>>>>>> changes to the new consumer. Here is the code: > > >>>>>>>> > > >>>>>>>> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming > > >>>>>>>> > > >>>>>>>> For the purpose of the prototype I just liberally renamed > > >>>>>>>> everything > > >>>>> to > > >>>>>>>> try to align it with Kafka with no regard for compatibility. > > >>>>>>>> > > >>>>>>>> To use this would be something like this: > > >>>>>>>> Properties props = new Properties(); > > >>>>>>>> props.put("bootstrap.servers", "localhost:4242"); > > >>>>>>>> StreamingConfig config = new StreamingConfig(props); > > >>>>>>>> config.subscribe("test-topic-1", "test-topic-2"); > > >>>>>>>> config.processor(ExampleStreamProcessor.class); > > >>>>>>>> config.serialization(new StringSerializer(), new StringDeserializer()); > > >>>>>>>> KafkaStreaming container = new KafkaStreaming(config); > > >>>>>>>> container.run(); > > >>>>>>>> > > >>>>>>>> KafkaStreaming is basically the SamzaContainer; StreamProcessor > > >>>>>>>> is basically StreamTask.
> > >>>>>>>> > > >>>>>>>> So rather than putting all the class names in a file and then > > >>>>>>>> having > > >>>>>> the > > >>>>>>>> job assembled by reflection, you just instantiate the container > > >>>>>>>> programmatically. Work is balanced over however many instances > > >>>>>>>> of > > >>>>> this > > >>>>>>> are > > >>>>>>>> alive at any time (i.e. if an instance dies, new tasks are added > > >>>>>>>> to > > >>>>> the > > >>>>>>>> existing containers without shutting them down). > > >>>>>>>> > > >>>>>>>> We would provide some glue for running this stuff in YARN via > > >>>>>>>> Slider, Mesos via Marathon, and AWS using some of their tools > > >>>>>>>> but from the > > >>>>>> point > > >>>>>>> of > > >>>>>>>> view of these frameworks these stream processing jobs are just > > >>>>>> stateless > > >>>>>>>> services that can come and go and expand and contract at will. > > >>>>>>>> There > > >>>>> is > > >>>>>>> no > > >>>>>>>> more custom scheduler. > > >>>>>>>> > > >>>>>>>> Here are some relevant details: > > >>>>>>>> > > >>>>>>>> 1. It is only ~1300 lines of code, it would get larger if we > > >>>>>>>> productionized but not vastly larger. We really do get a ton > > >>>>>>>> of > > >>>>>>> leverage > > >>>>>>>> out of Kafka. > > >>>>>>>> 2. Partition management is fully delegated to the new consumer. > > >>>>> This > > >>>>>>>> is nice since now any partition management strategy available > > >>>>>>>> to > > >>>>>> Kafka > > >>>>>>>> consumer is also available to Samza (and vice versa) and with > > >>>>>>>> the > > >>>>>>> exact > > >>>>>>>> same configs. > > >>>>>>>> 3. It supports state as well as state reuse > > >>>>>>>> > > >>>>>>>> Anyhow take a look, hopefully it is thought provoking. 
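The balancing behavior described above (work spread over however many instances are alive, with a dead instance's partitions folded into the survivors without restarting them) can be sketched framework-free in a few lines of Java. The class name, method names, and round-robin strategy below are illustrative assumptions for this sketch only, not the prototype's actual code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the balancing idea: partitions are spread over whatever
// container instances are currently alive; when an instance dies, its
// partitions are simply redistributed across the survivors.
public class RebalanceSketch {

    // Round-robin assignment of partition ids to the live instances.
    static Map<String, List<Integer>> assign(List<String> instances, int partitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(instances.get(p % instances.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> live = new ArrayList<>(List.of("container-1", "container-2", "container-3"));
        System.out.println(assign(live, 6)); // each instance owns two partitions

        live.remove("container-2");          // an instance dies...
        System.out.println(assign(live, 6)); // ...survivors absorb its partitions
    }
}
```

In the proposal itself this bookkeeping is delegated to the Kafka consumer's group partition management rather than computed by hand like this; the point of the sketch is only that no external scheduler is needed for the rebalance, so the framework-specific piece shrinks to process management.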
> > >>>>>>>> > > >>>>>>>> -Jay > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini < > > >>>>>> criccom...@apache.org> > > >>>>>>>> wrote: > > >>>>>>>> > > >>>>>>>>> Hey all, > > >>>>>>>>> > > >>>>>>>>> I have had some discussions with Samza engineers at LinkedIn > > >>>>>>>>> and > > >>>>>>> Confluent > > >>>>>>>>> and we came up with a few observations and would like to > > >>>>>>>>> propose > > >>>>> some > > >>>>>>>>> changes. > > >>>>>>>>> > > >>>>>>>>> We've observed some things that I want to call out about > > >>>>>>>>> Samza's > > >>>>>> design, > > >>>>>>>>> and I'd like to propose some changes. > > >>>>>>>>> > > >>>>>>>>> * Samza is dependent upon a dynamic deployment system. > > >>>>>>>>> * Samza is too pluggable. > > >>>>>>>>> * Samza's SystemConsumer/SystemProducer and Kafka's consumer > > >>>>>>>>> APIs > > >>>>> are > > >>>>>>>>> trying to solve a lot of the same problems. > > >>>>>>>>> > > >>>>>>>>> All three of these issues are related, but I'll address them in > > >>>>> order. > > >>>>>>>>> > > >>>>>>>>> Deployment > > >>>>>>>>> > > >>>>>>>>> Samza strongly depends on the use of a dynamic deployment > > >>>>>>>>> scheduler > > >>>>>> such > > >>>>>>>>> as > > >>>>>>>>> YARN, Mesos, etc. When we initially built Samza, we bet that > > >>>>>>>>> there > > >>>>>> would > > >>>>>>>>> be > > >>>>>>>>> one or two winners in this area, and we could support them, and > > >>>>>>>>> the > > >>>>>> rest > > >>>>>>>>> would go away. In reality, there are many variations. > > >>>>>>>>> Furthermore, > > >>>>>> many > > >>>>>>>>> people still prefer to just start their processors like normal > > >>>>>>>>> Java processes, and use traditional deployment scripts such as > > >>>>>>>>> Fabric, > > >>>>>> Chef, > > >>>>>>>>> Ansible, etc. Forcing a deployment system on users makes the > > >>>>>>>>> Samza start-up process really painful for first time users. 
> > >>>>>>>>> Dynamic deployment as a requirement was also a bit of a > > >>>>>>>>> mis-fire > > >>>>>> because > > >>>>>>>>> of > > >>>>>>>>> a fundamental misunderstanding between the nature of batch jobs > > >>>>>>>>> and > > >>>>>>> stream > > >>>>>>>>> processing jobs. Early on, we made a conscious effort to favor > > >>>>>>>>> the > > >>>>>> Hadoop > > >>>>>>>>> (Map/Reduce) way of doing things, since it worked and was well > > >>>>>>> understood. > > >>>>>>>>> One thing that we missed was that batch jobs have a definite > > >>>>>> beginning > > >>>>>>>>> and > > >>>>>>>>> end, and stream processing jobs don't (usually). This leads to > > >>>>>>>>> a > > >>>>> much > > >>>>>>>>> simpler scheduling problem for stream processors. You basically > > >>>>>>>>> just > > >>>>>>> need > > >>>>>>>>> to find a place to start the processor, and start it. The way > > >>>>>>>>> we run grids, at LinkedIn, there's no concept of a cluster > > >>>>>>>>> being "full". We always > > >>>>>> add > > >>>>>>>>> more machines. The problem with coupling Samza with a scheduler > > >>>>>>>>> is > > >>>>>> that > > >>>>>>>>> Samza (as a framework) now has to handle deployment. This pulls > > >>>>>>>>> in a > > >>>>>>> bunch > > >>>>>>>>> of things such as configuration distribution (config stream), > > >>>>>>>>> shell > > >>>>>>> scripts > > >>>>>>>>> (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff), > etc. > > >>>>>>>>> > > >>>>>>>>> Another reason for requiring dynamic deployment was to support > > >>>>>>>>> data locality. If you want to have locality, you need to put > > >>>>>>>>> your > > >>>>>> processors > > >>>>>>>>> close to the data they're processing. Upon further > > >>>>>>>>> investigation, > > >>>>>>> though, > > >>>>>>>>> this feature is not that beneficial. There is some good > > >>>>>>>>> discussion > > >>>>>> about > > >>>>>>>>> some problems with it on SAMZA-335.
Again, we took the > > >>>>>>>>> Map/Reduce > > >>>>>> path, > > >>>>>>>>> but > > >>>>>>>>> there are some fundamental differences between HDFS and Kafka. > > >>>>>>>>> HDFS > > >>>>>> has > > >>>>>>>>> blocks, while Kafka has partitions. This leads to less > > >>>>>>>>> optimization potential with stream processors on top of Kafka. > > >>>>>>>>> > > >>>>>>>>> This feature is also used as a crutch. Samza doesn't have any > > >>>>>>>>> built > > >>>>> in > > >>>>>>>>> fault-tolerance logic. Instead, it depends on the dynamic > > >>>>>>>>> deployment scheduling system to handle restarts when a > > >>>>>>>>> processor dies. This has > > >>>>>>> made > > >>>>>>>>> it very difficult to write a standalone Samza container > > >>>> (SAMZA-516). > > >>>>>>>>> > > >>>>>>>>> Pluggability > > >>>>>>>>> > > >>>>>>>>> In some cases pluggability is good, but I think that we've gone > > >>>>>>>>> too > > >>>>>> far > > >>>>>>>>> with it. Currently, Samza has: > > >>>>>>>>> > > >>>>>>>>> * Pluggable config. > > >>>>>>>>> * Pluggable metrics. > > >>>>>>>>> * Pluggable deployment systems. > > >>>>>>>>> * Pluggable streaming systems (SystemConsumer, SystemProducer, > > >>>> etc). > > >>>>>>>>> * Pluggable serdes. > > >>>>>>>>> * Pluggable storage engines. > > >>>>>>>>> * Pluggable strategies for just about every component > > >>>>> (MessageChooser, > > >>>>>>>>> SystemStreamPartitionGrouper, ConfigRewriter, etc). > > >>>>>>>>> > > >>>>>>>>> There's probably more that I've forgotten, as well. Some of > > >>>>>>>>> these > > >>>>> are > > >>>>>>>>> useful, but some have proven not to be. This all comes at a > cost: > > >>>>>>>>> complexity. This complexity is making it harder for our users > > >>>>>>>>> to > > >>>>> pick > > >>>>>> up > > >>>>>>>>> and use Samza out of the box. 
> It also makes it difficult for Samza developers to reason about the characteristics of the container (since the characteristics change depending on which plugins are used).
>
> The issues with pluggability are most visible in the System APIs. What Samza really requires to be functional is Kafka as its transport layer. But we've conflated two unrelated use cases into one API:
>
> 1. Get data into/out of Kafka.
> 2. Process the data in Kafka.
>
> The current System API supports both of these use cases. The problem is, we actually want different features for each use case. By papering over these two use cases and providing a single API, we've introduced a ton of leaky abstractions.
>
> For example, what we'd really like in (2) is to have monotonically increasing longs for offsets (like Kafka). This would be at odds with (1), though, since different systems have different SCNs/offsets/UUIDs/vectors. There was discussion both on the mailing list and the SQL JIRAs about the need for this.
>
> The same thing holds true for replayability. Kafka allows us to rewind when we have a failure. Many other systems don't. In some cases, systems return null for their offsets (e.g. WikipediaSystemConsumer) because they have no offsets.
>
> Partitioning is another example.
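As a rough illustration of why monotonic long offsets matter for use case (2), here is a sketch — the interface and method names are hypothetical, not Samza's actual System API:

```java
// Sketch: opaque system-agnostic offsets vs. Kafka-style monotonic longs.
// Interface and method names are illustrative, not real Samza APIs.
public class OffsetSketch {
    // System-agnostic: offsets are opaque strings (SCNs, UUIDs, vectors...),
    // sometimes null. The framework cannot compare, diff, or rewind them
    // without system-specific knowledge.
    interface OpaqueOffsetConsumer {
        String checkpoint();       // may be null, e.g. a system with no offsets
        void seek(String offset);  // meaning is entirely system-specific
    }

    // Kafka-only: offsets are monotonically increasing longs, so lag and
    // rewind are plain arithmetic the framework can do for every stream.
    static long lag(long logEndOffset, long position) {
        return logEndOffset - position;
    }

    static long rewind(long position, long messages) {
        return Math.max(0, position - messages); // clamp at start of log
    }

    public static void main(String[] args) {
        long position = 42L;
        System.out.println(lag(100L, position));   // 58
        System.out.println(rewind(position, 50L)); // 0 (clamped)
    }
}
```

With opaque string offsets, lag monitoring and replay-on-failure require per-system code; with longs they are one line of arithmetic.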
> Kafka supports partitioning, but many systems don't. We model this by having a single partition for those systems. Still other systems model partitioning differently (e.g. Kinesis).
>
> The SystemAdmin interface is also a mess. Creating streams in a system-agnostic way is almost impossible, as is modeling metadata for the system (replication factor, partitions, location, etc.). The list goes on.
>
> Duplicate work
>
> At the time we began writing Samza, Kafka's consumer and producer APIs had a relatively weak feature set. On the consumer side, you had two options: use the high-level consumer, or the simple consumer. The problem with the high-level consumer was that it controlled your offsets, partition assignments, and the order in which you received messages. The problem with the simple consumer is that it's not simple — it's basic. You end up having to handle a lot of really low-level stuff that you shouldn't. We spent a lot of time making Samza's KafkaSystemConsumer very robust. It also allows us to support some cool features:
>
> * Per-partition message ordering and prioritization.
> * Tight control over partition assignment to support joins, global state (if we want to implement it :)), etc.
> * Tight control over offset checkpointing.
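The per-partition ordering and prioritization item can be sketched roughly as follows. This is a simplified toy loosely inspired by the MessageChooser idea, not the real interface:

```java
import java.util.*;

// Toy sketch of per-partition prioritization: per-partition order is
// preserved, but higher-priority partitions are drained first. Loosely
// inspired by Samza's MessageChooser concept; not the actual interface.
public class PriorityChooser {
    private final Map<Integer, Deque<String>> byPartition = new HashMap<>();
    private final List<Integer> priorityOrder; // partition ids, highest first

    public PriorityChooser(List<Integer> priorityOrder) {
        this.priorityOrder = priorityOrder;
    }

    // Buffer an incoming message for its source partition.
    public void update(int partition, String message) {
        byPartition.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(message);
    }

    // Pick the next message: first non-empty partition in priority order
    // wins, and within a partition messages stay FIFO.
    public String choose() {
        for (int p : priorityOrder) {
            Deque<String> q = byPartition.get(p);
            if (q != null && !q.isEmpty()) return q.poll();
        }
        return null; // nothing buffered
    }

    public static void main(String[] args) {
        PriorityChooser chooser = new PriorityChooser(Arrays.asList(1, 0));
        chooser.update(0, "low-pri");
        chooser.update(1, "high-pri");
        System.out.println(chooser.choose()); // high-pri (partition 1 first)
        System.out.println(chooser.choose()); // low-pri
    }
}
```

This kind of policy is exactly what a bare consumer API doesn't give you, which is why it ended up in Samza's container — and why it's duplicate work once Kafka's consumer grows the same capability.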
> What we didn't realize at the time is that these features should actually be in Kafka. A lot of Kafka consumers (not just Samza stream processors) end up wanting to do things like joins and partition assignment. The Kafka community has come to the same conclusion. They're adding a ton of upgrades to their new consumer implementation. To a large extent, it's duplicate work to what we've already done in Samza.
>
> On top of this, Kafka ended up taking a very similar approach to Samza's KafkaCheckpointManager implementation for handling offset checkpointing. Like Samza, Kafka's new offset management feature stores offset checkpoints in a topic, and allows you to fetch them from the broker.
>
> A lot of this seems like a waste, since we could have shared the work if it had been done in Kafka from the get-go.
>
> Vision
>
> All of this leads me to a rather radical proposal. Samza is relatively stable at this point. I'd venture to say that we're near a 1.0 release. I'd like to propose that we take what we've learned, and begin thinking about Samza beyond 1.0. What would we change if we were starting from scratch? My proposal is to:
>
> 1. Make Samza standalone the *only* way to run Samza processors, and eliminate all direct dependencies on YARN, Mesos, etc.
> 2. Make a definitive call to support only Kafka as the stream processing layer.
> 3. Eliminate Samza's metrics, logging, serialization, and config systems, and simply use Kafka's instead.
>
> This would fix all of the issues I outlined above. It should also shrink the Samza code base pretty dramatically. Supporting only a standalone container will allow Samza to be executed on YARN (using Slider), Mesos (using Marathon/Aurora), or most other in-house deployment systems. This should make life a lot easier for new users. Imagine having the hello-samza tutorial without YARN. The drop in mailing list traffic will be pretty dramatic.
>
> Coupling with Kafka seems long overdue to me. The reality is, everyone that I'm aware of is using Samza with Kafka. We basically require it already in order for most features to work. Those that are using other systems are generally using them for ingest into Kafka (1), and then they do the processing on top. There is already discussion (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767) in Kafka to make ingesting into Kafka extremely easy.
>
> Once we make the call to couple with Kafka, we can leverage a ton of their ecosystem. We no longer have to maintain our own config, metrics, etc. We can all share the same libraries, and make them better.
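The offsets-in-a-compacted-topic approach that both projects converged on can be sketched as replaying a keyed log and keeping only the last value per key. This is a toy in-memory simulation of that recovery step, not the actual broker or KafkaCheckpointManager code:

```java
import java.util.*;

// Toy simulation of checkpoint recovery from a log-compacted topic:
// checkpoints are keyed messages appended to a log, and the current
// checkpoint per key is whatever value appears last during replay.
public class CheckpointLog {
    public static final class Entry {
        final String key;  // e.g. "group:topic:partition" (illustrative format)
        final long offset; // checkpointed position
        public Entry(String key, long offset) { this.key = key; this.offset = offset; }
    }

    // Log compaction keeps the latest value per key; replaying in order
    // and overwriting earlier values yields the same result.
    public static Map<String, Long> restore(List<Entry> log) {
        Map<String, Long> checkpoints = new HashMap<>();
        for (Entry e : log) checkpoints.put(e.key, e.offset);
        return checkpoints;
    }

    public static void main(String[] args) {
        Map<String, Long> cp = restore(Arrays.asList(
                new Entry("group:topic:0", 5L),
                new Entry("group:topic:0", 9L))); // later checkpoint wins
        System.out.println(cp);
    }
}
```

Because both Samza and Kafka arrived at this same design independently, keeping two implementations of it is pure duplication.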
> This will also allow us to share the consumer/producer APIs, and will let us leverage their offset management and partition management, rather than having our own. All of the coordinator stream code would go away, as would most of the YARN AppMaster code. We'd probably have to push some partition management features into the Kafka broker, but they're already moving in that direction with the new consumer API. The features we have for partition assignment aren't unique to Samza, and seem like they should be in Kafka anyway. There will always be some niche usages that will require extra care, and hence full control over partition assignments, much like the Kafka low-level consumer API. These would continue to be supported.
>
> These items will be good for the Samza community. They'll make Samza easier to use, and make it easier for developers to add new features.
>
> Obviously this is a fairly large (and somewhat backwards-incompatible) change. If we choose to go this route, it's important that we openly communicate how we're going to provide a migration path from the existing APIs to the new ones (if we make incompatible changes). I think at a minimum we'd probably need to provide a wrapper to allow existing StreamTask implementations to continue running on the new container.
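Such a compatibility wrapper might look roughly like this. Both interfaces are deliberately simplified stand-ins for illustration — neither is the real Samza API nor a committed post-1.0 design:

```java
// Hypothetical migration shim: run an old-style task on a new, Kafka-native
// container by adapting between the two callback shapes. Both interfaces
// are simplified sketches, not real Samza APIs.
public class StreamTaskAdapter {
    // Old-style API (simplified): the container hands the task a message.
    public interface StreamTask { void process(String key, String message); }

    // New-style API (simplified): a per-record callback driven by a
    // Kafka-style consumer loop.
    public interface RecordHandler { void onRecord(String topic, String key, String value); }

    // Wrap an existing StreamTask so the new container can drive it
    // unchanged; the topic is dropped because the old API never saw it.
    public static RecordHandler wrap(StreamTask task) {
        return (topic, key, value) -> task.process(key, value);
    }

    public static void main(String[] args) {
        StreamTask legacy = (key, message) ->
                System.out.println("processed " + key + "=" + message);
        wrap(legacy).onRecord("page-views", "user7", "click");
    }
}
```

The point of such a shim is that existing task code keeps compiling and running while the container underneath is replaced, which is the minimum migration story the proposal calls for.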
> It's also important that we openly communicate about timing, and stages of the migration.
>
> If you made it this far, I'm sure you have opinions. :) Please send your thoughts and feedback.
>
> Cheers,
> Chris
>
> --
> -- Guozhang