@Guozhang, yes, that's what I meant. From the Kafka consumers' point of view, it pretty much boils down to answering the following question: for the Kafka consumer in each container (i.e. a Samza worker), which topic partitions should it consume? Samza's current StreamTask model still makes sense to me; the debatable point is whether the partition-to-task mapping should live in Kafka or stay in a separate module. As we discussed earlier, some simple partition-to-task mappings may be expressed as co-partition distribution among different topics in Kafka (forgive me if I have made mistakes here, since I am not 100% sure how partition distribution policies work in Kafka). However, more complex and application-specific partition-to-task mappings would need to stay outside of Kafka. One example is the discussion on SQL tasks: https://issues.apache.org/jira/browse/SAMZA-516?focusedCommentId=14288685&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14288685
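To make the co-partitioned mapping above concrete, here is a minimal sketch. All names (`CoPartitionGrouper`, `group`) are illustrative assumptions, not Samza's API, though Samza's real abstraction for this kind of mapping is the `SystemStreamPartitionGrouper` mentioned later in the thread. The idea: partition N of every subscribed topic lands in the same task, which is what a join between co-partitioned topics needs.

```java
import java.util.*;

// Hypothetical sketch: group co-partitioned topic partitions into tasks,
// so partition N of every subscribed topic is consumed by the same task.
class CoPartitionGrouper {
    // A topic partition, reduced to the essentials for the sketch.
    static class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    // Task ID -> the set of partitions that task consumes.
    static Map<Integer, Set<TopicPartition>> group(Collection<TopicPartition> partitions) {
        Map<Integer, Set<TopicPartition>> tasks = new TreeMap<>();
        for (TopicPartition tp : partitions) {
            // Co-partitioning: the partition number alone picks the task,
            // so a join across topics sees matching keys in one task.
            tasks.computeIfAbsent(tp.partition, id -> new HashSet<>()).add(tp);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<TopicPartition> tps = List.of(
            new TopicPartition("orders", 0), new TopicPartition("orders", 1),
            new TopicPartition("users", 0),  new TopicPartition("users", 1));
        Map<Integer, Set<TopicPartition>> tasks = group(tps);
        System.out.println(tasks.size());        // 2 tasks for 2 partition numbers
        System.out.println(tasks.get(0).size()); // orders-0 and users-0 together
    }
}
```

A more application-specific mapping (e.g. the SQL-task case linked above) would replace the partition-number rule with its own grouping logic, which is exactly why it may need to live outside Kafka.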
On Thu, Jul 2, 2015 at 2:47 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Since the resource scheduling systems like YARN / Mesos only give Samza a > couple of resource units (or "containers") to run processes, while Samza > still needs to handle task assignment / scheduling like which tasks should > be allocated to which containers that consume from which partitions, etc. I > think this is what Yi meant by "partition management"? > > On Thu, Jul 2, 2015 at 2:35 PM, Yi Pan <nickpa...@gmail.com> wrote: > > > @Jay, yes, the current function in the JobCoordinator is just partition > > management. Maybe we should just call it PartitionManager to make it > > explicit. > > > > -Yi > > > > On Thu, Jul 2, 2015 at 2:24 PM, Jay Kreps <j...@confluent.io> wrote: > > > > > Hey Yi, > > > > > > What does the JobCoordinator do? YARN/Mesos/etc would be doing the > actual > > > resource assignment, process restart, etc, right? Is the additional > value > > > add of the JobCoordinator just partition management? > > > > > > -Jay > > > > > > On Thu, Jul 2, 2015 at 11:32 AM, Yi Pan <nickpa...@gmail.com> wrote: > > > > > > > Hi, all, > > > > > > > > > > > > Thanks Chris for sending out this proposal and Jay for sharing the > > > > extremely illustrative prototype code. > > > > > > > > > > > > I have been thinking it over many times and want to list out my > > personal > > > > opinions below: > > > > > > > > 1. Generally, I agree with most of the people here on the mailing > list > > on > > > > two points: > > > > > > > > a. Deeper integration w/ Kafka is great. No more confusing mapping > > > from > > > > SystemStreamPartition to TopicPartition etc. > > > > > > > > b. Separating ingestion vs transformation greatly simplifies the > > > > system APIs. > > > > > > > > Having the above two changes would allow us to remove many > unnecessary > > > > complexities introduced by those pluggable interfaces Chris pointed > > out, > > > > e.g. pluggable streaming systems and serde. 
> > > > > > > > > > > > To recall one of Chris’s statements on difficulties in dynamic > > > deployment, I > > > > believe that the difficulties are mainly the result of the tight coupling > > of > > > > partition assignment with container deployment in the current system. > > > The > > > > current container deployment requires a pre-defined partition > > assignment > > > > strategy coupled together w/ the deployment configuration before we > can > > > > submit to YARN and start the Samza container, which makes the > launching > > > > process super long. Also, fault-tolerance and the embedded > > JobCoordinator > > > > code in the YARN AppMaster are another factor making dynamic deployment > > more > > > > complex and difficult. > > > > > > > > > > > > First, borrowing Yan’s term, let’s call the Samza standalone process > a > > > > Samza worker. Here is what I have been thinking: > > > > > > > > 1. Separate the execution framework from partition assignment/load > > > > balancing: > > > > > > > > a. a Samza worker should be launched by an execution framework that > > only > > > > deals w/ process placement to available nodes. The execution > framework > > > now > > > > should only deal w/ how many such processes are needed, where to put > > > them, > > > > and how to keep them alive. > > > > > > > > b. Partition assignment/load balancing can be a pluggable > interface > > > in > > > > Samza that allows the Samza workers to ask for partition assignments. > > > Let’s > > > > borrow the name JobCoordinator for now. To allow fault-tolerance in > > case > > > of > > > > failure, the partition assignments to workers need to be dynamic. > > Hence, > > > > the abstract interface would be much like what Jay’s code illustrates: > > > > get()/onAssigned()/onRevoke(). The implementation of the partition > > > > assignment can either: > > > > > > > > a) rely completely on Kafka, or > > > > > > > > b) use explicit partition assignment via the JobCoordinator. 
Chris’s > > work > > > > in SAMZA-516 can be easily incorporated here. The use case in > SAMZA-41 > > > that > > > > runs a Samza ProcessJob w/ static partition assignment can be > implemented > > > as a > > > > JobCoordinator via any home-grown implementation of a distributed > > > > coordinator. All the work we did at LinkedIn to support dynamic > > partition > > > > assignment and host-affinity (SAMZA-617) can be nicely reused as an > > > > implementation of JobCoordinator. > > > > > > > > > > > > With the above work done, I can see three usage patterns in Samza: > > > > > > > > a. Samza as a library: Samza has a set of libraries to provide > > stream > > > > processing, just like a third Kafka client (as illustrated in Jay’s > > > > example). The execution/deployment is totally controlled by the > > > application > > > > and the partition coordination is done via Kafka. > > > > > > > > b. Samza as a process: Samza runs as a standalone process. There > may > > > not > > > > be an execution framework to launch and deploy Samza processes. > > > > Partition assignment is handled by a pluggable JobCoordinator. > > > > > > > > c. Samza as a service: Samza runs as a collection of processes. > > There > > > > will be an execution framework to allocate resources, launch and > deploy > > > > Samza workers and keep them alive. The same pluggable JobCoordinator > is > > > > desirable here as well. > > > > > > > > > > > > Lastly, I would argue that CopyCat in KIP-26 should probably follow > the > > > > same model. Hence, in the Samza-as-a-service model, as at LinkedIn, we can > > use > > > > the same fault-tolerant execution framework to run CopyCat and Samza > > w/o > > > > the need to operate two service platforms, which should address > > Sriram’s > > > > comment in the email thread. > > > > > > > > > > > > Hope the above makes sense. Thanks all! 
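The pluggable coordinator shape described above (the get()/onAssigned()/onRevoke() style) might be sketched as follows. All names here are illustrative assumptions, not Samza's or Kafka's actual API, and the static round-robin assignment merely stands in for option b), explicit partition assignment; a Kafka-backed implementation (option a) would drive the same listener from the consumer's rebalance callbacks.

```java
import java.util.*;

// Hypothetical sketch of a pluggable JobCoordinator: workers ask it for
// partition assignments, and it can revoke/reassign them dynamically.
class CoordinatorDemo {
    // Callback interface a worker registers with the coordinator.
    interface PartitionListener {
        void onAssigned(Set<Integer> partitions);
        void onRevoked(Set<Integer> partitions);
    }

    // A trivial coordinator: statically round-robins partitions over workers.
    static class StaticJobCoordinator {
        private final int numPartitions;
        StaticJobCoordinator(int numPartitions) { this.numPartitions = numPartitions; }

        // get(): compute this worker's share and notify its listener.
        Set<Integer> get(int workerId, int numWorkers, PartitionListener listener) {
            Set<Integer> assigned = new TreeSet<>();
            for (int p = workerId; p < numPartitions; p += numWorkers) assigned.add(p);
            listener.onAssigned(assigned);
            return assigned;
        }
    }

    public static void main(String[] args) {
        StaticJobCoordinator coordinator = new StaticJobCoordinator(4);
        PartitionListener log = new PartitionListener() {
            public void onAssigned(Set<Integer> ps) { System.out.println("assigned " + ps); }
            public void onRevoked(Set<Integer> ps) { System.out.println("revoked " + ps); }
        };
        // Worker 0 of 2 gets the even partitions under round-robin.
        System.out.println(coordinator.get(0, 2, log));
    }
}
```

The host-affinity work (SAMZA-617) would slot in as a smarter `get()` that prefers a worker's previous assignment, without changing the interface the workers see.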
> > > > > > > > > > > > -Yi > > > > > > > > On Thu, Jul 2, 2015 at 9:53 AM, Sriram <sriram....@gmail.com> wrote: > > > > > > > > > One thing that is worth exploring is to have a transformation and > > > > > ingestion library in Kafka but use the same framework for fault > > > > tolerance, > > > > > resource isolation and management. The biggest difference I see in > > > these > > > > > two use cases is the API and data model. > > > > > > > > > > > > > > > > On Jul 2, 2015, at 8:59 AM, Jay Kreps <j...@confluent.io> wrote: > > > > > > > > > > > > Hey Garry, > > > > > > > > > > > > Yeah that's super frustrating. I'd be happy to chat more about > this > > > if > > > > > > you'd be interested. I think Chris and I started with the idea of > > > "what > > > > > > would it take to make Samza a kick-ass ingestion tool" but > > ultimately > > > > we > > > > > > kind of came around to the idea that ingestion and transformation > > had > > > > > > pretty different needs and coupling the two made things hard. > > > > > > > > > > > > For what it's worth I think copycat (KIP-26) actually will do > what > > > you > > > > > are > > > > > > looking for. > > > > > > > > > > > > With regard to your point about slider, I don't necessarily > > disagree. > > > > > But I > > > > > > think getting good YARN support is quite doable and I think we > can > > > make > > > > > > that work well. I think the issue this proposal solves is that > > > > > technically > > > > > > it is pretty hard to support multiple cluster management systems > > the > > > > way > > > > > > things are now, you need to write an "app master" or "framework" > > for > > > > each > > > > > > and they are all a little different so testing is really hard. In > > the > > > > > > absence of this we have been stuck with just YARN which has > > fantastic > > > > > > penetration in the Hadoopy part of the org, but zero penetration > > > > > elsewhere. 
> > > > > > Given the huge amount of work being put into Slider, Marathon, AWS > > > > > > tooling, not to mention the umpteen related packaging > technologies > > > > people > > > > > > want to use (Docker, Kubernetes, various cloud-specific deploy > > tools, > > > > > etc) > > > > > > I really think it is important to get this right. > > > > > > > > > > > > -Jay > > > > > > > > > > > > On Thu, Jul 2, 2015 at 4:17 AM, Garry Turkington < > > > > > > g.turking...@improvedigital.com> wrote: > > > > > > > > > > > >> Hi all, > > > > > >> > > > > > >> I think the question below re does Samza become a sub-project of > > > Kafka > > > > > >> highlights the broader point around migration. Chris mentions > > > Samza's > > > > > >> maturity is heading towards a v1 release but I'm not sure it > feels > > > > > right to > > > > > >> launch a v1 then immediately plan to deprecate most of it. > > > > > >> > > > > > >> From a selfish perspective I have some guys who have started > > working > > > > > with > > > > > >> Samza and building some new consumers/producers was next up. > > Sounds > > > > like > > > > > >> that is absolutely not the direction to go. I need to look into > > the > > > > KIP > > > > > in > > > > > >> more detail but for me the attractiveness of adding new Samza > > > > > >> consumers/producers -- even if yes all they were doing was really > > > > getting > > > > > >> data into and out of Kafka -- was to avoid having to worry > about > > > the > > > > > >> lifecycle management of external clients. If there is a generic > > > Kafka > > > > > >> ingress/egress layer that I can plug a new connector into and > > have a > > > > > lot of > > > > > >> the heavy lifting re scale and reliability done for me then it > > gives > > > > me > > > > > all > > > > > >> that building new consumers/producers would. If not then it > > > complicates > > > > my > > > > > >> operational deployments. 
> > > > > >> > > > > > >> Which is similar to my other question with the proposal -- if we > > > > build a > > > > > >> fully available/stand-alone Samza plus the requisite shims to > > > > integrate > > > > > >> with Slider etc I suspect the former may be a lot more work than > > we > > > > > think. > > > > > >> We may make it much easier for a newcomer to get something > running > > > but > > > > > >> having them step up and get a reliable production deployment may > > > still > > > > > >> dominate mailing list traffic, if for different reasons than > > today. > > > > > >> > > > > > >> Don't get me wrong -- I'm comfortable with making the Samza > > > dependency > > > > > on > > > > > >> Kafka much more explicit and I absolutely see the benefits in > the > > > > > >> reduction of duplication and clashing terminologies/abstractions > > > that > > > > > >> Chris/Jay describe. Samza as a library would likely be a very > nice > > > > tool > > > > > to > > > > > >> add to the Kafka ecosystem. I just have the concerns above re > the > > > > > >> operational side. > > > > > >> > > > > > >> Garry > > > > > >> > > > > > >> -----Original Message----- > > > > > >> From: Gianmarco De Francisci Morales [mailto:g...@apache.org] > > > > > >> Sent: 02 July 2015 12:56 > > > > > >> To: dev@samza.apache.org > > > > > >> Subject: Re: Thoughts and obesrvations on Samza > > > > > >> > > > > > >> Very interesting thoughts. > > > > > >> From outside, I have always perceived Samza as a computing layer > > > over > > > > > >> Kafka. > > > > > >> > > > > > >> The question, maybe a bit provocative, is "should Samza be a > > > > sub-project > > > > > >> of Kafka then?" > > > > > >> Or does it make sense to keep it as a separate project with a > > > separate > > > > > >> governance? 
> > > > > >> > > > > > >> Cheers, > > > > > >> > > > > > >> -- > > > > > >> Gianmarco > > > > > >> > > > > > >>> On 2 July 2015 at 08:59, Yan Fang <yanfang...@gmail.com> > wrote: > > > > > >>> > > > > > >>> Overall, I agree to couple with Kafka more tightly. Because > Samza > > > de > > > > > >>> facto is based on Kafka, and it should leverage what Kafka has. > > At > > > > the > > > > > >>> same time, Kafka does not need to reinvent what Samza already > > has. > > > I > > > > > >>> also like the idea of separating the ingestion and > > transformation. > > > > > >>> > > > > > >>> But it is a little difficult for me to imagine what Samza will > > > look > > > > > >> like. > > > > > >>> And I feel Chris and Jay have a little difference in terms of > what > > > > > >>> Samza should look like. > > > > > >>> > > > > > >>> *** Will it look like what Jay's code shows (a client of > Kafka)? > > > And > > > > > >>> user's application code calls this client? > > > > > >>> > > > > > >>> 1. If we make Samza be a library of Kafka (like what the code > > > shows), > > > > > >>> how do we implement auto-balance and fault-tolerance? Are they > > > taken > > > > > >>> care of by the Kafka broker or some other mechanism, such as a "Samza > > worker" > > > > > >>> (just making up the name)? > > > > > >>> > > > > > >>> 2. What about other features, such as auto-scaling, shared > state, > > > > > >>> monitoring? > > > > > >>> > > > > > >>> > > > > > >>> *** If we have Samza standalone, (is this what Chris suggests?) > > > > > >>> > > > > > >>> 1. we still need to ingest data from Kafka and produce to it. > > Then > > > it > > > > > >>> becomes the same as what Samza looks like now, except it does > not > > > > rely > > > > > >>> on YARN anymore. > > > > > >>> > > > > > >>> 2. if it is standalone, how can it leverage Kafka's metrics, > > logs, > > > > > >>> etc? Use Kafka code as the dependency? 
> > > > > >>> > > > > > >>> > > > > > >>> Thanks, > > > > > >>> > > > > > >>> Fang, Yan > > > > > >>> yanfang...@gmail.com > > > > > >>> > > > > > >>>> On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang < > > wangg...@gmail.com > > > > > > > > > >>> wrote: > > > > > >>> > > > > > >>>> Read through the code example and it looks good to me. A few > > > > > >>>> thoughts regarding deployment: > > > > > >>>> > > > > > >>>> Today Samza deploys as an executable, run like: > > > > > >>>> > > > > > >>>> deploy/samza/bin/run-job.sh --config-factory=... > > > > > >> --config-path=file://... > > > > > >>>> > > > > > >>>> And this proposal advocates deploying Samza more as an embedded > > > > > >>>> library in user application code (ignoring the terminology > since > > > > > >>>> it is not the > > > > > >>> same > > > > > >>>> as the prototype code): > > > > > >>>> 
> > > > > >>>> StreamTask task = new MyStreamTask(configs);
> > > > > >>>> Thread thread = new Thread(task);
> > > > > >>>> thread.start();
> > > > > >>>> > > > > > >>>> I think both of these deployment modes are important for > > different > > > > > >>>> types > > > > > >>> of > > > > > >>>> users. That said, I think making Samza purely standalone is > > still > > > > > >>>> sufficient for either runnable or library modes. 
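The library mode sketched above (a StreamTask run on an application-owned thread) can be illustrated with a self-contained toy. `MyStreamTask` and the in-memory queue standing in for a Kafka consumer are assumptions made for the sketch; the point is only the embedding pattern, where the application, not a framework, owns the thread's lifecycle.

```java
import java.util.Map;
import java.util.concurrent.*;

// Toy illustration of "Samza as a library": the task is just a Runnable
// the application constructs, starts on its own thread, and stops itself.
class LibraryModeDemo {
    // Stand-in for a StreamTask: poll messages, process, loop until stopped.
    static class MyStreamTask implements Runnable {
        private final Map<String, String> configs;  // mirrors MyStreamTask(configs)
        private final BlockingQueue<String> input;  // stand-in for a Kafka consumer
        volatile boolean running = true;
        int processed = 0;

        MyStreamTask(Map<String, String> configs, BlockingQueue<String> input) {
            this.configs = configs;
            this.input = input;
        }

        @Override public void run() {
            while (running) {
                try {
                    String msg = input.poll(10, TimeUnit.MILLISECONDS);
                    if (msg == null) continue;  // nothing to consume yet
                    processed++;                // "process" the message
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");

        // The embedding pattern from the thread: the application owns the thread.
        MyStreamTask task = new MyStreamTask(
            Map.of("bootstrap.servers", "localhost:4242"), queue);
        Thread thread = new Thread(task);
        thread.start();

        Thread.sleep(200);      // give the task time to drain the queue
        task.running = false;   // application-controlled shutdown
        thread.join();
        System.out.println(task.processed);
    }
}
```

The runnable mode is then just a thin main() wrapper around the same task, which is why a purely standalone Samza covers both.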
> > > > > >>>> > > > > > >>>> Guozhang > > > > > >>>> > > > > > >>>>> On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps < j...@confluent.io> > > > > > wrote: > > > > > >>>>> > > > > > >>>>> Looks like gmail mangled the code example, it was supposed to > > > look > > > > > >>>>> like > > > > > >>>>> this: > > > > > >>>>> 
> > > > > >>>>> Properties props = new Properties();
> > > > > >>>>> props.put("bootstrap.servers", "localhost:4242");
> > > > > >>>>> StreamingConfig config = new StreamingConfig(props);
> > > > > >>>>> config.subscribe("test-topic-1", "test-topic-2");
> > > > > >>>>> config.processor(ExampleStreamProcessor.class);
> > > > > >>>>> config.serialization(new StringSerializer(), new StringDeserializer());
> > > > > >>>>> KafkaStreaming container = new KafkaStreaming(config);
> > > > > >>>>> container.run();
> > > > > >>>>> > > > > > >>>>> -Jay > > > > > >>>>> > > > > > >>>>> On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps < j...@confluent.io> > > > > > >> wrote: > > > > > >>>>> > > > > > >>>>>> Hey guys, > > > > > >>>>>> > > > > > >>>>>> This came out of some conversations Chris and I were having > > > > > >>>>>> around > > > > > >>>>> whether > > > > > >>>>>> it would make sense to use Samza as a kind of data ingestion > > > > > >>> framework > > > > > >>>>> for > > > > > >>>>>> Kafka (which ultimately led to KIP-26 "copycat"). This got > > > > > >>>> combined > > > > > >>>>>> with complaints around config and YARN and the discussion > around > > > > > >>>>>> how > > > > > >>> to > > > > > >>>>>> best do a standalone mode. 
> > > > > >>>>>> > > > > > >>>>>> So the thought experiment was, given that Samza was > basically > > > > > >>>>>> already totally Kafka specific, what if you just embraced > that > > > > > >>>>>> and turned it > > > > > >>>> into > > > > > >>>>>> something less like a heavyweight framework and more like a > > > > > >>>>>> third > > > > > >>> Kafka > > > > > >>>>>> client--a kind of "producing consumer" with state management > > > > > >>>> facilities. > > > > > >>>>>> Basically a library. Instead of a complex stream processing > > > > > >>>>>> framework > > > > > >>>>> this > > > > > >>>>>> would actually be a very simple thing, not much more > complicated > > > > > >>>>>> to > > > > > >>> use > > > > > >>>>> or > > > > > >>>>>> operate than a Kafka consumer. As Chris said, when we thought about > it, > > > > > >>>>>> a > > > > > >>> lot > > > > > >>>> of > > > > > >>>>>> what Samza (and the other stream processing systems) were > doing > > > > > >>> seemed > > > > > >>>>> like > > > > > >>>>>> kind of a hangover from MapReduce. > > > > > >>>>>> > > > > > >>>>>> Of course you need to ingest/output data to and from the > stream > > > > > >>>>>> processing. But when we actually looked into how that would > > > > > >>>>>> work, > > > > > >>> Samza > > > > > >>>>>> isn't really an ideal data ingestion framework for a bunch > of > > > > > >>> reasons. > > > > > >>>> To > > > > > >>>>>> really do that right you need a pretty different internal > data > > > > > >>>>>> model > > > > > >>>> and > > > > > >>>>>> set of apis. So what if you split them and had an api for > > Kafka > > > > > >>>>>> ingress/egress (copycat AKA KIP-26) and a separate api for > > Kafka > > > > > >>>>>> transformation (Samza)? > > > > > >>>>>> > > > > > >>>>>> This would also allow really embracing the same terminology > > and > > > > > >>>>>> conventions. One complaint about the current state is that > the > > > > > >>>>>> two > > > > > >>>>> systems > > > > > >>>>>> kind of feel bolted on. 
Terminology like "stream" vs "topic" > > and > > > > > >>>>> different > > > > > >>>>>> config and monitoring systems means you kind of have to > learn > > > > > >>>>>> Kafka's > > > > > >>>>> way, > > > > > >>>>>> then learn Samza's slightly different way, then kind of > > > > > >>>>>> understand > > > > > >>> how > > > > > >>>>> they > > > > > >>>>>> map to each other, which having walked a few people through > this > > > > > >>>>>> is surprisingly tricky for folks to get. > > > > > >>>>>> > > > > > >>>>>> Since I have been spending a lot of time on airplanes I > hacked > > > > > >>>>>> up an earnest but still somewhat incomplete prototype of what > > > > > >>>>>> this would > > > look > > > > > >>>>>> like. This is just unceremoniously dumped into Kafka as it > > > > > >>>>>> required a > > > > > >>>> few > > > > > >>>>>> changes to the new consumer. Here is the code: > > > > > >>> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming > > > > > >>>>>> > > > > > >>>>>> For the purpose of the prototype I just liberally renamed > > > > > >>>>>> everything > > > > > >>> to > > > > > >>>>>> try to align it with Kafka with no regard for compatibility. 
> > > > > >>>>>> > > > > > >>>>>> To use this would be something like this:
> > > > > >>>>>> Properties props = new Properties();
> > > > > >>>>>> props.put("bootstrap.servers", "localhost:4242");
> > > > > >>>>>> StreamingConfig config = new StreamingConfig(props);
> > > > > >>>>>> config.subscribe("test-topic-1", "test-topic-2");
> > > > > >>>>>> config.processor(ExampleStreamProcessor.class);
> > > > > >>>>>> config.serialization(new StringSerializer(), new StringDeserializer());
> > > > > >>>>>> KafkaStreaming container = new KafkaStreaming(config);
> > > > > >>>>>> container.run();
> > > > > >>>>>> > > > > > >>>>>> KafkaStreaming is basically the SamzaContainer; > StreamProcessor > > > > > >>>>>> is basically StreamTask. > > > > > >>>>>> > > > > > >>>>>> So rather than putting all the class names in a file and > then > > > > > >>>>>> having > > > > > >>>> the > > > > > >>>>>> job assembled by reflection, you just instantiate the > container > > > > > >>>>>> programmatically. Work is balanced over however many > instances > > > > > >>>>>> of > > > > > >>> this > > > > > >>>>> are > > > > > >>>>>> alive at any time (i.e. if an instance dies, new tasks are > added > > > > > >>>>>> to > > > > > >>> the > > > > > >>>>>> existing containers without shutting them down). > > > > > >>>>>> > > > > > >>>>>> We would provide some glue for running this stuff in YARN > via > > > > > >>>>>> Slider, Mesos via Marathon, and AWS using some of their > tools > > > > > >>>>>> but from the > > > > > >>>> point > > > > > >>>>> of > > > > > >>>>>> view of these frameworks these stream processing jobs are > just > > > > > >>>> stateless > > > > > >>>>>> services that can come and go and expand and contract at > will. > > > > > >>>>>> There > > > > > >>> is > > > > > >>>>> no > > > > > >>>>>> more custom scheduler. > > > > > >>>>>> > > > > > >>>>>> Here are some relevant details: > > > > > >>>>>> > > > > > >>>>>> 1. 
It is only ~1300 lines of code, it would get larger if > we > > > > > >>>>>> productionized but not vastly larger. We really do get a > ton > > > > > >>>>>> of > > > > > >>>>> leverage > > > > > >>>>>> out of Kafka. > > > > > >>>>>> 2. Partition management is fully delegated to the new > > > consumer. > > > > > >>> This > > > > > >>>>>> is nice since now any partition management strategy > > available > > > > > >>>>>> to > > > > > >>>> Kafka > > > > > >>>>>> consumer is also available to Samza (and vice versa) and > > with > > > > > >>>>>> the > > > > > >>>>> exact > > > > > >>>>>> same configs. > > > > > >>>>>> 3. It supports state as well as state reuse > > > > > >>>>>> > > > > > >>>>>> Anyhow take a look, hopefully it is thought provoking. > > > > > >>>>>> > > > > > >>>>>> -Jay > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>> On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini < > > > > > >>>> criccom...@apache.org> > > > > > >>>>>> wrote: > > > > > >>>>>> > > > > > >>>>>>> Hey all, > > > > > >>>>>>> > > > > > >>>>>>> I have had some discussions with Samza engineers at > LinkedIn > > > > > >>>>>>> and > > > > > >>>>> Confluent > > > > > >>>>>>> and we came up with a few observations and would like to > > > > > >>>>>>> propose > > > > > >>> some > > > > > >>>>>>> changes. > > > > > >>>>>>> > > > > > >>>>>>> We've observed some things that I want to call out about > > > > > >>>>>>> Samza's > > > > > >>>> design, > > > > > >>>>>>> and I'd like to propose some changes. > > > > > >>>>>>> > > > > > >>>>>>> * Samza is dependent upon a dynamic deployment system. > > > > > >>>>>>> * Samza is too pluggable. > > > > > >>>>>>> * Samza's SystemConsumer/SystemProducer and Kafka's > consumer > > > > > >>>>>>> APIs > > > > > >>> are > > > > > >>>>>>> trying to solve a lot of the same problems. > > > > > >>>>>>> > > > > > >>>>>>> All three of these issues are related, but I'll address > them > > in > > > > > >>> order. 
> > > > > >>>>>>> > > > > > >>>>>>> Deployment > > > > > >>>>>>> > > > > > >>>>>>> Samza strongly depends on the use of a dynamic deployment > > > > > >>>>>>> scheduler > > > > > >>>> such > > > > > >>>>>>> as > > > > > >>>>>>> YARN, Mesos, etc. When we initially built Samza, we bet > that > > > > > >>>>>>> there > > > > > >>>> would > > > > > >>>>>>> be > > > > > >>>>>>> one or two winners in this area, and we could support them, > > and > > > > > >>>>>>> the > > > > > >>>> rest > > > > > >>>>>>> would go away. In reality, there are many variations. > > > > > >>>>>>> Furthermore, > > > > > >>>> many > > > > > >>>>>>> people still prefer to just start their processors like > > normal > > > > > >>>>>>> Java processes, and use traditional deployment scripts such > > as > > > > > >>>>>>> Fabric, > > > > > >>>> Chef, > > > > > >>>>>>> Ansible, etc. Forcing a deployment system on users makes > the > > > > > >>>>>>> Samza start-up process really painful for first time users. > > > > > >>>>>>> > > > > > >>>>>>> Dynamic deployment as a requirement was also a bit of a > > > > > >>>>>>> mis-fire > > > > > >>>> because > > > > > >>>>>>> of > > > > > >>>>>>> a fundamental misunderstanding between the nature of batch > > jobs > > > > > >>>>>>> and > > > > > >>>>> stream > > > > > >>>>>>> processing jobs. Early on, we made conscious effort to > favor > > > > > >>>>>>> the > > > > > >>>> Hadoop > > > > > >>>>>>> (Map/Reduce) way of doing things, since it worked and was > > well > > > > > >>>>> understood. > > > > > >>>>>>> One thing that we missed was that batch jobs have a > definite > > > > > >>>> beginning, > > > > > >>>>>>> and > > > > > >>>>>>> end, and stream processing jobs don't (usually). This leads > > to > > > > > >>>>>>> a > > > > > >>> much > > > > > >>>>>>> simpler scheduling problem for stream processors. You > > basically > > > > > >>>>>>> just > > > > > >>>>> need > > > > > >>>>>>> to find a place to start the processor, and start it. 
The > way > > > > > >>>>>> we run grids at LinkedIn, there's no concept of a cluster > > > > > >>>>>> being "full". We always > > > > > >>>> add > > > > > >>>>>> more machines. The problem with coupling Samza with a > scheduler > > > > > >>>>>> is > > > > > >>>> that > > > > > >>>>>> Samza (as a framework) now has to handle deployment. This > pulls > > > > > >>>>>> in a > > > > > >>>>> bunch > > > > > >>>>>> of things such as configuration distribution (config stream), > > > > > >>>>>> shell > > > > > >>>>> scripts > > > > > >>>>>> (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff), > > > etc. > > > > > >>>>>> > > > > > >>>>>> Another reason for requiring dynamic deployment was to > support > > > > > >>>>>> data locality. If you want to have locality, you need to put > > > > > >>>>>> your > > > > > >>>> processors > > > > > >>>>>> close to the data they're processing. Upon further > > > > > >>>>>> investigation, > > > > > >>>>> though, > > > > > >>>>>> this feature is not that beneficial. There is some good > > > > > >>>>>> discussion > > > > > >>>> about > > > > > >>>>>> some problems with it on SAMZA-335. Again, we took the > > > > > >>>>>> Map/Reduce > > > > > >>>> path, > > > > > >>>>>> but > > > > > >>>>>> there are some fundamental differences between HDFS and > > Kafka. > > > > > >>>>>> HDFS > > > > > >>>> has > > > > > >>>>>> blocks, while Kafka has partitions. This leads to less > > > > > >>>>>> optimization potential with stream processors on top of > > Kafka. > > > > > >>>>>> > > > > > >>>>>> This feature is also used as a crutch. Samza doesn't have > any > > > > > >>>>>> built > > > > > >>> in > > > > > >>>>>> fault-tolerance logic. Instead, it depends on the dynamic > > > > > >>>>>> deployment scheduling system to handle restarts when a > > > > > >>>>>> processor dies. This has > > > > > >>>>> made > > > > > >>>>>> it very difficult to write a standalone Samza container > > > > > >> (SAMZA-516). 
> > > > > >>>>>>> > > > > > >>>>>>> Pluggability > > > > > >>>>>>> > > > > > >>>>>>> In some cases pluggability is good, but I think that we've > gone > > > > > >>>>>>> too > > > > > >>>> far > > > > > >>>>>>> with it. Currently, Samza has: > > > > > >>>>>>> > > > > > >>>>>>> * Pluggable config. > > > > > >>>>>>> * Pluggable metrics. > > > > > >>>>>>> * Pluggable deployment systems. > > > > > >>>>>>> * Pluggable streaming systems (SystemConsumer, > SystemProducer, > > > > > >> etc). > > > > > >>>>>>> * Pluggable serdes. > > > > > >>>>>>> * Pluggable storage engines. > > > > > >>>>>>> * Pluggable strategies for just about every component > > > > > >>> (MessageChooser, > > > > > >>>>>>> SystemStreamPartitionGrouper, ConfigRewriter, etc). > > > > > >>>>>>> > > > > > >>>>>>> There's probably more that I've forgotten, as well. Some of > > > > > >>>>>>> these > > > > > >>> are > > > > > >>>>>>> useful, but some have proven not to be. This all comes at a > > > cost: > > > > > >>>>>>> complexity. This complexity is making it harder for our > users > > > > > >>>>>>> to > > > > > >>> pick > > > > > >>>> up > > > > > >>>>>>> and use Samza out of the box. It also makes it difficult > for > > > > > >>>>>>> Samza developers to reason about the characteristics of > > > > > >>>>>>> the container (since the characteristics change depending > on > > > > > >>>>>>> which plugins are in use). > > > > > >>>>>>> > > > > > >>>>>>> The issues with pluggability are most visible in the System > > > APIs. > > > > > >>> What > > > > > >>>>>>> Samza really requires to be functional is Kafka as its > > > > > >>>>>>> transport > > > > > >>>> layer. > > > > > >>>>>>> But > > > > > >>>>>>> we've conflated two unrelated use cases into one API: > > > > > >>>>>>> > > > > > >>>>>>> 1. Get data into/out of Kafka. > > > > > >>>>>>> 2. Process the data in Kafka. > > > > > >>>>>>> > > > > > >>>>>>> The current System API supports both of these use cases. 
> The > > > > > >>>>>>> problem > > > > > >>>> is, > > > > > >>>>>>> we > > > > > >>>>>>> actually want different features for each use case. By > > papering > > > > > >>>>>>> over > > > > > >>>>> these > > > > > >>>>>>> two use cases, and providing a single API, we've > introduced a > > > > > >>>>>>> ton of > > > > > >>>>> leaky > > > > > >>>>>>> abstractions. > > > > > >>>>>>> > > > > > >>>>>>> For example, what we'd really like in (2) is to have > > > > > >>>>>>> monotonically increasing longs for offsets (like Kafka). > This > > > > > >>>>>>> would be at odds > > > > > >>> with > > > > > >>>>> (1), > > > > > >>>>>>> though, since different systems have different > > > > > >>>>> SCNs/Offsets/UUIDs/vectors. > > > > > >>>>>>> There was discussion both on the mailing list and the SQL > > JIRAs > > > > > >>> about > > > > > >>>>> the > > > > > >>>>>>> need for this. > > > > > >>>>>>> > > > > > >>>>>>> The same thing holds true for replayability. Kafka allows > us > > to > > > > > >>> rewind > > > > > >>>>>>> when > > > > > >>>>>>> we have a failure. Many other systems don't. In some cases, > > > > > >>>>>>> systems > > > > > >>>>> return > > > > > >>>>>>> null for their offsets (e.g. WikipediaSystemConsumer) > because > > > > > >>>>>>> they > > > > > >>>> have > > > > > >>>>> no > > > > > >>>>>>> offsets. > > > > > >>>>>>> > > > > > >>>>>>> Partitioning is another example. Kafka supports > partitioning, > > > > > >>>>>>> but > > > > > >>> many > > > > > >>>>>>> systems don't. We model this by having a single partition > for > > > > > >>>>>>> those systems. Still, other systems model partitioning > > > > > >> differently (e.g. > > > > > >>>>>>> Kinesis). > > > > > >>>>>>> > > > > > >>>>>>> The SystemAdmin interface is also a mess. Creating streams > > in a > > > > > >>>>>>> system-agnostic way is almost impossible. As is modeling > > > > > >>>>>>> metadata > > > > > >>> for > > > > > >>>>> the > > > > > >>>>>>> system (replication factor, partitions, location, etc). 
The list goes on.

Duplicate work

At the time that we began writing Samza, Kafka's consumer and producer APIs had a relatively weak feature set. On the consumer side, you had two options: use the high-level consumer, or the simple consumer. The problem with the high-level consumer was that it controlled your offsets, partition assignments, and the order in which you received messages. The problem with the simple consumer is that it's not simple. It's basic. You end up having to handle a lot of really low-level stuff that you shouldn't. We spent a lot of time to make Samza's KafkaSystemConsumer very robust. It also allows us to support some cool features:

* Per-partition message ordering and prioritization.
* Tight control over partition assignment to support joins, global state (if we want to implement it :)), etc.
* Tight control over offset checkpointing.

What we didn't realize at the time is that these features should actually be in Kafka. A lot of Kafka consumers (not just Samza stream processors) end up wanting to do things like joins and partition assignment. The Kafka community has come to the same conclusion.
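The join-oriented partition assignment mentioned above boils down to co-partitioning: partition i of every input topic is routed to the same task, so records sharing a partition key always land together. A minimal sketch of that rule, assuming the class and method names (they are illustrative, not Samza's actual grouper API):

```java
import java.util.*;

// Sketch of co-partitioned assignment: partition i of every input
// topic goes to task i, so a join keyed on the partitioning key sees
// matching records in one place. Illustrative names, not real APIs.
public class CoPartitionAssigner {
    /** Maps task id -> list of "topic:partition" strings it consumes. */
    static Map<Integer, List<String>> assign(List<String> topics, int partitionsPerTopic) {
        Map<Integer, List<String>> tasks = new TreeMap<>();
        for (int p = 0; p < partitionsPerTopic; p++) {
            List<String> owned = new ArrayList<>();
            for (String topic : topics) {
                owned.add(topic + ":" + p); // same partition id across all topics
            }
            tasks.put(p, owned);
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> tasks =
            assign(Arrays.asList("orders", "shipments"), 2);
        System.out.println(tasks);
        // {0=[orders:0, shipments:0], 1=[orders:1, shipments:1]}
    }
}
```

This only works if the input topics are produced with the same partition count and partitioner, which is exactly the kind of invariant the email argues belongs in Kafka rather than in each framework.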
They're adding a ton of upgrades into their new Kafka consumer implementation. To a large extent, it's duplicate work to what we've already done in Samza.

On top of this, Kafka ended up taking a very similar approach to Samza's KafkaCheckpointManager implementation for handling offset checkpointing. Like Samza, Kafka's new offset management feature stores offset checkpoints in a topic, and allows you to fetch them from the broker.

A lot of this seems like a waste, since we could have shared the work if it had been done in Kafka from the get-go.

Vision

All of this leads me to a rather radical proposal. Samza is relatively stable at this point. I'd venture to say that we're near a 1.0 release. I'd like to propose that we take what we've learned, and begin thinking about Samza beyond 1.0. What would we change if we were starting from scratch? My proposal is to:

1. Make Samza standalone the *only* way to run Samza processors, and eliminate all direct dependencies on YARN, Mesos, etc.
2. Make a definitive call to support only Kafka as the stream processing layer.
3. Eliminate Samza's metrics, logging, serialization, and config systems, and simply use Kafka's instead.

This would fix all of the issues that I outlined above. It should also shrink the Samza code base pretty dramatically. Supporting only a standalone container will allow Samza to be executed on YARN (using Slider), Mesos (using Marathon/Aurora), or most other in-house deployment systems. This should make life a lot easier for new users. Imagine having the hello-samza tutorial without YARN. The drop in mailing list traffic will be pretty dramatic.

Coupling with Kafka seems long overdue to me. The reality is, everyone that I'm aware of is using Samza with Kafka. We basically require it already in order for most features to work. Those that are using other systems are generally using it for ingest into Kafka (1), and then they do the processing on top. There is already discussion (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767) in Kafka to make ingesting into Kafka extremely easy.

Once we make the call to couple with Kafka, we can leverage a ton of their ecosystem. We no longer have to maintain our own config, metrics, etc. We can all share the same libraries, and make them better.
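The offset management approach described above keeps the latest committed offset per (group, topic, partition) as the live record in a compacted topic. A toy model of that "latest key wins" retention rule follows; the real broker machinery is considerably more involved, and the class here is purely illustrative:

```java
import java.util.*;

// Toy model of offset checkpointing via a log-compacted topic: each
// commit appends a (group/topic/partition) -> offset record, and
// compaction keeps only the newest record per key. This mirrors the
// idea behind Kafka's offsets topic, not its actual implementation.
public class CheckpointLog {
    private final List<Map.Entry<String, Long>> log = new ArrayList<>();

    void commit(String group, String topic, int partition, long offset) {
        String key = group + "/" + topic + "/" + partition;
        log.add(new AbstractMap.SimpleEntry<>(key, offset));
    }

    /** Replays the log; later records for a key overwrite earlier ones. */
    Map<String, Long> compacted() {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : log) {
            latest.put(e.getKey(), e.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        CheckpointLog ckpt = new CheckpointLog();
        ckpt.commit("job-1", "orders", 0, 42L);
        ckpt.commit("job-1", "orders", 0, 57L); // newer checkpoint wins
        System.out.println(ckpt.compacted().get("job-1/orders/0")); // 57
    }
}
```

On restart, a consumer (or a Samza container) would replay this compacted view and resume each partition from the stored offset, which is why both Samza's KafkaCheckpointManager and Kafka's offset management converged on the same design.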
This will also allow us to share the consumer/producer APIs, and will let us leverage their offset management and partition management, rather than having our own. All of the coordinator stream code would go away, as would most of the YARN AppMaster code. We'd probably have to push some partition management features into the Kafka broker, but they're already moving in that direction with the new consumer API. The features we have for partition assignment aren't unique to Samza, and seem like they should be in Kafka anyway. There will always be some niche usages which will require extra care and hence full control over partition assignments, much like the Kafka low-level consumer API. These would continue to be supported.

These items will be good for the Samza community. They'll make Samza easier to use, and make it easier for developers to add new features.

Obviously this is a fairly large (and somewhat backwards incompatible change). If we choose to go this route, it's important that we openly communicate how we're going to provide a migration path from the existing APIs to the new ones (if we make incompatible changes).
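Pushing partition management into the broker means a group coordinator, rather than each framework, decides which consumer owns which partitions. A sketch of the simplest such policy, round-robin distribution across group members, is below; Kafka's actual assignment strategies are pluggable and more nuanced, and the class here is illustrative only:

```java
import java.util.*;

// Sketch of broker-side group partition management: distribute N
// partitions round-robin across the live members of a consumer group.
// Kafka's real assignors are pluggable and handle rebalances, topic
// metadata, etc.; this shows only the core distribution idea.
public class GroupAssignor {
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String m : members) {
            out.put(m, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // Deal partitions to members in turn, like a card deal.
            out.get(members.get(p % members.size())).add(p);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList("c1", "c2"), 5));
        // {c1=[0, 2, 4], c2=[1, 3]}
    }
}
```

The "niche usages" the email mentions are exactly the cases where an application opts out of this automatic assignment and pins partitions itself, which the new consumer API continues to allow.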
I think at a minimum, we'd probably need to provide a wrapper to allow existing StreamTask implementations to continue running on the new container. It's also important that we openly communicate about timing, and stages of the migration.

If you made it this far, I'm sure you have opinions. :) Please send your thoughts and feedback.

Cheers,
Chris

--
-- Guozhang