>>  Thanks for the info on the tradeoffs. That makes a lot of sense. I am
on-board with using ZkJobCoordinator, sounds like some good benefits over
just the Kafka high-level consumer.

This certainly looks like the simplest alternative.

For your other questions, please find my answers inline.

>> Q1: If I use LocalApplicationRunner, It does not use "ProcessJobFactory"
(or any StreamJob or *Job classes) correct?

Your understanding is correct. It directly instantiates the
*StreamProcessor*, which in turn creates and runs the *SamzaContainer*.

>> Q2: If I use LocalApplicationRunner, I will need to code myself the
loading and rewriting of the Config that is currently handled by JobRunner,
correct?

I don't think you'll need to do this. IIUC, the *LocalApplicationRunner*
should automatically invoke rewriters and do the right thing.
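
For intuition about what a rewriter does, here is a minimal sketch in plain
Java. It is not Samza's rewriter API: the class name EnvOverlayRewriter, the
SAMZA_CFG_ prefix, and the key-mapping rule are all hypothetical, chosen only
to illustrate deriving a final config from a base config plus environment
variables.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration only: this is NOT Samza's rewriter interface.
final class EnvOverlayRewriter {
    // Assumed prefix marking environment variables that override config keys.
    static final String PREFIX = "SAMZA_CFG_";

    // Returns a copy of base with every PREFIX-ed variable overlaid on top.
    // Example: SAMZA_CFG_JOB_NAME=foo becomes job.name=foo
    // (prefix stripped, lower-cased, '_' replaced by '.').
    static Map<String, String> rewrite(Map<String, String> base, Map<String, String> env) {
        Map<String, String> result = new HashMap<>(base);
        for (Map.Entry<String, String> e : env.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                String key = e.getKey().substring(PREFIX.length())
                        .toLowerCase(Locale.ROOT)
                        .replace('_', '.');
                result.put(key, e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put("job.name", "my-job");
        // Overlay the real process environment onto the base config.
        System.out.println(rewrite(base, System.getenv()));
    }
}
```

The base map is copied rather than mutated, so the original config stays
available for comparison or logging.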

>>  Q3: Do I need to also handle coordinator stream(s) and storing of
config that is done in JobRunner (I don’t think so as the ?

I don't think this is necessary either. The creation of coordinator stream
and persisting configuration happens in the *LocalApplicationRunner* (more
specifically in *StreamManager#createStreams*).

>> Q4: Where/How do I specify the Container ID for each instance? Is there
a config setting that I can pass, (or pull from an env variable and add to
the config) ? I am assuming it is my responsibility to ensure that each
instance is started with a unique container ID..?

Nope. If you are using the *ZkJobCoordinator*, you need not worry about
assigning IDs for each instance. The framework will automatically take care
of generating IDs and reaching consensus by electing a leader. If you are
curious, please take a look at implementations of the
*ProcessorIdGenerator* interface.
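
To make the idea concrete, here is a minimal sketch of an id generator that
needs no coordination between instances. The IdGenerator interface below is a
hypothetical stand-in written for this example; Samza's actual
ProcessorIdGenerator interface and its method signature may differ.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for the idea behind ProcessorIdGenerator;
// the real Samza interface and signature may differ.
interface IdGenerator {
    String generateId();
}

// Uses a random UUID per processor, so instances do not have to
// coordinate with each other to avoid id collisions.
final class UuidIdGenerator implements IdGenerator {
    @Override
    public String generateId() {
        return UUID.randomUUID().toString();
    }
}

final class IdGeneratorDemo {
    public static void main(String[] args) {
        IdGenerator gen = new UuidIdGenerator();
        Set<String> ids = new HashSet<>();
        for (int i = 0; i < 3; i++) {
            ids.add(gen.generateId());
        }
        // Three distinct ids, one per simulated processor.
        System.out.println(ids);
    }
}
```

Random ids are enough here because, as described above, the leader (not the
id itself) decides which partitions each instance gets.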

Please let us know should you have further questions!

Best,
Jagdish

On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges <tstump...@ntent.com>
wrote:

> Thanks for the info on the tradeoffs. That makes a lot of sense. I am
> on-board with using ZkJobCoordinator, sounds like some good benefits over
> just the Kafka high-level consumer.
>
>
>
> To that end, I have made some notes on possible approaches based on the
> previous thread, and from my look into the code. I’d love to get feedback.
>
>
>
> Approach 1. Configure jobs to use “ProcessJobFactory” and run instances of
> the job using run-job.sh or using JobRunner directly.
>
> I don’t think this makes sense from what I can see for a few reasons:
>
>    - JobRunner is concerned with stuff I don't *think* we need:
>       - coordinatorSystemProducer|Consumer,
>       - writing/reading the configuration to the coordinator streams
>    - ProcessJobFactory hard-codes the ID to “0” so I don’t think that
>    will work for multiple instances.
>
>
>
> Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and invoke
> LocalApplicationRunner.runTask()
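
A rough sketch of the settings Approach 2 implies, collected as a plain Java
map. The key names and factory class names are written from memory of the
Samza 0.13/0.14 standalone documentation and should be verified against the
version in use; the ZooKeeper address and task class in the usage are
placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: key and class names are assumptions to verify against
// the Samza release you deploy.
final class StandaloneConfigSketch {
    static Map<String, String> build(String zkConnect, String taskClass) {
        Map<String, String> cfg = new LinkedHashMap<>();
        // Leader election and partition distribution through ZooKeeper.
        cfg.put("job.coordinator.factory",
                "org.apache.samza.zk.ZkJobCoordinatorFactory");
        cfg.put("job.coordinator.zk.connect", zkConnect);
        // Group tasks by the ids of the processors that joined the group.
        cfg.put("task.name.grouper.factory",
                "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
        // The low-level StreamTask or AsyncStreamTask implementation to run.
        cfg.put("task.class", taskClass);
        return cfg;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration.
        build("localhost:2181", "com.example.MyStreamTask")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting map would then be wrapped in Samza's config object and handed
to LocalApplicationRunner before calling runTask(); that wiring is left out
because the API is described in this thread as still evolving.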
>
>
>
>     Q1: If I use LocalApplicationRunner, It does not use
> "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
>
>     Q2: If I use LocalApplicationRunner, I will need to code myself the
> loading and rewriting of the Config that is currently handled by JobRunner,
> correct?
>
>     Q3: Do I need to also handle coordinator stream(s) and storing of
> config that is done in JobRunner (I don’t think so as the ?
>
>     Q4: Where/How do I specify the Container ID for each instance? Is
> there a config setting that I can pass, (or pull from an env variable and
> add to the config) ? I am assuming it is my responsibility to ensure that
> each instance is started with a unique container ID..?
>
>
>
> I am getting started on the above (Approach 2.), and looking closer at the
> code so I may have my own answers to my questions, but figured I should go
> ahead and ask now anyway. Thanks!
>
>
>
> -Thunder
>
>
>
>
>
> *From:* Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
> *Sent:* Thursday, March 15, 2018 1:41
> *To:* dev@samza.apache.org; Thunder Stumpges <tstump...@ntent.com>;
> t...@recursivedream.com
> *Cc:* yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
>
> *Subject:* Re: Old style "low level" Tasks with alternative deployment
> model(s)
>
>
>
> >> You are correct that this is focused on the higher-level API but
> doesn't
> preclude using the lower-level API. I was at the same point you were not
> long ago, in fact, and had a very productive conversation on the list
>
>
>
> Thanks Tom for linking the thread, and I'm glad that you were able to get
>
> Kubernetes integration working with Samza.
>
>
>
> >> If it is helpful for everyone, once I get the low-level API +
> ZkJobCoordinator + Docker +
>
> K8s working, I'd be glad to formulate an additional sample for hello-samza.
>
>
>
> @Thunder Stumpges:
>
> We'd be thrilled to receive your contribution. Examples, demos, tutorials
> etc.
>
> contribute a great deal to improving the ease of use of Apache Samza. I'm
> happy
>
> to shepherd design discussions/code-reviews in the open-source including
> answering
>
> any questions you may have.
>
>
>
>
>
> >> One thing I'm still curious about, is what are the drawbacks or
> complexities of leveraging the Kafka High-level consumer +
> PassthroughJobCoordinator in a stand-alone setup like this? We do have
> Zookeeper (because of kafka) so I think either would work. The Kafka
> High-level consumer comes with other nice tools for monitoring offsets,
> lag, etc
>
>
>
>
>
> @Thunder Stumpges:
>
>
>
> Samza uses a "Job-Coordinator" to assign your input-partitions among the
>
> different instances of your application s.t. they don't overlap. A typical
> way to solve this "partition distribution"
>
> problem is to have a single instance elected as a "leader" and have the
> leader assign partitions to the group.
>
> The ZkJobCoordinator uses Zk primitives to achieve this, while the YarnJC
> relies on Yarn's guarantee that there will be a
>
> singleton-AppMaster to achieve this.
>
>
>
> A key difference that separates the PassthroughJC from the Yarn/Zk
> variants is that it
>
> does _not_ attempt to solve the "partition distribution" problem. As a
> result, there's no
>
> leader-election involved. Instead, it pushes the problem of "partition
> distribution" to the underlying
>
> consumer.
>
>
>
> The PassthroughJC supports these two scenarios:
>
>
>
> *1. Consumer-managed partition distribution:* When using the Kafka
> high-level consumer
>
> (or an AWS KinesisClientLibrary consumer) with Samza, the consumer manages
> partitions internally.
>
>
>
> *2. Static partition distribution:* Alternately, partitions can be
> managed statically using
>
> configuration. You can achieve static partition assignment by implementing
> a custom
>
> *SystemStreamPartitionGrouper
> <https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>*
>  and *TaskNameGrouper
> <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>*.
> Solutions in this category will typically
>
> require you to distinguish the various processors in the group by
> providing an "id" for each.
>
> Once the "id"s are decided, you can then statically compute assignments
> using a function (eg: modulo N).
>
> You can rely on the following mechanisms to provide this id:
>
>  - Configure each instance differently to have its own id
>
>  - Obtain the id from the cluster-manager. For instance, Kubernetes can
> provide each pod a unique id in the range [0,N) (for example, through the
> ordinal index of a StatefulSet pod). AWS ECS should expose similar
> capabilities via a REST end-point.
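
The static "modulo N" assignment described above can be sketched as follows.
This is plain Java, not a Samza grouper implementation. It assumes pod names
of the form "<name>-<ordinal>" (as Kubernetes StatefulSets produce) and that
every instance knows the total processor count N; the class and method names
are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of static partition assignment; not Samza API.
final class StaticPartitionAssignment {

    // Parses the trailing ordinal from a name such as "samza-job-3".
    // Assumes StatefulSet-style names ("<name>-<ordinal>").
    static int ordinalFromHostname(String hostname) {
        int dash = hostname.lastIndexOf('-');
        if (dash < 0 || dash == hostname.length() - 1) {
            throw new IllegalArgumentException("No ordinal suffix in: " + hostname);
        }
        return Integer.parseInt(hostname.substring(dash + 1));
    }

    // Partition p belongs to the processor whose id equals p mod N.
    static List<Integer> partitionsFor(int processorId, int numProcessors, int numPartitions) {
        if (numProcessors <= 0 || processorId < 0 || processorId >= numProcessors) {
            throw new IllegalArgumentException("processorId must be in [0, numProcessors)");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = processorId; p < numPartitions; p += numProcessors) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        int id = ordinalFromHostname("samza-job-1");
        System.out.println(partitionsFor(id, 3, 8)); // prints [1, 4, 7]
    }
}
```

Because each instance computes its own share from (id, N) alone, no leader is
needed, but changing N reshuffles the assignment, which matters for stateful
jobs.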
>
>
>
> >> One thing I'm still curious about, is what are the drawbacks or
> complexities of leveraging the Kafka High-level consumer +
> PassthroughJobCoordinator in a stand-alone setup like this?
>
>
>
> *Leveraging the Kafka High-level consumer:*
>
>
>
> The Kafka high-level consumer is not integrated into Samza just yet.
> Instead, Samza's integration with Kafka
>
> uses the low-level consumer because
>
> i) It allows for greater control in fetching data from individual
> brokers. It is simple and performant in terms of the threading model to
> have one thread pull from each broker.
>
> ii) It is efficient in memory utilization since it does not do
> internal-buffering of messages.
>
> iii) There's no overhead like Kafka-controller heart-beats that are driven
> by *consumer.poll*
>
>
>
> Since there's no built-in integration, you will have to build a new
> *SystemConsumer* if you need to integrate with the Kafka High-level
> consumer. Further, there's a fair bit of complexity to manage in
> checkpointing.
>
>
>
> >> The Kafka High-level consumer comes with other nice tools for
> monitoring offsets, lag, etc
>
>
>
> Samza exposes
> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
>  the
> below metrics for lag-monitoring:
>
> - The current log-end offset for each partition
>
> - The last check-pointed offset for each partition
>
> - The number of messages behind the high watermark of the partition
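
As a rough illustration of how these three numbers relate, lag per partition
can be derived from the other two. This is a hypothetical sketch in plain
Java, not how Samza computes its metrics; it assumes both offsets use the
same convention and treats a partition without a checkpoint as starting at
offset 0.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; Samza's own metric computation may differ.
final class LagSketch {
    // Messages between the checkpoint and the high watermark, never negative.
    static long lag(long highWatermark, long checkpointedOffset) {
        return Math.max(0L, highWatermark - checkpointedOffset);
    }

    // Computes lag for every partition that has a known high watermark.
    // Assumption: a partition with no checkpoint is treated as offset 0.
    static Map<Integer, Long> lagByPartition(Map<Integer, Long> highWatermarks,
                                             Map<Integer, Long> checkpoints) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : highWatermarks.entrySet()) {
            long checkpoint = checkpoints.getOrDefault(e.getKey(), 0L);
            result.put(e.getKey(), lag(e.getValue(), checkpoint));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> highWatermarks = new TreeMap<>();
        highWatermarks.put(0, 100L);
        highWatermarks.put(1, 50L);
        Map<Integer, Long> checkpoints = new TreeMap<>();
        checkpoints.put(0, 90L);
        System.out.println(lagByPartition(highWatermarks, checkpoints)); // prints {0=10, 1=50}
    }
}
```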
>
>
>
> Please let us know if you need help discovering these or integrating these
> with other systems/tools.
>
>
>
>
>
> *Leveraging the Passthrough JobCoordinator:*
>
>
>
> It's helpful to split this discussion on tradeoffs with PassthroughJC into
> 2 parts:
>
>
>
> 1. PassthroughJC + consumer managed partitions:
>
>
>
> - In this model, Samza has no control over partition-assignment since it's
> managed by the consumer. This means that stateful
>
> operations like joins that rely on partitions being co-located on the same
> task will not work.
>
> Simple stateless operations (eg: map, filter, remote lookups) are fine.
>
>
>
> - A key differentiator between Samza and other frameworks is our support
> for "host affinity
> <https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>".
> Samza achieves this
>
> by assigning partitions to hosts taking data-locality into account. If the
> consumer can arbitrarily shuffle partitions, it'd be hard to
>
> support this affinity/locality. Often this is a key optimization when
> dealing with large stateful jobs.
>
>
>
> 2. PassthroughJC + static partitions:
>
>
>
> - In this model, it is possible to make stateful processing (including
> host affinity) work by carefully choosing how "id"s are
>
> assigned and computed.
>
>
>
> *Recommendation:*
>
>
>
> - Owing to the above subtleties, I would recommend that we give the
> ZkJobCoordinator + the built-in low-level Kafka integration a try.
>
> - If we hit snags down this path, we can certainly explore the approach
> with PassthroughJC + static partitions.
>
> - Using the PassthroughJC + consumer-managed distribution would be least
> preferable owing to the subtleties I outlined above.
>
>
>
> Please let us know should you have more questions.
>
>
>
> Best,
>
> Jagdish
>
>
>
> On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <tstump...@ntent.com>
> wrote:
>
> Wow, what great timing, and what a great thread! I definitely have some
> good starters to go off of here.
>
> If it is helpful for everyone, once I get the low-level API +
> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
> additional sample for hello-samza.
>
> One thing I'm still curious about, is what are the drawbacks or
> complexities of leveraging the Kafka High-level consumer +
> PassthroughJobCoordinator in a stand-alone setup like this? We do have
> Zookeeper (because of kafka) so I think either would work. The Kafka
> High-level consumer comes with other nice tools for monitoring offsets,
> lag, etc....
>
> Thanks guys!
> -Thunder
>
>
> -----Original Message-----
> From: Tom Davis [mailto:t...@recursivedream.com]
> Sent: Wednesday, March 14, 2018 17:50
> To: dev@samza.apache.org
> Subject: Re: Old style "low level" Tasks with alternative deployment
> model(s)
>
> Hey there!
>
> You are correct that this is focused on the higher-level API but doesn't
> preclude using the lower-level API. I was at the same point you were not
> long ago, in fact, and had a very productive conversation on the list:
> you should look for "Question about custom StreamJob/Factory" in the list
> archive for the past couple months.
>
> I'll quote Jagadish Venkatraman from that thread:
>
> > For the section on the low-level API, can you use
> > LocalApplicationRunner#runTask()? It basically creates a new
> > StreamProcessor and runs it. Remember to provide task.class and set it
> > to your implementation of StreamTask or AsyncStreamTask. Please note
> > that this is an evolving API and hence, subject to change.
>
> I ended up just switching to the high-level API because I don't have any
> existing Tasks and the Kubernetes story is a little more straightforward
> there (there's only one container/configuration to deploy).
>
> Best,
>
> Tom
>
> Thunder Stumpges <tstump...@ntent.com> writes:
>
> > Hi all,
> >
> > We are using Samza (0.12.0) in about 2 dozen jobs implementing several
> > processing pipelines. We have also begun a significant move of other
> > services within our company to Docker/Kubernetes. Right now our
> > Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce" jobs
> (many reporting and other batch processing jobs). We would really like to
> move our stream processing off of Hadoop/Yarn and onto Kubernetes.
> >
> > When I just read about some of the new progress in .13 and .14 I got
> > really excited! We would love to have our jobs run as simple libraries
> > in our own JVM, and use the Kafka High-Level-Consumer for partition
> distribution and such. This would let us "dockerfy" our application and
> run/scale in kubernetes.
> >
> > However as I read it, this new deployment model is ONLY for the
> > new(er) High Level API, correct? Is there a plan and/or resources for
> > adapting this back to existing low-level tasks ? How complicated of a
> task is that? Do I have any other options to make this transition easier?
> >
> > Thanks in advance.
> > Thunder
>
>
>
>
>
> --
>
> Jagadish V,
>
> Graduate Student,
>
> Department of Computer Science,
>
> Stanford University
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
