Hey Yan,

I think Chris and I are proposing the same thing. I'm not really saying that
we should literally make Samza a Kafka client, but rather that
philosophically what we want to have is closer to a fancy client than it is
to map/reduce (current Samza is the reverse).
To answer your questions: both work exactly as the proposed standalone mode
works. The key observation is that the new Kafka consumer support on the
server provides what is basically a much more operationally sound version of
the standalone functionality we were going to bake into the container. The
idea is that if you are Kafka-specific you can just use that. So, to answer
your specific questions:

1. In both Chris's proposal and mine, Samza gets out of the business of
deploying and starting processes. The rationale is that there are so many
ways of doing this well, and Samza currently forces a way that isn't too
great. Basically, if you want to package your job in Docker and deploy with
Mesos, fine. If you want to package it as a simple command line program and
start it with puppet/chef/salt/whatever, fine too.

2. However, fault tolerance remains. What this means is that although the
starting of processes is external, all the processes work to balance
processing amongst themselves (just as in the proposed standalone mode). All
the state management stuff is preserved as-is and is part of the lifecycle
of the tasks. To give a concrete example: if you start a single process it
will consume all partitions and have all the tasks; as you start more
processes they will take over partitions; if some of them fail, their
partitions will be given back to the remaining processes.

Shared state (assuming you are referring to broadcast topics?) is totally
orthogonal, I think, and wouldn't change with this proposal, other than that
the partition assignment mechanism would have to have a "broadcast"
assignment strategy.

Either way, I think the key idea is to embrace Kafka--in naming,
conventions, config, monitoring, etc.--so that you just have to master one
way of doing these things rather than having two layers.

-Jay

On Wed, Jul 1, 2015 at 10:59 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Overall, I agree to couple with Kafka more tightly.
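The rebalancing Jay describes -- one process owning all partitions until others join, and partitions returning to the survivors on failure -- can be sketched in a few lines. This is a self-contained illustration only: `RebalanceSketch` and its round-robin strategy are made up for the example, and the real assignment would be whatever strategy the Kafka consumer group protocol provides.

```java
import java.util.*;

// Sketch of the rebalance behavior: all live processes divide the topic's
// partitions among themselves; when a process joins or dies, the remaining
// ones re-divide. Round-robin here is purely illustrative.
public class RebalanceSketch {
    // Deterministically map each partition to one of the live processes.
    static Map<String, List<Integer>> assign(List<String> liveProcesses,
                                             int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String p : liveProcesses) assignment.put(p, new ArrayList<>());
        List<String> sorted = new ArrayList<>(liveProcesses);
        Collections.sort(sorted); // all members must agree on the ordering
        for (int part = 0; part < numPartitions; part++) {
            assignment.get(sorted.get(part % sorted.size())).add(part);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One process owns everything...
        System.out.println(assign(Arrays.asList("proc-1"), 4));
        // ...until a second one starts and takes over half the partitions.
        System.out.println(assign(Arrays.asList("proc-1", "proc-2"), 4));
        // If proc-2 dies, its partitions go back to proc-1.
        System.out.println(assign(Arrays.asList("proc-1"), 4));
    }
}
```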
> Because Samza de facto is based on Kafka, and it should leverage what
> Kafka has. At the same time, Kafka does not need to reinvent what Samza
> already has. I also like the idea of separating the ingestion and the
> transformation.
>
> But it is a little difficult for me to imagine what Samza will look like.
> And I feel Chris and Jay differ a little in terms of how Samza should
> look.
>
> *** Will it look like what Jay's code shows (a client of Kafka)? And
> user's application code calls this client?
>
> 1. If we make Samza a library of Kafka (like what the code shows), how do
> we implement auto-balance and fault-tolerance? Are they taken care of by
> the Kafka broker or some other mechanism, such as a "Samza worker" (just
> making up the name)?
>
> 2. What about other features, such as auto-scaling, shared state, and
> monitoring?
>
> *** If we have Samza standalone (is this what Chris suggests?):
>
> 1. We still need to ingest data from Kafka and produce to it. Then it
> becomes the same as what Samza looks like now, except it does not rely on
> YARN anymore.
>
> 2. If it is standalone, how can it leverage Kafka's metrics, logs, etc.?
> Use Kafka code as the dependency?
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Read through the code example and it looks good to me. A few thoughts
> > regarding deployment:
> >
> > Today Samza deploys as an executable runnable, like:
> >
> > deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...
> >
> > And this proposal advocates for deploying Samza more as an embedded
> > library in user application code (ignoring the terminology since it is
> > not the same as the prototype code):
> >
> > StreamTask task = new MyStreamTask(configs);
> > Thread thread = new Thread(task);
> > thread.start();
> >
> > I think both of these deployment modes are important for different types
> > of users.
> > That said, I think making Samza purely standalone is still sufficient
> > for either the runnable or the library mode.
> >
> > Guozhang
> >
> > On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Looks like gmail mangled the code example, it was supposed to look
> > > like this:
> > >
> > > Properties props = new Properties();
> > > props.put("bootstrap.servers", "localhost:4242");
> > > StreamingConfig config = new StreamingConfig(props);
> > > config.subscribe("test-topic-1", "test-topic-2");
> > > config.processor(ExampleStreamProcessor.class);
> > > config.serialization(new StringSerializer(), new StringDeserializer());
> > > KafkaStreaming container = new KafkaStreaming(config);
> > > container.run();
> > >
> > > -Jay
> > >
> > > On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > Hey guys,
> > > >
> > > > This came out of some conversations Chris and I were having around
> > > > whether it would make sense to use Samza as a kind of data ingestion
> > > > framework for Kafka (which ultimately led to KIP-26, "copycat").
> > > > This combined with complaints around config and YARN, and with the
> > > > discussion around how best to do a standalone mode.
> > > >
> > > > So the thought experiment was: given that Samza was basically
> > > > already totally Kafka-specific, what if you just embraced that and
> > > > turned it into something less like a heavyweight framework and more
> > > > like a third Kafka client--a kind of "producing consumer" with state
> > > > management facilities. Basically a library. Instead of a complex
> > > > stream processing framework this would actually be a very simple
> > > > thing, not much more complicated to use or operate than a Kafka
> > > > consumer. As Chris said, when we thought about it, a lot of what
> > > > Samza (and the other stream processing systems) were doing seemed
> > > > like kind of a hangover from MapReduce.
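The reposted example shows how the container is configured and run; what implementing a processor might feel like is sketched below. The `StreamProcessor` interface here is a guess reconstructed from the naming in the example (`config.processor(ExampleStreamProcessor.class)`), not taken from the prototype's actual code, and the driver loop stands in for the container.

```java
import java.util.*;

// Hypothetical shape of a processor against the prototype API. Both the
// interface signatures and the Collector abstraction are assumptions made
// for this illustration.
public class ProcessorSketch {
    interface Collector { void send(String topic, String key, String value); }

    interface StreamProcessor {
        void process(String topic, String key, String value, Collector out);
    }

    // Analogous to a Samza StreamTask: a stateless transform per record.
    static class ExampleStreamProcessor implements StreamProcessor {
        public void process(String topic, String key, String value,
                            Collector out) {
            out.send(topic + "-upper", key, value.toUpperCase());
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        Collector collector = (topic, k, v) -> sent.add(topic + ":" + k + ":" + v);
        StreamProcessor p = new ExampleStreamProcessor();
        // Stand-in for the container's poll loop over "test-topic-1".
        p.process("test-topic-1", "a", "hello", collector);
        p.process("test-topic-1", "b", "world", collector);
        System.out.println(sent); // [test-topic-1-upper:a:HELLO, test-topic-1-upper:b:WORLD]
    }
}
```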
> > > > Of course you need to ingest/output data to and from the stream
> > > > processing. But when we actually looked into how that would work,
> > > > Samza isn't really an ideal data ingestion framework, for a bunch
> > > > of reasons. To really do that right you need a pretty different
> > > > internal data model and set of APIs. So what if you split them and
> > > > had an API for Kafka ingress/egress (copycat, AKA KIP-26) and a
> > > > separate API for Kafka transformation (Samza)?
> > > >
> > > > This would also allow really embracing the same terminology and
> > > > conventions. One complaint about the current state is that the two
> > > > systems kind of feel bolted on. Terminology like "stream" vs
> > > > "topic" and different config and monitoring systems mean you kind
> > > > of have to learn Kafka's way, then learn Samza's slightly different
> > > > way, then kind of understand how they map to each other, which,
> > > > having walked a few people through this, is surprisingly tricky for
> > > > folks to get.
> > > >
> > > > Since I have been spending a lot of time on airplanes I hacked up
> > > > an earnest but still somewhat incomplete prototype of what this
> > > > would look like. This is just unceremoniously dumped into Kafka as
> > > > it required a few changes to the new consumer. Here is the code:
> > > >
> > > > https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
> > > >
> > > > For the purpose of the prototype I just liberally renamed
> > > > everything to try to align it with Kafka, with no regard for
> > > > compatibility.
> > > > To use this would be something like this:
> > > >
> > > > Properties props = new Properties();
> > > > props.put("bootstrap.servers", "localhost:4242");
> > > > StreamingConfig config = new StreamingConfig(props);
> > > > config.subscribe("test-topic-1", "test-topic-2");
> > > > config.processor(ExampleStreamProcessor.class);
> > > > config.serialization(new StringSerializer(), new StringDeserializer());
> > > > KafkaStreaming container = new KafkaStreaming(config);
> > > > container.run();
> > > >
> > > > KafkaStreaming is basically the SamzaContainer; StreamProcessor is
> > > > basically StreamTask.
> > > >
> > > > So rather than putting all the class names in a file and then
> > > > having the job assembled by reflection, you just instantiate the
> > > > container programmatically. Work is balanced over however many
> > > > instances are alive at any time (i.e. if an instance dies, its
> > > > tasks are added to the existing containers without shutting them
> > > > down).
> > > >
> > > > We would provide some glue for running this stuff in YARN via
> > > > Slider, in Mesos via Marathon, and in AWS using some of their
> > > > tools, but from the point of view of these frameworks these stream
> > > > processing jobs are just stateless services that can come and go
> > > > and expand and contract at will. There is no more custom scheduler.
> > > >
> > > > Here are some relevant details:
> > > >
> > > > 1. It is only ~1300 lines of code; it would get larger if we
> > > > productionized it, but not vastly larger. We really do get a ton of
> > > > leverage out of Kafka.
> > > > 2. Partition management is fully delegated to the new consumer.
> > > > This is nice since now any partition management strategy available
> > > > to the Kafka consumer is also available to Samza (and vice versa),
> > > > with the exact same configs.
> > > > 3.
> > > > It supports state, as well as state reuse.
> > > >
> > > > Anyhow take a look, hopefully it is thought provoking.
> > > >
> > > > -Jay
> > > >
> > > > On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini
> > > > <criccom...@apache.org> wrote:
> > > >
> > > >> Hey all,
> > > >>
> > > >> I have had some discussions with Samza engineers at LinkedIn and
> > > >> Confluent, and we came up with a few observations and would like
> > > >> to propose some changes.
> > > >>
> > > >> We've observed some things about Samza's design that I want to
> > > >> call out:
> > > >>
> > > >> * Samza is dependent upon a dynamic deployment system.
> > > >> * Samza is too pluggable.
> > > >> * Samza's SystemConsumer/SystemProducer and Kafka's consumer APIs
> > > >> are trying to solve a lot of the same problems.
> > > >>
> > > >> All three of these issues are related, but I'll address them in
> > > >> order.
> > > >>
> > > >> Deployment
> > > >>
> > > >> Samza strongly depends on the use of a dynamic deployment
> > > >> scheduler such as YARN, Mesos, etc. When we initially built Samza,
> > > >> we bet that there would be one or two winners in this area, that
> > > >> we could support them, and that the rest would go away. In
> > > >> reality, there are many variations. Furthermore, many people still
> > > >> prefer to just start their processors like normal Java processes,
> > > >> and use traditional deployment scripts such as Fabric, Chef,
> > > >> Ansible, etc. Forcing a deployment system on users makes the Samza
> > > >> start-up process really painful for first-time users.
> > > >>
> > > >> Dynamic deployment as a requirement was also a bit of a mis-fire
> > > >> because of a fundamental misunderstanding of the difference
> > > >> between batch jobs and stream processing jobs.
> > > >> Early on, we made a conscious effort to favor the Hadoop
> > > >> (Map/Reduce) way of doing things, since it worked and was well
> > > >> understood. One thing that we missed was that batch jobs have a
> > > >> definite beginning and end, and stream processing jobs don't
> > > >> (usually). This leads to a much simpler scheduling problem for
> > > >> stream processors. You basically just need to find a place to
> > > >> start the processor, and start it. The way we run grids at
> > > >> LinkedIn, there's no concept of a cluster being "full". We always
> > > >> add more machines. The problem with coupling Samza with a
> > > >> scheduler is that Samza (as a framework) now has to handle
> > > >> deployment. This pulls in a bunch of things such as configuration
> > > >> distribution (the config stream), shell scripts (bin/run-job.sh,
> > > >> JobRunner), packaging (all the .tgz stuff), etc.
> > > >>
> > > >> Another reason for requiring dynamic deployment was to support
> > > >> data locality. If you want locality, you need to put your
> > > >> processors close to the data they're processing. Upon further
> > > >> investigation, though, this feature is not that beneficial. There
> > > >> is some good discussion about its problems on SAMZA-335. Again, we
> > > >> took the Map/Reduce path, but there are some fundamental
> > > >> differences between HDFS and Kafka: HDFS has blocks, while Kafka
> > > >> has partitions. This leads to less optimization potential with
> > > >> stream processors on top of Kafka.
> > > >>
> > > >> This feature is also used as a crutch. Samza doesn't have any
> > > >> built-in fault-tolerance logic. Instead, it depends on the dynamic
> > > >> deployment scheduling system to handle restarts when a processor
> > > >> dies. This has made it very difficult to write a standalone Samza
> > > >> container (SAMZA-516).
> > > >>
> > > >> Pluggability
> > > >>
> > > >> In some cases pluggability is good, but I think that we've gone
> > > >> too far with it. Currently, Samza has:
> > > >>
> > > >> * Pluggable config.
> > > >> * Pluggable metrics.
> > > >> * Pluggable deployment systems.
> > > >> * Pluggable streaming systems (SystemConsumer, SystemProducer,
> > > >> etc.).
> > > >> * Pluggable serdes.
> > > >> * Pluggable storage engines.
> > > >> * Pluggable strategies for just about every component
> > > >> (MessageChooser, SystemStreamPartitionGrouper, ConfigRewriter,
> > > >> etc.).
> > > >>
> > > >> There's probably more that I've forgotten, as well. Some of these
> > > >> are useful, but some have proven not to be. This all comes at a
> > > >> cost: complexity. This complexity is making it harder for our
> > > >> users to pick up and use Samza out of the box. It also makes it
> > > >> difficult for Samza developers to reason about the characteristics
> > > >> of the container (since they change depending on which plugins are
> > > >> in use).
> > > >>
> > > >> The issues with pluggability are most visible in the System APIs.
> > > >> What Samza really requires to be functional is Kafka as its
> > > >> transport layer. But we've conflated two unrelated use cases into
> > > >> one API:
> > > >>
> > > >> 1. Get data into/out of Kafka.
> > > >> 2. Process the data in Kafka.
> > > >>
> > > >> The current System API supports both of these use cases. The
> > > >> problem is, we actually want different features for each use case.
> > > >> By papering over these two use cases and providing a single API,
> > > >> we've introduced a ton of leaky abstractions.
> > > >>
> > > >> For example, what we'd really like in (2) is to have monotonically
> > > >> increasing longs for offsets (like Kafka). This would be at odds
> > > >> with (1), though, since different systems have different
> > > >> SCNs/offsets/UUIDs/vectors.
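The offset example is concrete: with Kafka-style monotonically increasing long offsets, computing lag or rewinding is plain arithmetic, while a system-agnostic opaque offset supports neither without system-specific knowledge. A minimal sketch of the contrast; the types and method names are illustrative, not Samza's actual API:

```java
// With long offsets, lag and rewind are arithmetic. With an opaque offset
// (an arbitrary String: an SCN, a UUID, a vector...), all a generic layer
// can do is store the value and hand it back unchanged.
public class OffsetSketch {
    // Messages remaining between the committed position and the log end.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    // Replay by stepping back a fixed number of messages.
    static long rewind(long committedOffset, long numMessages) {
        return Math.max(0, committedOffset - numMessages);
    }

    // The system-agnostic version: comparison, distance, and rewind are
    // all impossible here -- the leaky abstraction the text describes.
    static String checkpoint(String opaqueOffset) {
        return opaqueOffset;
    }

    public static void main(String[] args) {
        System.out.println(lag(1500, 1200));   // 300 messages behind
        System.out.println(rewind(1200, 500)); // replay from offset 700
    }
}
```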
> > > >> There was discussion, both on the mailing list and on the SQL
> > > >> JIRAs, about the need for this.
> > > >>
> > > >> The same thing holds true for replayability. Kafka allows us to
> > > >> rewind when we have a failure. Many other systems don't. In some
> > > >> cases, systems return null for their offsets (e.g.
> > > >> WikipediaSystemConsumer) because they have no offsets.
> > > >>
> > > >> Partitioning is another example. Kafka supports partitioning, but
> > > >> many systems don't. We model this by having a single partition for
> > > >> those systems. Still other systems model partitioning differently
> > > >> (e.g. Kinesis).
> > > >>
> > > >> The SystemAdmin interface is also a mess. Creating streams in a
> > > >> system-agnostic way is almost impossible, as is modeling metadata
> > > >> for the system (replication factor, partitions, location, etc.).
> > > >> The list goes on.
> > > >>
> > > >> Duplicate work
> > > >>
> > > >> At the time that we began writing Samza, Kafka's consumer and
> > > >> producer APIs had a relatively weak feature set. On the consumer
> > > >> side, you had two options: use the high-level consumer, or the
> > > >> simple consumer. The problem with the high-level consumer was that
> > > >> it controlled your offsets, partition assignments, and the order
> > > >> in which you received messages. The problem with the simple
> > > >> consumer is that it's not simple. It's basic. You end up having to
> > > >> handle a lot of really low-level stuff that you shouldn't. We
> > > >> spent a lot of time making Samza's KafkaSystemConsumer very
> > > >> robust. It also allows us to support some cool features:
> > > >>
> > > >> * Per-partition message ordering and prioritization.
> > > >> * Tight control over partition assignment to support joins, global
> > > >> state (if we want to implement it :)), etc.
> > > >> * Tight control over offset checkpointing.
> > > >>
> > > >> What we didn't realize at the time is that these features should
> > > >> actually be in Kafka. A lot of Kafka consumers (not just Samza
> > > >> stream processors) end up wanting to do things like joins and
> > > >> partition assignment. The Kafka community has come to the same
> > > >> conclusion. They're adding a ton of upgrades to their new Kafka
> > > >> consumer implementation. To a large extent, it duplicates work
> > > >> we've already done in Samza.
> > > >>
> > > >> On top of this, Kafka ended up taking a very similar approach to
> > > >> Samza's KafkaCheckpointManager implementation for handling offset
> > > >> checkpointing. Like Samza, Kafka's new offset management feature
> > > >> stores offset checkpoints in a topic, and allows you to fetch them
> > > >> from the broker.
> > > >>
> > > >> A lot of this seems like a waste, since we could have shared the
> > > >> work if it had been done in Kafka from the get-go.
> > > >>
> > > >> Vision
> > > >>
> > > >> All of this leads me to a rather radical proposal. Samza is
> > > >> relatively stable at this point. I'd venture to say that we're
> > > >> near a 1.0 release. I'd like to propose that we take what we've
> > > >> learned and begin thinking about Samza beyond 1.0. What would we
> > > >> change if we were starting from scratch? My proposal is to:
> > > >>
> > > >> 1. Make Samza standalone the *only* way to run Samza processors,
> > > >> and eliminate all direct dependencies on YARN, Mesos, etc.
> > > >> 2. Make a definitive call to support only Kafka as the stream
> > > >> processing layer.
> > > >> 3. Eliminate Samza's metrics, logging, serialization, and config
> > > >> systems, and simply use Kafka's instead.
> > > >>
> > > >> This would fix all of the issues that I outlined above.
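The shared checkpointing approach Chris mentions -- both Samza's KafkaCheckpointManager and Kafka's offset management commit offsets to a log-compacted topic, where only the latest value per key survives -- can be modeled in a few lines. This is an in-memory illustration of the compaction semantics; the class and method names are made up for the sketch:

```java
import java.util.*;

// Models a compacted checkpoint topic: committing is appending a
// (key, offset) record; compaction means only the latest record per key
// is retained, which a Map captures directly.
public class CheckpointTopicSketch {
    // Key: group/topic/partition; value: last committed offset.
    private final Map<String, Long> compacted = new HashMap<>();

    void commit(String group, String topic, int partition, long offset) {
        compacted.put(group + "/" + topic + "/" + partition, offset);
    }

    // What a restarting consumer fetches from the broker to resume.
    OptionalLong fetch(String group, String topic, int partition) {
        Long off = compacted.get(group + "/" + topic + "/" + partition);
        return off == null ? OptionalLong.empty() : OptionalLong.of(off);
    }

    public static void main(String[] args) {
        CheckpointTopicSketch ckpt = new CheckpointTopicSketch();
        ckpt.commit("my-job", "test-topic-1", 0, 42);
        ckpt.commit("my-job", "test-topic-1", 0, 97); // compaction keeps this one
        System.out.println(ckpt.fetch("my-job", "test-topic-1", 0)); // OptionalLong[97]
    }
}
```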
> > > >> It should also shrink the Samza code base pretty dramatically.
> > > >> Supporting only a standalone container will allow Samza to be
> > > >> executed on YARN (using Slider), on Mesos (using Marathon/Aurora),
> > > >> or on most other in-house deployment systems. This should make
> > > >> life a lot easier for new users. Imagine having the hello-samza
> > > >> tutorial without YARN. The drop in mailing list traffic will be
> > > >> pretty dramatic.
> > > >>
> > > >> Coupling with Kafka seems long overdue to me. The reality is,
> > > >> everyone that I'm aware of is using Samza with Kafka. We basically
> > > >> require it already in order for most features to work. Those that
> > > >> are using other systems are generally using them for ingest into
> > > >> Kafka (1), and then doing the processing on top. There is already
> > > >> discussion (
> > > >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767
> > > >> ) in Kafka to make ingesting into Kafka extremely easy.
> > > >>
> > > >> Once we make the call to couple with Kafka, we can leverage a ton
> > > >> of their ecosystem. We no longer have to maintain our own config,
> > > >> metrics, etc. We can all share the same libraries, and make them
> > > >> better. This will also allow us to share the consumer/producer
> > > >> APIs, and will let us leverage their offset management and
> > > >> partition management, rather than having our own. All of the
> > > >> coordinator stream code would go away, as would most of the YARN
> > > >> AppMaster code. We'd probably have to push some partition
> > > >> management features into the Kafka broker, but they're already
> > > >> moving in that direction with the new consumer API. The features
> > > >> we have for partition assignment aren't unique to Samza, and seem
> > > >> like they should be in Kafka anyway.
> > > >> There will always be some niche usages which require extra care,
> > > >> and hence full control over partition assignments, much like the
> > > >> Kafka low-level consumer API. These would continue to be
> > > >> supported.
> > > >>
> > > >> These items will be good for the Samza community. They'll make
> > > >> Samza easier to use, and make it easier for developers to add new
> > > >> features.
> > > >>
> > > >> Obviously this is a fairly large (and somewhat
> > > >> backwards-incompatible) change. If we choose to go this route,
> > > >> it's important that we openly communicate how we're going to
> > > >> provide a migration path from the existing APIs to the new ones
> > > >> (if we make incompatible changes). I think at a minimum, we'd
> > > >> probably need to provide a wrapper to allow existing StreamTask
> > > >> implementations to continue running on the new container. It's
> > > >> also important that we openly communicate about timing and the
> > > >> stages of the migration.
> > > >>
> > > >> If you made it this far, I'm sure you have opinions. :) Please
> > > >> send your thoughts and feedback.
> > > >>
> > > >> Cheers,
> > > >> Chris
> >
> > --
> > -- Guozhang