>> Thanks for the timely and thorough reply! Based on your explanation, it
sounds like when using the high-level API I don't need to go through the
JobRunner or `run-job.sh` at all -- is that correct?

Your understanding is right. You can simply run multiple instances of
the run-app.sh script, and you can have Kubernetes auto-scale the number
of run-app.sh instances depending on your traffic pattern.
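
For a start, the relevant standalone configuration is small. Here is a
rough sketch (the factory and grouper class names follow the hello-samza
standalone example; the ZooKeeper address is a placeholder, and you
should verify the class names against your Samza version):

    # Coordinate processors via ZooKeeper instead of YARN.
    job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
    # Placeholder: point this at your own ZooKeeper ensemble.
    job.coordinator.zk.connect=my-zk-host:2181
    task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory

Every instance of run-app.sh started with this config joins the same
processor group and receives a share of the input partitions.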


>> As for contributing back, I absolutely plan to do so! I will probably
end up doing a series of blog posts as this is (management-willing) the
start of a large undertaking to fix a lot of thorny problems we have
with our backend apps and unify on Samza for batch/stream operations.

Glad that you are considering Samza. We are happy to support your
use-cases and to help with configuration, tuning, etc.

>> A KubernetesJob/Factory implementation that can be used with JobRunner
to launch an appropriately-sized stateful deployment

I believe writing a separate JobFactory/Job for Kubernetes is more
complex than simply using the LocalApplicationRunner in our Wikipedia
example. Hence, I would suggest that we try out the simpler alternative.
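
For reference, here is a rough sketch of that pattern, modeled on
hello-samza's WikipediaZkLocalApplication (MyStreamApplication is a
placeholder for your own application, and minor details may differ
across Samza versions):

    // Sketch: run a StreamApplication with LocalApplicationRunner,
    // reading the job config from a properties file on the command line.
    import joptsimple.OptionSet;
    import org.apache.samza.config.Config;
    import org.apache.samza.runtime.LocalApplicationRunner;
    import org.apache.samza.util.CommandLine;

    public class MyZkLocalApplication {
      public static void main(String[] args) {
        CommandLine cmdLine = new CommandLine();
        OptionSet options = cmdLine.parser().parse(args);
        Config config = cmdLine.loadConfig(options);

        LocalApplicationRunner runner = new LocalApplicationRunner(config);
        runner.run(new MyStreamApplication()); // placeholder application
        runner.waitForFinish();
      }
    }

Each pod in your Deployment would simply invoke this main() with the
same config; ZooKeeper takes care of group membership and partition
assignment.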

>> Create an image that executes a derived `LocalContainerRunner`

I am not entirely sure you need to derive the LocalContainerRunner. We
can certainly consider it later if we hit some snags using Zk for
coordination.

>> It seems that, overall, the lower-level Task API is more tied to the
notion of resource management than the higher-level Application API.

The low-level API offers you better flexibility and control, while the
high-level API is much more declarative. You can use both API variants
and run your coordination using the Zk, Yarn, or Passthrough
JobCoordinators. In fact, Samza internally compiles the high-level API
into its equivalent low-level task-based API and executes that.
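
To make the distinction concrete, here is a minimal low-level task. The
StreamTask interface below is Samza's standard one; the body is just an
illustrative placeholder:

    // Low-level API: process() is invoked once per incoming message.
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class MyLowLevelTask implements StreamTask {
      @Override
      public void process(IncomingMessageEnvelope envelope,
                          MessageCollector collector,
                          TaskCoordinator coordinator) {
        Object message = envelope.getMessage();
        // Transform the message here, then emit results with
        // collector.send(new OutgoingMessageEnvelope(...)).
      }
    }

The high-level API expresses the same work as a declarative graph of
operators; see the sketch after the last question below.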

>> Is it fair to say the Application is effectively a single Task in that
it runs in a single thread, reading messages from one or more input
streams, and writing messages to zero or more output streams? And that I
should compose more complex topologies by running multiple applications
in my cluster?

You can compose complex topologies within a single StreamApplication by
using the high-level API. Alternatively, you can use the low-level API
and stitch multiple applications together, where the output of one
application is read by another. The Wikipedia examples in the
hello-samza repository contain both patterns.
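
As an illustration of the first pattern, a single high-level application
can chain several operators. This is a sketch against the 0.14-era
high-level API; the stream names and lambdas are illustrative, so check
the StreamGraph javadocs for your release:

    // High-level API: several operators composed in one application.
    import org.apache.samza.application.StreamApplication;
    import org.apache.samza.config.Config;
    import org.apache.samza.operators.MessageStream;
    import org.apache.samza.operators.OutputStream;
    import org.apache.samza.operators.StreamGraph;

    public class MyStreamApplication implements StreamApplication {
      @Override
      public void init(StreamGraph graph, Config config) {
        // Read raw events; the (key, msg) -> msg builder keeps the value.
        MessageStream<String> edits =
            graph.getInputStream("wikipedia-edits", (key, msg) -> (String) msg);

        // Write results, keyed by the message itself (illustrative).
        OutputStream<String, String, String> stats =
            graph.getOutputStream("wikipedia-stats", m -> m, m -> m);

        edits.filter(edit -> !edit.isEmpty())  // drop empty events
             .map(String::trim)                // illustrative transformation
             .sendTo(stats);
      }
    }

For the second pattern, "wikipedia-stats" above would simply be
configured as the input stream of the next application.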


On Sat, Jan 27, 2018 at 8:52 PM, Tom Davis <t...@recursivedream.com> wrote:

> Thanks for the timely and thorough reply! Based on your explanation, it
> sounds like when using the high-level API I don't need to go through the
> JobRunner or `run-job.sh` at all -- is that correct? I can simply run as
> many instances of, e.g., `WikipediaZkLocalApplication` as I want and
> Samza will take care of assigning partitions, responding to changes via
> ZK, etc.?
>
> As for contributing back, I absolutely plan to do so! I will probably
> end up doing a series of blog posts as this is (management-willing) the
> start of a large undertaking to fix a lot of thorny problems we have
> with our backend apps and unify on Samza for batch/stream operations. I
> am wrapping up a small repo now that has:
>
> 1. A KubernetesJob/Factory implementation that can be used with
>   JobRunner to launch an appropriately-sized stateful deployment
> 2. An example gradle-based app that generates a Docker image capable of
>   executing the packaged StreamApplication (plus example k8s resources)
>
> On the topic of (2), I fear I mostly just rehashed the application part
> of hello-samza with different Gradle plugins, but as a learning exercise
> it has been instructive.
>
> My initial questions were in regards to (1) but after reading your
> explanation I'm not clear on the way forward there (and it's certainly
> more complex than the StreamApplication approach). My thought right now
> is:
>
> 1. Create an image that executes a derived `LocalContainerRunner`
> 2. Ensure both CONTAINER_ID and COORDINATOR_URL are set; the former to
>   the stable ID generated by Kubernetes (e.g. "app-0", "app-1", etc.)
>   and the latter to, e.g. "file:///serialized/job/model.json"
>
> However, this would require writing a LocalContainerRunner that didn't
> assume it was running at the pleasure of a YARN/Mesos-style resource
> negotiator (it can't exactly heartbeat with a JSON file, etc.)
>
> It seems that, overall, the lower-level Task API is more tied to the
> notion of resource management than the higher-level Application API. Is
> it fair to say the Application is effectively a single Task in that it
> runs in a single thread, reading messages from one or more input
> streams, and writing messages to zero or more output streams? And that I
> should compose more complex topologies by running multiple applications
> in my cluster?
>
> Okay, enough questions for now! I hope to publish the repository
> tomorrow and would love to get some more experienced eyes on it to learn
> all the ways I screwed up. I'll post to this thread again with a link.
>
>
> Thanks,
>
> Tom
>
> Jagadish Venkatraman <jagadish1...@gmail.com> writes:
>
> +Yi
>>
>> Hi Tom,
>>
>> Thank you for your feedback on Samza's architecture. Pluggability has
>> been a differentiator that has enabled us to support a wide range of
>> use-cases - from stand-alone deployments to managed services, from
>> streaming to batch inputs, and integrations with various systems from
>> Kafka, Kinesis, and Azure to ElasticSearch.
>>
>> Thanks for your ideas on integrating Samza and Kubernetes. Let me
>> formalize your intuition a bit more.
>>
>> The following four aspects are key to running Samza with any environment.
>>
>> 1. Liveness detection/monitoring: This provides a means for discovering
>> the currently available processors in the group and discovering when a
>> processor is no longer running. The different JC implementations we
>> have rely on Zk, Yarn, or AzureBlobStore for liveness detection.
>>
>> 2. Partition-assignment/coordination: Once there is agreement on the
>> available processors, this is just a matter of computing assignments.
>>
>> Usually, (1) and (2) will require you to identify each processor and
>> to agree on the available processors in the group. For example, when
>> the ClusterBasedJC starts a container, it is assigned a durable ID.
>>
>> 3. Resource management: This focuses on whether you want your
>> containers to be managed/started by Samza itself, or have something
>> external to Samza start them. While the former allows you to run a
>> managed service, the latter allows for more flexibility in your
>> deployment environments. We use both models heavily at LinkedIn.
>>
>> As an example, the ClusterBasedJC requests resources from YARN and
>> starts the containers itself. The ZkBasedJC assumes a more general
>> deployment model, allows containers to be started externally, and
>> relies on Samza only for (1) and (2).
>>
>> 4. Auto-scaling: Here again, you can build auto-scaling right into
>> Samza if there's support for resource management, or do it externally.
>>
>> Having said this, you can implement this integration with Kubernetes
>> at multiple levels depending on how we choose to tackle the above
>> aspects.
>>
>> ">> My intuition is that I need a JobCoordinator/Factory in the form of a
>> server that sets up Watches on the appropriate Kubernetes resources so
>> that when I perform the action in [4.1] *something* happens. "
>>
>> This alternative does seem more complex. Hence, I would not go down
>> this path as the first step.
>>
>> For a start, I would lean on the side of simplicity and recommend the
>> following solution:
>> - Configure your Samza job to leverage the existing ZkBasedJC.
>> - Start multiple instances of your job by running the *run-app.sh*
>> script. I believe Kubernetes has good support for this as well.
>> - Configure Kubernetes to auto-scale your instances on-demand
>> depending on load.
>> - As new instances join and leave, Samza will automatically
>> re-distribute partitions among them.
>>
>> Additionally, we would be thrilled if you could contribute your
>> learnings back to the community - in the form of a blog-post or
>> documentation in Samza itself on running with Kubernetes.
>>
>> Please let us know should you need any further help. Here's an example
>> to get you started:
>> https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/application
>>
>> Best,
>> Jagadish
>>
>> On Sat, Jan 27, 2018 at 8:54 AM, Tom Davis <t...@recursivedream.com>
>> wrote:
>>
>>> Hi there! First off, thanks for the continued work on Samza -- I looked
>>> into many DC/stream processors and Samza was a real standout with its
>>> smart architecture and pluggable design.
>>>
>>> I'm working on a custom StreamJob/Factory for running Samza jobs on
>>> Kubernetes. Those two classes are pretty straightforward: I create a
>>> Deployment in Kubernetes with the appropriate number of Pods (a number
>>> <= the number of Kafka partitions I created the input topic with). Now
>>> I'm moving onto what executes in the actual Docker containers and I'm a
>>> bit confused.
>>>
>>> My plan was to mirror as much as possible what the YarnJob does
>>> which is setup an environment that will work with `run-jc.sh`. However,
>>> I don't need ClusterBasedJobCoordinator because Kubernetes is not an
>>> offer-based resource negotiator; if the JobCoordinator is running it
>>> means, by definition, it received the appropriate resources. So a
>>> PassThroughJobCoordinator with appropriate main() method seemed like the
>>> ticket. Unfortunately, the PTJC doesn't actually seem to *do* anything
>>> -- unlike the CBJC which has a run-loop and presumably schedules
>>> containers and the like.
>>>
>>> I saw the preview documentation on flexible deployment, but it didn't
>>> totally click for me. Perhaps because it was also my first introduction
>>> to the high-level API? (I've just been writing StreamTask impls)
>>>
>>> Here's a brief description of the workflow I'm envisioning, perhaps
>>> someone could tell me the classes I should implement and what sorts of
>>> containers I might need running in the environment to coordinate
>>> everything?
>>>
>>> 1. I create a topic in Kafka with N partitions
>>> 2. I start a job configured to run N-X containers
>>>  2.1. If my topic has 4 partitions and I have low load, I might want
>>>       X to start at 3 so I only have 1 task instance
>>> 3. Samza is configured to send all partitions to task instance 1
>>> 4. Later, load increases.
>>>  4.1. I use Kubernetes to scale the job to 4 pods/containers
>>>  4.2. Samza re-configures such that the new containers receive work
>>>
>>> My intuition is that I need a JobCoordinator/Factory in the form of a
>>> server that sets up Watches on the appropriate Kubernetes resources so
>>> that when I perform the action in [4.1] *something* happens. Or perhaps
>>> I should use ZkJobCoordinator? Presumably as pods/containers come and go
>>> they will cause changes in ZK that will trigger Task restarts or
>>> whatever logic the coordinator employs?
>>>
>>> Okay, I'll stop rambling now. Thanks in advance for any tips!
>>>
>>> - Tom
>>>
>>>


-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
