>> Thanks for the info on the tradeoffs. That makes a lot of sense. I am on-board with using ZkJobCoordinator, sounds like some good benefits over just the Kafka high-level consumer.
This certainly looks like the simplest alternative. For your other questions, please find my answers inline.

>> Q1: If I use LocalApplicationRunner, it does not use "ProcessJobFactory" (or any StreamJob or *Job classes), correct?

Your understanding is correct. It directly instantiates the *StreamProcessor*, which in turn creates and runs the *SamzaContainer*.

>> Q2: If I use LocalApplicationRunner, I will need to code myself the loading and rewriting of the Config that is currently handled by JobRunner, correct?

I don't think you'll need to do this. IIUC, the *LocalApplicationRunner* should automatically invoke rewriters and do the right thing.

>> Q3: Do I need to also handle coordinator stream(s) and storing of config that is done in JobRunner?

I don't think this is necessary either. The creation of the coordinator stream and the persisting of configuration happen in the *LocalApplicationRunner* (more specifically in *StreamManager#createStreams*).

>> Q4: Where/How do I specify the Container ID for each instance? Is there a config setting that I can pass (or pull from an env variable and add to the config)? I am assuming it is my responsibility to ensure that each instance is started with a unique container ID..?

Nope. If you are using the *ZkJobCoordinator*, you need not worry about assigning IDs to each instance. The framework will automatically take care of generating IDs and reaching consensus by electing a leader. If you are curious, please take a look at the implementations of the *ProcessorIdGenerator* interface.

Please let us know should you have further questions!

Best,
Jagdish

On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges <tstump...@ntent.com> wrote:

> Thanks for the info on the tradeoffs. That makes a lot of sense. I am
> on-board with using ZkJobCoordinator; sounds like some good benefits over
> just the Kafka high-level consumer.
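[Editor's note] Pulling the answers above together, a minimal stand-alone configuration might look roughly like the sketch below. This is an illustration only: the property keys should be verified against your Samza version's configuration table, and the task class name is a placeholder.

```properties
# Sketch of a stand-alone (non-Yarn) Samza config -- verify keys against
# your Samza release; com.example.MyStreamTask is a placeholder.
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=localhost:2181
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
# Low-level API: point task.class at your StreamTask/AsyncStreamTask implementation
task.class=com.example.MyStreamTask
```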
>
> To that end, I have made some notes on possible approaches based on the
> previous thread, and from my look into the code. I'd love to get feedback.
>
> Approach 1. Configure jobs to use "ProcessJobFactory" and run instances of
> the job using run-job.sh or using JobRunner directly.
>
> I don't think this makes sense from what I can see for a few reasons:
> - JobRunner is concerned with stuff I don't *think* we need:
>   - coordinatorSystemProducer|Consumer,
>   - writing/reading the configuration to the coordinator streams
> - ProcessJobFactory hard-codes the ID to "0" so I don't think that
>   will work for multiple instances.
>
> Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and invoke
> LocalApplicationRunner.runTask()
>
> Q1: If I use LocalApplicationRunner, it does not use
> "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>
> Q2: If I use LocalApplicationRunner, I will need to code myself the
> loading and rewriting of the Config that is currently handled by
> JobRunner, correct?
>
> Q3: Do I need to also handle coordinator stream(s) and storing of
> config that is done in JobRunner (I don’t think so)?
>
> Q4: Where/How do I specify the Container ID for each instance? Is
> there a config setting that I can pass (or pull from an env variable and
> add to the config)? I am assuming it is my responsibility to ensure that
> each instance is started with a unique container ID..?
>
> I am getting started on the above (Approach 2), and looking closer at the
> code so I may have my own answers to my questions, but figured I should go
> ahead and ask now anyway. Thanks!
>
> -Thunder
>
> *From:* Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
> *Sent:* Thursday, March 15, 2018 1:41
> *To:* dev@samza.apache.org; Thunder Stumpges <tstump...@ntent.com>;
> t...@recursivedream.com
> *Cc:* yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
> *Subject:* Re: Old style "low level" Tasks with alternative deployment
> model(s)
>
> >> You are correct that this is focused on the higher-level API but
> doesn't preclude using the lower-level API. I was at the same point you
> were not long ago, in fact, and had a very productive conversation on
> the list
>
> Thanks Tom for linking the thread, and I'm glad that you were able to get
> Kubernetes integration working with Samza.
>
> >> If it is helpful for everyone, once I get the low-level API +
> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
> additional sample for hello-samza.
>
> @Thunder Stumpges:
> We'd be thrilled to receive your contribution. Examples, demos,
> tutorials, etc. contribute a great deal to improving the ease of use of
> Apache Samza. I'm happy to shepherd design discussions/code-reviews in
> the open source, including answering any questions you may have.
>
> >> One thing I'm still curious about, is what are the drawbacks or
> complexities of leveraging the Kafka High-level consumer +
> PassthroughJobCoordinator in a stand-alone setup like this? We do have
> Zookeeper (because of kafka) so I think either would work. The Kafka
> High-level consumer comes with other nice tools for monitoring offsets,
> lag, etc
>
> @Thunder Stumpges:
>
> Samza uses a "Job-Coordinator" to assign your input partitions among the
> different instances of your application s.t. they don't overlap. A
> typical way to solve this "partition distribution" problem is to have a
> single instance elected as a "leader" and have the leader assign
> partitions to the group.
> The ZkJobCoordinator uses Zk primitives to achieve this, while the YarnJC
> relies on Yarn's guarantee that there will be a singleton AppMaster.
>
> A key difference that separates the PassthroughJC from the Yarn/Zk
> variants is that it does _not_ attempt to solve the "partition
> distribution" problem. As a result, there's no leader election involved.
> Instead, it pushes the problem of "partition distribution" to the
> underlying consumer.
>
> The PassthroughJC supports these 2 scenarios:
>
> *1. Consumer-managed partition distribution:* When using the Kafka
> high-level consumer (or an AWS KinesisClientLibrary consumer) with Samza,
> the consumer manages partitions internally.
>
> *2. Static partition distribution:* Alternately, partitions can be
> managed statically using configuration. You can achieve static partition
> assignment by implementing a custom *SystemStreamPartitionGrouper
> <https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>*
> and *TaskNameGrouper
> <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>*.
> Solutions in this category will typically require you to distinguish the
> various processors in the group by providing an "id" for each. Once the
> "id"s are decided, you can then statically compute assignments using a
> function (eg: modulo N). You can rely on the following mechanisms to
> provide this id:
> - Configure each instance differently to have its own id
> - Obtain the id from the cluster-manager. For instance, Kubernetes will
>   provide each POD a unique id in the range [0, N). AWS ECS should expose
>   similar capabilities via a REST end-point.
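[Editor's note] As a concrete illustration of the "modulo N" idea described above, here is a small self-contained sketch of static partition assignment. The class and method names are invented for illustration; this is not Samza API, just the assignment arithmetic a custom grouper would perform.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Samza API): each processor knows its id in
// [0, numProcessors) and claims every partition p where
// p % numProcessors == processorId.
public class ModuloAssignment {

    /** Returns the partitions this processor should own. */
    public static List<Integer> assign(int processorId, int numProcessors, int numPartitions) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numProcessors == processorId) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // e.g. 8 partitions spread across 3 processors
        System.out.println(assign(0, 3, 8)); // [0, 3, 6]
        System.out.println(assign(1, 3, 8)); // [1, 4, 7]
        System.out.println(assign(2, 3, 8)); // [2, 5]
    }
}
```

Because the function is deterministic, every processor computes the same global view of the assignment without any coordination, which is exactly why no leader election is needed in this model.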
>
> >> One thing I'm still curious about, is what are the drawbacks or
> complexities of leveraging the Kafka High-level consumer +
> PassthroughJobCoordinator in a stand-alone setup like this?
>
> *Leveraging the Kafka High-level consumer:*
>
> The Kafka high-level consumer is not integrated into Samza just yet.
> Instead, Samza's integration with Kafka uses the low-level consumer
> because:
> i) It allows for greater control in fetching data from individual
>    brokers. It is simple and performant in terms of the threading model
>    to have one thread pull from each broker.
> ii) It is efficient in memory utilization since it does not do internal
>    buffering of messages.
> iii) There's no overhead like Kafka-controller heart-beats that are
>    driven by *consumer.poll*.
>
> Since there's no built-in integration, you will have to build a new
> *SystemConsumer* if you need to integrate with the Kafka high-level
> consumer. Further, there's a fair bit of complexity to manage in
> checkpointing.
>
> >> The Kafka High-level consumer comes with other nice tools for
> monitoring offsets, lag, etc
>
> Samza exposes
> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
> the below metrics for lag monitoring:
> - The current log-end offset for each partition
> - The last checkpointed offset for each partition
> - The number of messages behind the high watermark of the partition
>
> Please let us know if you need help discovering these or integrating them
> with other systems/tools.
>
> *Leveraging the Passthrough JobCoordinator:*
>
> It's helpful to split this discussion of tradeoffs with PassthroughJC
> into 2 parts:
>
> 1. PassthroughJC + consumer-managed partitions:
>
> - In this model, Samza has no control over partition assignment since
> it's managed by the consumer.
> This means that stateful operations like joins that rely on partitions
> being co-located on the same task will not work. Simple stateless
> operations (eg: map, filter, remote lookups) are fine.
>
> - A key differentiator between Samza and other frameworks is our support
> for "host affinity
> <https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>".
> Samza achieves this by assigning partitions to hosts taking data-locality
> into account. If the consumer can arbitrarily shuffle partitions, it'd be
> hard to support this affinity/locality. Often this is a key optimization
> when dealing with large stateful jobs.
>
> 2. PassthroughJC + static partitions:
>
> - In this model, it is possible to make stateful processing (including
> host affinity) work by carefully choosing how "id"s are assigned and
> computed.
>
> *Recommendation:*
>
> - Owing to the above subtleties, I would recommend that we give the
> ZkJobCoordinator + the built-in low-level Kafka integration a try.
> - If we hit snags down this path, we can certainly explore the approach
> with PassthroughJC + static partitions.
> - Using the PassthroughJC + consumer-managed distribution would be least
> preferable owing to the subtleties I outlined above.
>
> Please let us know should you have more questions.
>
> Best,
> Jagdish
>
> On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <tstump...@ntent.com>
> wrote:
>
> Wow, what great timing, and what a great thread! I definitely have some
> good starters to go off of here.
>
> If it is helpful for everyone, once I get the low-level API +
> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
> additional sample for hello-samza.
>
> One thing I'm still curious about, is what are the drawbacks or
> complexities of leveraging the Kafka High-level consumer +
> PassthroughJobCoordinator in a stand-alone setup like this?
> We do have Zookeeper (because of kafka) so I think either would work. The
> Kafka High-level consumer comes with other nice tools for monitoring
> offsets, lag, etc....
>
> Thanks guys!
> -Thunder
>
> -----Original Message-----
> From: Tom Davis [mailto:t...@recursivedream.com]
> Sent: Wednesday, March 14, 2018 17:50
> To: dev@samza.apache.org
> Subject: Re: Old style "low level" Tasks with alternative deployment
> model(s)
>
> Hey there!
>
> You are correct that this is focused on the higher-level API but doesn't
> preclude using the lower-level API. I was at the same point you were not
> long ago, in fact, and had a very productive conversation on the list:
> you should look for "Question about custom StreamJob/Factory" in the list
> archive for the past couple months.
>
> I'll quote Jagadish Venkatraman from that thread:
>
> > For the section on the low-level API, can you use
> > LocalApplicationRunner#runTask()? It basically creates a new
> > StreamProcessor and runs it. Remember to provide task.class and set it
> > to your implementation of StreamTask or AsyncStreamTask. Please note
> > that this is an evolving API and hence, subject to change.
>
> I ended up just switching to the high-level API because I don't have any
> existing Tasks and the Kubernetes story is a little more straightforward
> there (there's only one container/configuration to deploy).
>
> Best,
>
> Tom
>
> Thunder Stumpges <tstump...@ntent.com> writes:
>
> > Hi all,
> >
> > We are using Samza (0.12.0) in about 2 dozen jobs implementing several
> > processing pipelines. We have also begun a significant move of other
> > services within our company to Docker/Kubernetes. Right now our
> > Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce" jobs
> > (many reporting and other batch processing jobs). We would really like
> > to move our stream processing off of Hadoop/Yarn and onto Kubernetes.
> >
> > When I just read about some of the new progress in .13 and .14 I got
> > really excited! We would love to have our jobs run as simple libraries
> > in our own JVM, and use the Kafka High-Level-Consumer for partition
> > distribution and such. This would let us "dockerfy" our application
> > and run/scale in kubernetes.
> >
> > However as I read it, this new deployment model is ONLY for the
> > new(er) High Level API, correct? Is there a plan and/or resources for
> > adapting this back to existing low-level tasks? How complicated of a
> > task is that? Do I have any other options to make this transition
> > easier?
> >
> > Thanks in advance.
> > Thunder
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University