Read through the code example and it looks good to me. A few thoughts
regarding deployment:

Today Samza deploys as executable runnable like:

deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...

And this proposal advocate for deploying Samza more as embedded libraries
in user application code (ignoring the terminology since it is not the same
as the prototype code):

StreamTask task = new MyStreamTask(configs);
Thread thread = new Thread(task);
thread.start();

I think both of these deployment modes are important for different types of
users. That said, I think making Samza purely standalone is still
sufficient for either runnable or library modes.

Guozhang

On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps <j...@confluent.io> wrote:

> Looks like gmail mangled the code example, it was supposed to look like
> this:
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:4242");
> StreamingConfig config = new StreamingConfig(props);
> config.subscribe("test-topic-1", "test-topic-2");
> config.processor(ExampleStreamProcessor.class);
> config.serialization(new StringSerializer(), new StringDeserializer());
> KafkaStreaming container = new KafkaStreaming(config);
> container.run();
>
> -Jay
>
> On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey guys,
> >
> > This came out of some conversations Chris and I were having around
> whether
> > it would make sense to use Samza as a kind of data ingestion framework
> for
> > Kafka (which ultimately lead to KIP-26 "copycat"). This kind of combined
> > with complaints around config and YARN and the discussion around how to
> > best do a standalone mode.
> >
> > So the thought experiment was, given that Samza was basically already
> > totally Kafka specific, what if you just embraced that and turned it into
> > something less like a heavyweight framework and more like a third Kafka
> > client--a kind of "producing consumer" with state management facilities.
> > Basically a library. Instead of a complex stream processing framework
> this
> > would actually be a very simple thing, not much more complicated to use
> or
> > operate than a Kafka consumer. As Chris said we thought about it a lot of
> > what Samza (and the other stream processing systems were doing) seemed
> like
> > kind of a hangover from MapReduce.
> >
> > Of course you need to ingest/output data to and from the stream
> > processing. But when we actually looked into how that would work, Samza
> > isn't really an ideal data ingestion framework for a bunch of reasons. To
> > really do that right you need a pretty different internal data model and
> > set of apis. So what if you split them and had an api for Kafka
> > ingress/egress (copycat AKA KIP-26) and a separate api for Kafka
> > transformation (Samza).
> >
> > This would also allow really embracing the same terminology and
> > conventions. One complaint about the current state is that the two
> systems
> > kind of feel bolted on. Terminology like "stream" vs "topic" and
> different
> > config and monitoring systems means you kind of have to learn Kafka's
> way,
> > then learn Samza's slightly different way, then kind of understand how
> they
> > map to each other, which having walked a few people through this is
> > surprisingly tricky for folks to get.
> >
> > Since I have been spending a lot of time on airplanes I hacked up an
> > ernest but still somewhat incomplete prototype of what this would look
> > like. This is just unceremoniously dumped into Kafka as it required a few
> > changes to the new consumer. Here is the code:
> >
> >
> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
> >
> > For the purpose of the prototype I just liberally renamed everything to
> > try to align it with Kafka with no regard for compatibility.
> >
> > To use this would be something like this:
> > Properties props = new Properties(); props.put("bootstrap.servers",
> > "localhost:4242"); StreamingConfig config = new StreamingConfig(props);
> config.subscribe("test-topic-1",
> > "test-topic-2"); config.processor(ExampleStreamProcessor.class);
> config.serialization(new
> > StringSerializer(), new StringDeserializer()); KafkaStreaming container =
> > new KafkaStreaming(config); container.run();
> >
> > KafkaStreaming is basically the SamzaContainer; StreamProcessor is
> > basically StreamTask.
> >
> > So rather than putting all the class names in a file and then having the
> > job assembled by reflection, you just instantiate the container
> > programmatically. Work is balanced over however many instances of this
> are
> > alive at any time (i.e. if an instance dies, new tasks are added to the
> > existing containers without shutting them down).
> >
> > We would provide some glue for running this stuff in YARN via Slider,
> > Mesos via Marathon, and AWS using some of their tools but from the point
> of
> > view of these frameworks these stream processing jobs are just stateless
> > services that can come and go and expand and contract at will. There is
> no
> > more custom scheduler.
> >
> > Here are some relevant details:
> >
> >    1. It is only ~1300 lines of code, it would get larger if we
> >    productionized but not vastly larger. We really do get a ton of
> leverage
> >    out of Kafka.
> >    2. Partition management is fully delegated to the new consumer. This
> >    is nice since now any partition management strategy available to Kafka
> >    consumer is also available to Samza (and vice versa) and with the
> exact
> >    same configs.
> >    3. It supports state as well as state reuse
> >
> > Anyhow take a look, hopefully it is thought provoking.
> >
> > -Jay
> >
> >
> >
> > On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini <criccom...@apache.org>
> > wrote:
> >
> >> Hey all,
> >>
> >> I have had some discussions with Samza engineers at LinkedIn and
> Confluent
> >> and we came up with a few observations and would like to propose some
> >> changes.
> >>
> >> We've observed some things that I want to call out about Samza's design,
> >> and I'd like to propose some changes.
> >>
> >> * Samza is dependent upon a dynamic deployment system.
> >> * Samza is too pluggable.
> >> * Samza's SystemConsumer/SystemProducer and Kafka's consumer APIs are
> >> trying to solve a lot of the same problems.
> >>
> >> All three of these issues are related, but I'll address them in order.
> >>
> >> Deployment
> >>
> >> Samza strongly depends on the use of a dynamic deployment scheduler such
> >> as
> >> YARN, Mesos, etc. When we initially built Samza, we bet that there would
> >> be
> >> one or two winners in this area, and we could support them, and the rest
> >> would go away. In reality, there are many variations. Furthermore, many
> >> people still prefer to just start their processors like normal Java
> >> processes, and use traditional deployment scripts such as Fabric, Chef,
> >> Ansible, etc. Forcing a deployment system on users makes the Samza
> >> start-up
> >> process really painful for first time users.
> >>
> >> Dynamic deployment as a requirement was also a bit of a mis-fire because
> >> of
> >> a fundamental misunderstanding between the nature of batch jobs and
> stream
> >> processing jobs. Early on, we made conscious effort to favor the Hadoop
> >> (Map/Reduce) way of doing things, since it worked and was well
> understood.
> >> One thing that we missed was that batch jobs have a definite beginning,
> >> and
> >> end, and stream processing jobs don't (usually). This leads to a much
> >> simpler scheduling problem for stream processors. You basically just
> need
> >> to find a place to start the processor, and start it. The way we run
> >> grids,
> >> at LinkedIn, there's no concept of a cluster being "full". We always add
> >> more machines. The problem with coupling Samza with a scheduler is that
> >> Samza (as a framework) now has to handle deployment. This pulls in a
> bunch
> >> of things such as configuration distribution (config stream), shell
> scrips
> >> (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff), etc.
> >>
> >> Another reason for requiring dynamic deployment was to support data
> >> locality. If you want to have locality, you need to put your processors
> >> close to the data they're processing. Upon further investigation,
> though,
> >> this feature is not that beneficial. There is some good discussion about
> >> some problems with it on SAMZA-335. Again, we took the Map/Reduce path,
> >> but
> >> there are some fundamental differences between HDFS and Kafka. HDFS has
> >> blocks, while Kafka has partitions. This leads to less optimization
> >> potential with stream processors on top of Kafka.
> >>
> >> This feature is also used as a crutch. Samza doesn't have any built in
> >> fault-tolerance logic. Instead, it depends on the dynamic deployment
> >> scheduling system to handle restarts when a processor dies. This has
> made
> >> it very difficult to write a standalone Samza container (SAMZA-516).
> >>
> >> Pluggability
> >>
> >> In some cases pluggability is good, but I think that we've gone too far
> >> with it. Currently, Samza has:
> >>
> >> * Pluggable config.
> >> * Pluggable metrics.
> >> * Pluggable deployment systems.
> >> * Pluggable streaming systems (SystemConsumer, SystemProducer, etc).
> >> * Pluggable serdes.
> >> * Pluggable storage engines.
> >> * Pluggable strategies for just about every component (MessageChooser,
> >> SystemStreamPartitionGrouper, ConfigRewriter, etc).
> >>
> >> There's probably more that I've forgotten, as well. Some of these are
> >> useful, but some have proven not to be. This all comes at a cost:
> >> complexity. This complexity is making it harder for our users to pick up
> >> and use Samza out of the box. It also makes it difficult for Samza
> >> developers to reason about what the characteristics of the container
> >> (since
> >> the characteristics change depending on which plugins are use).
> >>
> >> The issues with pluggability are most visible in the System APIs. What
> >> Samza really requires to be functional is Kafka as its transport layer.
> >> But
> >> we've conflated two unrelated use cases into one API:
> >>
> >> 1. Get data into/out of Kafka.
> >> 2. Process the data in Kafka.
> >>
> >> The current System API supports both of these use cases. The problem is,
> >> we
> >> actually want different features for each use case. By papering over
> these
> >> two use cases, and providing a single API, we've introduced a ton of
> leaky
> >> abstractions.
> >>
> >> For example, what we'd really like in (2) is to have monotonically
> >> increasing longs for offsets (like Kafka). This would be at odds with
> (1),
> >> though, since different systems have different
> SCNs/Offsets/UUIDs/vectors.
> >> There was discussion both on the mailing list and the SQL JIRAs about
> the
> >> need for this.
> >>
> >> The same thing holds true for replayability. Kafka allows us to rewind
> >> when
> >> we have a failure. Many other systems don't. In some cases, systems
> return
> >> null for their offsets (e.g. WikipediaSystemConsumer) because they have
> no
> >> offsets.
> >>
> >> Partitioning is another example. Kafka supports partitioning, but many
> >> systems don't. We model this by having a single partition for those
> >> systems. Still, other systems model partitioning differently (e.g.
> >> Kinesis).
> >>
> >> The SystemAdmin interface is also a mess. Creating streams in a
> >> system-agnostic way is almost impossible. As is modeling metadata for
> the
> >> system (replication factor, partitions, location, etc). The list goes
> on.
> >>
> >> Duplicate work
> >>
> >> At the time that we began writing Samza, Kafka's consumer and producer
> >> APIs
> >> had a relatively weak feature set. On the consumer-side, you had two
> >> options: use the high level consumer, or the simple consumer. The
> problem
> >> with the high-level consumer was that it controlled your offsets,
> >> partition
> >> assignments, and the order in which you received messages. The problem
> >> with
> >> the simple consumer is that it's not simple. It's basic. You end up
> having
> >> to handle a lot of really low-level stuff that you shouldn't. We spent a
> >> lot of time to make Samza's KafkaSystemConsumer very robust. It also
> >> allows
> >> us to support some cool features:
> >>
> >> * Per-partition message ordering and prioritization.
> >> * Tight control over partition assignment to support joins, global state
> >> (if we want to implement it :)), etc.
> >> * Tight control over offset checkpointing.
> >>
> >> What we didn't realize at the time is that these features should
> actually
> >> be in Kafka. A lot of Kafka consumers (not just Samza stream processors)
> >> end up wanting to do things like joins and partition assignment. The
> Kafka
> >> community has come to the same conclusion. They're adding a ton of
> >> upgrades
> >> into their new Kafka consumer implementation. To a large extent, it's
> >> duplicate work to what we've already done in Samza.
> >>
> >> On top of this, Kafka ended up taking a very similar approach to Samza's
> >> KafkaCheckpointManager implementation for handling offset checkpointing.
> >> Like Samza, Kafka's new offset management feature stores offset
> >> checkpoints
> >> in a topic, and allows you to fetch them from the broker.
> >>
> >> A lot of this seems like a waste, since we could have shared the work if
> >> it
> >> had been done in Kafka from the get-go.
> >>
> >> Vision
> >>
> >> All of this leads me to a rather radical proposal. Samza is relatively
> >> stable at this point. I'd venture to say that we're near a 1.0 release.
> >> I'd
> >> like to propose that we take what we've learned, and begin thinking
> about
> >> Samza beyond 1.0. What would we change if we were starting from scratch?
> >> My
> >> proposal is to:
> >>
> >> 1. Make Samza standalone the *only* way to run Samza processors, and
> >> eliminate all direct dependences on YARN, Mesos, etc.
> >> 2. Make a definitive call to support only Kafka as the stream processing
> >> layer.
> >> 3. Eliminate Samza's metrics, logging, serialization, and config
> systems,
> >> and simply use Kafka's instead.
> >>
> >> This would fix all of the issues that I outlined above. It should also
> >> shrink the Samza code base pretty dramatically. Supporting only a
> >> standalone container will allow Samza to be executed on YARN (using
> >> Slider), Mesos (using Marathon/Aurora), or most other in-house
> deployment
> >> systems. This should make life a lot easier for new users. Imagine
> having
> >> the hello-samza tutorial without YARN. The drop in mailing list traffic
> >> will be pretty dramatic.
> >>
> >> Coupling with Kafka seems long overdue to me. The reality is, everyone
> >> that
> >> I'm aware of is using Samza with Kafka. We basically require it already
> in
> >> order for most features to work. Those that are using other systems are
> >> generally using it for ingest into Kafka (1), and then they do the
> >> processing on top. There is already discussion (
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767
> >> )
> >> in Kafka to make ingesting into Kafka extremely easy.
> >>
> >> Once we make the call to couple with Kafka, we can leverage a ton of
> their
> >> ecosystem. We no longer have to maintain our own config, metrics, etc.
> We
> >> can all share the same libraries, and make them better. This will also
> >> allow us to share the consumer/producer APIs, and will let us leverage
> >> their offset management and partition management, rather than having our
> >> own. All of the coordinator stream code would go away, as would most of
> >> the
> >> YARN AppMaster code. We'd probably have to push some partition
> management
> >> features into the Kafka broker, but they're already moving in that
> >> direction with the new consumer API. The features we have for partition
> >> assignment aren't unique to Samza, and seem like they should be in Kafka
> >> anyway. There will always be some niche usages which will require extra
> >> care and hence full control over partition assignments much like the
> Kafka
> >> low level consumer api. These would continue to be supported.
> >>
> >> These items will be good for the Samza community. They'll make Samza
> >> easier
> >> to use, and make it easier for developers to add new features.
> >>
> >> Obviously this is a fairly large (and somewhat backwards incompatible
> >> change). If we choose to go this route, it's important that we openly
> >> communicate how we're going to provide a migration path from the
> existing
> >> APIs to the new ones (if we make incompatible changes). I think at a
> >> minimum, we'd probably need to provide a wrapper to allow existing
> >> StreamTask implementations to continue running on the new container.
> It's
> >> also important that we openly communicate about timing, and stages of
> the
> >> migration.
> >>
> >> If you made it this far, I'm sure you have opinions. :) Please send your
> >> thoughts and feedback.
> >>
> >> Cheers,
> >> Chris
> >>
> >
> >
>



-- 
-- Guozhang

Reply via email to