>> Thanks for the timely and thorough reply! Based on your explanation, it
>> sounds like when using the high-level API I don't need to go through the
>> JobRunner or `run-job.sh` at all -- is that correct?

Your understanding is right. You can simply run multiple instances of the
run-app.sh script, and you can have Kubernetes auto-scale your job based on
your traffic pattern by spawning additional run-app.sh instances.
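Each instance can be as simple as a main() that hands your application to
the LocalApplicationRunner, along the lines of the
WikipediaZkLocalApplication example. Here is a minimal sketch, assuming the
0.14-era API (MyStreamApplication is a placeholder for your own application
class):

    import org.apache.samza.config.Config;
    import org.apache.samza.runtime.LocalApplicationRunner;
    import org.apache.samza.util.CommandLine;

    public class MyZkLocalApplication {
      public static void main(String[] args) {
        // Parse --config-factory/--config-path, the same flags that
        // run-app.sh forwards to the main class.
        CommandLine cmdLine = new CommandLine();
        Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));

        // Run the application in this process. Partition assignment and
        // liveness are handled by whichever JobCoordinator the config
        // names, e.g. the Zk-based one.
        LocalApplicationRunner runner = new LocalApplicationRunner(config);
        runner.run(new MyStreamApplication()); // placeholder for your app
        runner.waitForFinish();
      }
    }

Every instance started this way joins the same processor group, so scaling
out is just starting another copy of the same image.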
>> As for contributing back, I absolutely plan to do so! I will probably
>> end up doing a series of blog posts as this is (management-willing) the
>> start of a large undertaking to fix a lot of thorny problems we have
>> with our backend apps and unify on Samza for batch/stream operations.

Glad that you are considering Samza. We are happy to support your use-cases
and provide help with configuration, tuning etc.

>> A KubernetesJob/Factory implementation that can be used with JobRunner
>> to launch an appropriately-sized stateful deployment

I believe writing a separate JobFactory/Job for Kubernetes is more complex
than simply using the LocalApplicationRunner in our Wikipedia example.
Hence, I would suggest that we try out the simpler alternative.

>> Create an image that executes a derived `LocalContainerRunner`

I am not entirely sure you need to derive from the LocalContainerRunner. We
can certainly consider it later if we hit any snags using Zk for
coordination.

>> It seems that, overall, the lower-level Task API is more tied to the
>> notion of resource management than the higher-level Application API.

The low-level API offers you better flexibility and control, while the
high-level API is much more declarative. You can use either API variant and
run your coordination using the Zk, Yarn or Passthrough JobCoordinators. In
fact, Samza internally compiles the high-level API into its equivalent
low-level task-based API and executes that.
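For context on what "compiles down" means here: the low-level API is
essentially one callback per message, independent of which JobCoordinator
assigned the partitions. A minimal sketch (the system and stream names are
illustrative placeholders):

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    // A minimal low-level task: Samza calls process() once per message
    // from the input partitions assigned to this task instance.
    public class EnrichTask implements StreamTask {
      private static final SystemStream OUTPUT =
          new SystemStream("kafka", "enriched-events");

      @Override
      public void process(IncomingMessageEnvelope envelope,
          MessageCollector collector, TaskCoordinator coordinator) {
        String event = (String) envelope.getMessage();
        // A real task would transform or enrich the message here.
        collector.send(new OutgoingMessageEnvelope(OUTPUT, event));
      }
    }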
>> Is it fair to say the Application is effectively a single Task in that
>> it runs in a single thread, reading messages from one or more input
>> streams, and writing messages to zero or more output streams? And that
>> I should compose more complex topologies by running multiple
>> applications in my cluster?

You can compose complex topologies in a single StreamApplication by using
the high-level API. Alternately, you can use the low-level API and stitch
multiple applications together, where the output of one application is read
by another application. The Wikipedia examples in the hello-samza
repository contain both patterns.
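As a sketch of the first pattern -- several stages composed in one
StreamApplication -- assuming the 0.14-era high-level API, with
illustrative stream IDs and serdes:

    import org.apache.samza.application.StreamApplication;
    import org.apache.samza.config.Config;
    import org.apache.samza.operators.MessageStream;
    import org.apache.samza.operators.OutputStream;
    import org.apache.samza.operators.StreamGraph;
    import org.apache.samza.serializers.StringSerde;

    // A multi-stage topology in one application; Samza compiles this
    // graph down to the equivalent low-level tasks at runtime.
    public class AlertsApplication implements StreamApplication {
      @Override
      public void init(StreamGraph graph, Config config) {
        MessageStream<String> events =
            graph.getInputStream("raw-events", new StringSerde("UTF-8"));
        OutputStream<String> alerts =
            graph.getOutputStream("alerts", new StringSerde("UTF-8"));

        events
            .filter(e -> e.contains("ERROR")) // stage 1: drop non-errors
            .map(String::toUpperCase)         // stage 2: normalize
            .sendTo(alerts);
      }
    }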
On Sat, Jan 27, 2018 at 8:52 PM, Tom Davis <t...@recursivedream.com> wrote:

> Thanks for the timely and thorough reply! Based on your explanation, it
> sounds like when using the high-level API I don't need to go through the
> JobRunner or `run-job.sh` at all -- is that correct? I can simply run as
> many instances of, e.g., `WikipediaZkLocalApplication` as I want and
> Samza will take care of assigning partitions, responding to changes via
> ZK, etc.?
>
> As for contributing back, I absolutely plan to do so! I will probably
> end up doing a series of blog posts as this is (management-willing) the
> start of a large undertaking to fix a lot of thorny problems we have
> with our backend apps and unify on Samza for batch/stream operations. I
> am wrapping up a small repo now that has:
>
> 1. A KubernetesJob/Factory implementation that can be used with
>    JobRunner to launch an appropriately-sized stateful deployment
> 2. An example gradle-based app that generates a Docker image capable of
>    executing the packaged StreamApplication (plus example k8s resources)
>
> On the topic of (2), I fear I mostly just rehashed the application part
> of hello-samza with different Gradle plugins, but as a learning exercise
> it has been instructive.
>
> My initial questions were in regards to (1), but after reading your
> explanation I'm not clear on the way forward there (and it's certainly
> more complex than the StreamApplication approach). My thought right now
> is:
>
> 1. Create an image that executes a derived `LocalContainerRunner`
> 2. Ensure both CONTAINER_ID and COORDINATOR_URL are set; the former to
>    the stable ID generated by Kubernetes (e.g. "app-0", "app-1", etc.)
>    and the latter to, e.g., "file:///serialized/job/model.json"
>
> However, this would require writing a LocalContainerRunner that didn't
> assume it was running at the pleasure of a YARN/Mesos-style resource
> negotiator (it can't exactly heartbeat with a JSON file, etc.)
>
> It seems that, overall, the lower-level Task API is more tied to the
> notion of resource management than the higher-level Application API. Is
> it fair to say the Application is effectively a single Task in that it
> runs in a single thread, reading messages from one or more input
> streams, and writing messages to zero or more output streams? And that
> I should compose more complex topologies by running multiple
> applications in my cluster?
>
> Okay, enough questions for now! I hope to publish the repository
> tomorrow and would love to get some more experienced eyes on it to
> learn all the ways I screwed up. I'll post to this thread again with a
> link.
>
> Thanks,
>
> Tom
>
> Jagadish Venkatraman <jagadish1...@gmail.com> writes:
>
>> +Yi
>>
>> Hi Tom,
>>
>> Thank you for your feedback on Samza's architecture. Pluggability has
>> been a differentiator that has enabled us to support a wide range of
>> use-cases - from stand-alone deployments to managed services, from
>> streaming to batch inputs, and integrations with various systems from
>> Kafka, Kinesis and Azure to ElasticSearch.
>>
>> Thanks for your ideas on integrating Samza and Kubernetes. Let me
>> formalize your intuition a bit more.
>>
>> The following four aspects are key to running Samza in any
>> environment.
>>
>> 1. Liveness detection/monitoring: This provides a means for
>> discovering the currently available processors in the group and
>> discovering when a processor is no longer running. The different JC
>> implementations we have rely on Zk, Yarn or AzureBlobStore for
>> liveness detection.
>>
>> 2. Partition-assignment/coordination: Once there is agreement on the
>> available processors, this is just a matter of computing assignments.
>>
>> Usually, (1) and (2) will require you to identify each processor and
>> to agree on the available processors in the group. For example, when
>> the ClusterBasedJC starts a container, it is assigned a durable ID.
>>
>> 3. Resource management: This focuses on whether you want your
>> containers to be managed/started by Samza itself or have something
>> external to Samza start them. While the former allows you to run a
>> managed service, the latter allows for more flexibility in your
>> deployment environments. We use both models heavily at LinkedIn.
>>
>> As an example, the ClusterBasedJC requests resources from YARN and
>> starts the containers itself. The ZkBasedJC assumes a more general
>> deployment model, allows containers to be started externally, and
>> relies on Samza only for (1) and (2).
>>
>> 4. Auto-scaling: Here again, you can build auto-scaling right into
>> Samza if there's support for resource management, or do it externally.
>>
>> Having said this, you can implement this integration with Kubernetes
>> at multiple levels depending on how we choose to tackle the above
>> aspects.
>>
>> ">> My intuition is that I need a JobCoordinator/Factory in the form
>> of a server that sets up Watches on the appropriate Kubernetes
>> resources so that when I perform the action in [4.1] *something*
>> happens."
>>
>> This alternative does seem more complex. Hence, I would not go down
>> this path as the first step.
>>
>> For a start, I would lean on the side of simplicity and recommend the
>> following solution:
>> - Configure your Samza job to leverage the existing ZkBasedJC.
>> - Start multiple instances of your job by running the *run-app.sh*
>> script. I believe Kubernetes has good support for this as well.
>> - Configure Kubernetes to auto-scale your instances on-demand
>> depending on load.
>> - As new instances join and leave, Samza will automatically
>> re-distribute partitions among them.
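To make the quoted recommendation concrete: the Zk coordination settings
look roughly like the sketch below, shown as in-code config for clarity. In
practice these keys live in the .properties file that run-app.sh points at;
the key names follow what I believe the hello-samza Zk example uses, and
the job name and ZooKeeper address are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.samza.config.MapConfig;
    import org.apache.samza.runtime.LocalApplicationRunner;

    public class ZkCoordinatedLauncher {
      public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("job.name", "my-app"); // illustrative
        config.put("job.coordinator.factory",
            "org.apache.samza.zk.ZkJobCoordinatorFactory");
        config.put("job.coordinator.zk.connect",
            "zookeeper:2181"); // your ZooKeeper ensemble
        config.put("task.name.grouper.factory",
            "org.apache.samza.container.grouper.task."
                + "GroupByContainerIdsFactory");
        // ... plus your system, serde and stream configs ...

        LocalApplicationRunner runner =
            new LocalApplicationRunner(new MapConfig(config));
        runner.run(new AlertsApplication()); // the app sketched earlier
        runner.waitForFinish();
      }
    }

With this in place, each run-app.sh instance registers with the ZooKeeper
ensemble, and partitions are re-balanced as instances come and go.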
>> Additionally, we would be thrilled if you could contribute your
>> learnings back to the community - in the form of a blog-post /
>> documentation to Samza itself on running with Kubernetes.
>>
>> Please let us know should you need any further help. Here's an example
>> to get you started:
>> https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/application
>>
>> Best,
>> Jagadish
>>
>> On Sat, Jan 27, 2018 at 8:54 AM, Tom Davis <t...@recursivedream.com>
>> wrote:
>>
>>> Hi there! First off, thanks for the continued work on Samza -- I
>>> looked into many DC/stream processors and Samza was a real standout
>>> with its smart architecture and pluggable design.
>>>
>>> I'm working on a custom StreamJob/Factory for running Samza jobs on
>>> Kubernetes. Those two classes are pretty straightforward: I create a
>>> Deployment in Kubernetes with the appropriate number of Pods (a
>>> number <= the number of Kafka partitions I created the input topic
>>> with). Now I'm moving on to what executes in the actual Docker
>>> containers and I'm a bit confused.
>>>
>>> My plan was to mirror as much as possible what the YarnJob does,
>>> which is to set up an environment that will work with `run-jc.sh`.
>>> However, I don't need ClusterBasedJobCoordinator because Kubernetes
>>> is not an offer-based resource negotiator; if the JobCoordinator is
>>> running it means, by definition, it received the appropriate
>>> resources. So a PassThroughJobCoordinator with an appropriate main()
>>> method seemed like the ticket. Unfortunately, the PTJC doesn't
>>> actually seem to *do* anything -- unlike the CBJC, which has a
>>> run-loop and presumably schedules containers and the like.
>>>
>>> I saw the preview documentation on flexible deployment, but it didn't
>>> totally click for me. Perhaps because it was also my first
>>> introduction to the high-level API? (I've just been writing
>>> StreamTask impls.)
>>>
>>> Here's a brief description of the workflow I'm envisioning; perhaps
>>> someone could tell me the classes I should implement and what sorts
>>> of containers I might need running in the environment to coordinate
>>> everything?
>>>
>>> 1. I create a topic in Kafka with N partitions
>>> 2. I start a job configured to run N-X containers
>>>    2.1. If my topic has 4 partitions and I have low load, I might
>>>         want X to start at 3 so I only have 1 task instance
>>> 3. Samza is configured to send all partitions to task instance 1
>>> 4. Later, load increases.
>>>    4.1. I use Kubernetes to scale the job to 4 pods/containers
>>>    4.2. Samza re-configures such that the new containers receive work
>>>
>>> My intuition is that I need a JobCoordinator/Factory in the form of a
>>> server that sets up Watches on the appropriate Kubernetes resources
>>> so that when I perform the action in [4.1] *something* happens. Or
>>> perhaps I should use ZkJobCoordinator? Presumably as pods/containers
>>> come and go they will cause changes in ZK that will trigger Task
>>> restarts or whatever logic the coordinator employs?
>>>
>>> Okay, I'll stop rambling now. Thanks in advance for any tips!
>>>
>>> - Tom

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University