Thanks a lot for the info. I have something basically working at this point! I 
have not integrated it with Docker nor Kubernetes yet, but it does run from my 
local machine.

I have determined that LocalApplicationRunner does NOT do config rewriting. I 
had to write my own little “StandAloneApplicationRunner” that handles the 
“main” entrypoint. It parses the command line using CommandLine, loads config 
through ConfigFactory, and performs rewriting before creating the new instance 
of LocalApplicationRunner. This is all my StandAloneApplicationRunner contains:


object StandAloneSamzaRunner extends App with LazyLogging {

  // Parse command-line args just like JobRunner does.
  val cmdline = new ApplicationRunnerCommandLine
  val options = cmdline.parser.parse(args: _*)
  val config = cmdline.loadConfig(options)

  // Apply the configured rewriters ourselves, then hand the result to the runner.
  val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
  runner.runTask()
  // Block so that main does not return while the task is running.
  runner.waitForFinish()
}

The only config settings I needed to change to use this runner were the 
following (easily configured thanks to our central Consul config system and our 
rewriter):

# use the ZK based job coordinator
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
# need to use GroupByContainerIds instead of GroupByContainerCount
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
# ZKJC config
job.coordinator.zk.connect=<our_zk_connection>
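
A small pre-flight check along these lines could catch a missing override 
before the runner starts. This is only a sketch: StandaloneConfigCheck, 
requiredKeys and missingKeys are hypothetical names, and the plain Map stands 
in for whatever the rewritten Config resolves to.

```scala
object StandaloneConfigCheck {
  // The three settings listed above (hypothetical helper, not part of Samza).
  val requiredKeys: Seq[String] = Seq(
    "job.coordinator.factory",
    "task.name.grouper.factory",
    "job.coordinator.zk.connect"
  )

  // Returns the required keys that are absent or blank in the given config.
  def missingKeys(config: Map[String, String]): Seq[String] =
    requiredKeys.filter(key => config.get(key).forall(_.trim.isEmpty))
}
```

Failing fast when missingKeys is non-empty would surface the problem in main 
rather than inside the coordinator.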

I did run into one potential problem. As you see above, I start the task using 
runTask() and then, to prevent my main method from returning, call 
waitForFinish(). The first time I ran it, the job itself failed because I had 
forgotten to override the task grouper, and the container count was pulled from 
our staging environment. Some failures are logged and the JobCoordinator 
appears to fail, but waitForFinish never returns. The stack trace and the 
continuation of the log are below:

2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
java.lang.IllegalArgumentException: Your container count (4) is larger than your task count (2). Can't have containers with nothing to do, so aborting.
       at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
       at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
       at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
       at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
       at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
       at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
       at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
       at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
       at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
       at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
       at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
       at java.util.concurrent.FutureTask.run(FutureTask.java)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at java.lang.Thread.run(Thread.java:745)
2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG org.I0Itec.zkclient.ZkClient - Closing ZkClient...
2018-03-15 22:34:32 logback 77789 [ZkClient-EventThread-15-10.0.127.114:2181] INFO  org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.

After that, the application carries on with metric reporters and other debug 
logging (without actually running the task).
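
The failure in the trace comes down to a simple comparison between the 
container count and the task count. A minimal sketch of that check follows, 
paraphrased from the exception message rather than copied from 
GroupByContainerCount; the object and method names are hypothetical.

```scala
object ContainerCountCheck {
  // Mirrors the condition described by the exception message: more containers
  // than tasks would leave some containers with nothing to do.
  def validate(containerCount: Int, taskCount: Int): Either[String, Unit] =
    if (containerCount > taskCount)
      Left(s"Your container count ($containerCount) is larger than your task count ($taskCount).")
    else
      Right(())
}
```

With the values from the log, validate(4, 2) yields a Left, which is the case 
hit here because the container count came from the staging environment.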

Thanks in advance for the guidance, this has been easier than I imagined! I’ll 
report back when I get more of the Dockerization/Kubernetes running and test it 
a bit more.
Cheers,
Thunder


From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
Sent: Thursday, March 15, 2018 14:46
To: Thunder Stumpges <tstump...@ntent.com>
Cc: dev@samza.apache.org; t...@recursivedream.com; yi...@linkedin.com; Yi Pan 
<nickpa...@gmail.com>
Subject: Re: Old style "low level" Tasks with alternative deployment model(s)

>>  Thanks for the info on the tradeoffs. That makes a lot of sense. I am 
>> on-board with using ZkJobCoordinator, sounds like some good benefits over 
>> just the Kafka high-level consumer.

This certainly looks like the simplest alternative.

For your other questions, please find my answers inline.

>> Q1: If I use LocalApplicationRunner, It does not use "ProcessJobFactory" (or 
>> any StreamJob or *Job classes) correct?

Your understanding is correct. It directly instantiates the StreamProcessor, 
which in turn creates and runs the SamzaContainer.

>> Q2: If I use LocalApplicationRunner, I will need to code myself the loading 
>> and rewriting of the Config that is currently handled by JobRunner, correct?

I don't think you'll need to do this. IIUC, the LocalApplicationRunner should 
automatically invoke rewriters and do the right thing.

>>  Q3: Do I need to also handle coordinator stream(s) and storing of config 
>> that is done in JobRunner (I don’t think so as the ?

I don't think this is necessary either. The creation of coordinator stream and 
persisting configuration happens in the LocalApplicationRunner (more 
specifically in StreamManager#createStreams).

>> Q4: Where/How do I specify the Container ID for each instance? Is there a 
>> config setting that I can pass, (or pull from an env variable and add to the 
>> config) ? I am assuming it is my responsibility to ensure that each instance 
>> is started with a unique container ID..?

No. If you are using the ZkJobCoordinator, you need not worry about assigning 
IDs for each instance. The framework will automatically take care of generating 
IDs and reaching consensus by electing a leader. If you are curious, please 
take a look at the implementations of the ProcessorIdGenerator interface.
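
To give a feel for what an id generator has to guarantee, here is a minimal 
sketch. It is not one of Samza's built-in implementations and does not 
implement the ProcessorIdGenerator interface itself; ProcessorIds and generate 
are hypothetical names, and the host-plus-UUID scheme is only an assumption 
chosen for illustration.

```scala
import java.util.UUID

object ProcessorIds {
  // Hypothetical helper: combine a host name with a random UUID so that two
  // instances started on the same host still receive distinct ids.
  def generate(host: String): String = s"$host-${UUID.randomUUID()}"
}
```

The only property that matters is that ids are unique across the instances 
that join the same job.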

Please let us know should you have further questions!

Best,
Jagdish

On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges <tstump...@ntent.com> wrote:

Thanks for the info on the tradeoffs. That makes a lot of sense. I am on-board 
with using ZkJobCoordinator, sounds like some good benefits over just the Kafka 
high-level consumer.



To that end, I have made some notes on possible approaches based on the 
previous thread, and from my look into the code. I’d love to get feedback.



Approach 1. Configure jobs to use “ProcessJobFactory” and run instances of the 
job using run-job.sh or using JobRunner directly.

I don’t think this makes sense from what I can see for a few reasons:

  *   JobRunner is concerned with stuff I don't *think* we need:

     *   coordinatorSystemProducer|Consumer,
     *   writing/reading the configuration to the coordinator streams

  *   ProcessJobFactory hard-codes the ID to “0” so I don’t think that will 
work for multiple instances.



Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and invoke 
LocalApplicationRunner.runTask()



    Q1: If I use LocalApplicationRunner, It does not use "ProcessJobFactory" 
(or any StreamJob or *Job classes) correct?

    Q2: If I use LocalApplicationRunner, I will need to code myself the loading 
and rewriting of the Config that is currently handled by JobRunner, correct?

    Q3: Do I need to also handle coordinator stream(s) and storing of config 
that is done in JobRunner (I don’t think so as the ?

    Q4: Where/How do I specify the Container ID for each instance? Is there a 
config setting that I can pass, (or pull from an env variable and add to the 
config) ? I am assuming it is my responsibility to ensure that each instance is 
started with a unique container ID..?

I am getting started on the above (Approach 2.), and looking closer at the code 
so I may have my own answers to my questions, but figured I should go ahead and 
ask now anyway. Thanks!

-Thunder


From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
Sent: Thursday, March 15, 2018 1:41
To: dev@samza.apache.org; Thunder Stumpges <tstump...@ntent.com>; 
t...@recursivedream.com
Cc: yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>

Subject: Re: Old style "low level" Tasks with alternative deployment model(s)

>> You are correct that this is focused on the higher-level API but doesn't
>> preclude using the lower-level API. I was at the same point you were not
>> long ago, in fact, and had a very productive conversation on the list
Thanks Tom for linking the thread, and I'm glad that you were able to get
Kubernetes integration working with Samza.

>> If it is helpful for everyone, once I get the low-level API + 
>> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an 
>> additional sample for hello-samza.

@Thunder Stumpges:
We'd be thrilled to receive your contribution. Examples, demos, tutorials etc.
contribute a great deal to improving the ease of use of Apache Samza. I'm happy
to shepherd design discussions and code reviews in the open, including
answering any questions you may have.


>> One thing I'm still curious about, is what are the drawbacks or complexities 
>> of leveraging the Kafka High-level consumer + PassthroughJobCoordinator in a 
>> stand-alone setup like this? We do have Zookeeper (because of kafka) so I 
>> think either would work. The Kafka High-level consumer comes with other nice 
>> tools for monitoring offsets, lag, etc


@Thunder Stumpges:

Samza uses a "Job-Coordinator" to assign your input partitions among the 
different instances of your application such that they don't overlap. A typical 
way to solve this "partition distribution" problem is to have a single instance 
elected as a "leader" and have the leader assign partitions to the group. The 
ZkJobCoordinator uses Zk primitives to achieve this, while the YarnJC relies on 
Yarn's guarantee that there will be a singleton AppMaster.
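
The leader-based idea can be sketched in a few lines. This is a deliberately 
simplified illustration and not how ZkJobCoordinator is implemented: electing 
the smallest id as leader and spreading partitions round-robin are assumptions 
made for the example, and LeaderAssignment is a hypothetical name.

```scala
object LeaderAssignment {
  // Assumption for illustration: the processor with the smallest id is leader.
  def electLeader(processorIds: Seq[String]): String = {
    require(processorIds.nonEmpty, "at least one processor is required")
    processorIds.min
  }

  // The leader spreads partitions round-robin over the sorted processor ids,
  // so every partition ends up with exactly one owner.
  def assign(processorIds: Seq[String], partitionCount: Int): Map[String, Seq[Int]] = {
    require(processorIds.nonEmpty, "at least one processor is required")
    val sorted = processorIds.sorted
    (0 until partitionCount)
      .groupBy(partition => sorted(partition % sorted.size))
      .map { case (id, partitions) => id -> partitions.toSeq }
  }
}
```

Because only the leader computes the assignment, no two instances can decide 
differently about who owns a partition.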

A key difference that separates the PassthroughJC from the Yarn/Zk variants is 
that it does _not_ attempt to solve the "partition distribution" problem. As a 
result, there's no leader election involved. Instead, it pushes the problem of 
"partition distribution" to the underlying consumer.

The PassthroughJC supports these 2 scenarios:

1. Consumer-managed partition distribution: When using the Kafka high-level 
consumer (or an AWS KinesisClientLibrary consumer) with Samza, the consumer 
manages partitions internally.

2. Static partition distribution: Alternatively, partitions can be managed 
statically using configuration. You can achieve static partition assignment by 
implementing a custom SystemStreamPartitionGrouper
<https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>
and TaskNameGrouper
<https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
Solutions in this category will typically require you to distinguish the 
various processors in the group by providing an "id" for each. Once the "id"s 
are decided, you can then statically compute assignments using a function (eg: 
modulo N). You can rely on the following mechanisms to provide this id:
 - Configure each instance differently to have its own id
 - Obtain the id from the cluster manager. For instance, Kubernetes can give 
each pod a unique id in the range [0,N) (for example, the ordinal index of a 
StatefulSet pod). AWS ECS should expose similar capabilities via a REST 
end-point.
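
As a rough illustration of the "modulo N" function mentioned above, the sketch 
below computes which partitions a given instance owns. StaticAssignment and 
partitionsFor are hypothetical names; in a real job this logic would sit 
behind the custom grouper implementations, whose signatures are not shown here.

```scala
object StaticAssignment {
  // Partitions owned by the instance with the given id, out of n instances.
  // Each instance evaluates this locally, so no coordination is needed.
  def partitionsFor(instanceId: Int, n: Int, partitionCount: Int): Seq[Int] = {
    require(n > 0 && instanceId >= 0 && instanceId < n, "instanceId must be in [0, n)")
    (0 until partitionCount).filter(_ % n == instanceId)
  }
}
```

Every instance must agree on n and on the partition count, otherwise 
partitions can end up unowned or owned twice.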

>> One thing I'm still curious about, is what are the drawbacks or complexities 
>> of leveraging the Kafka High-level consumer + PassthroughJobCoordinator in a 
>> stand-alone setup like this?

Leveraging the Kafka High-level consumer:

The Kafka high-level consumer is not integrated into Samza just yet. Instead, 
Samza's integration with Kafka uses the low-level consumer because:
i) It allows for greater control in fetching data from individual brokers. It 
is simple and performant in terms of the threading model to have one thread 
pull from each broker.
ii) It is efficient in memory utilization since it does not do internal 
buffering of messages.
iii) There's no overhead like Kafka-controller heart-beats that are driven by 
consumer.poll

Since there's no built-in integration, you will have to build a new 
SystemConsumer if you need to integrate with the Kafka High-level consumer. 
Further, there's a fair bit more complexity to manage in checkpointing.

>> The Kafka High-level consumer comes with other nice tools for monitoring 
>> offsets, lag, etc

Samza exposes the metrics below for lag monitoring (see 
<https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>):
- The current log-end offset for each partition
- The last check-pointed offset for each partition
- The number of messages behind the highwatermark of the partition
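
If you export the first two values to your own tooling, a per-partition lag 
can be derived from them. The sketch below assumes the offsets have already 
been collected into plain maps keyed by partition; LagCheck and lag are 
hypothetical names, and treating a partition without a checkpoint as starting 
from offset 0 is an assumption made for the example.

```scala
object LagCheck {
  // Messages between the last checkpointed offset and the log-end offset for
  // each partition. Partitions without a checkpoint are assumed to start at 0.
  def lag(logEndOffsets: Map[Int, Long], checkpointed: Map[Int, Long]): Map[Int, Long] =
    logEndOffsets.map { case (partition, logEnd) =>
      partition -> math.max(0L, logEnd - checkpointed.getOrElse(partition, 0L))
    }
}
```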

Please let us know if you need help discovering these or integrating these with 
other systems/tools.


Leveraging the Passthrough JobCoordinator:

It's helpful to split this discussion on tradeoffs with PassthroughJC into 2 
parts:

1. PassthroughJC + consumer managed partitions:

- In this model, Samza has no control over partition assignment since it's 
managed by the consumer. This means that stateful operations like joins that 
rely on partitions being co-located on the same task will not work. Simple 
stateless operations (eg: map, filter, remote lookups) are fine.

- A key differentiator between Samza and other frameworks is our support for 
"host affinity" 
<https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>. 
Samza achieves this by assigning partitions to hosts taking data locality into 
account. If the consumer can arbitrarily shuffle partitions, it'd be hard to 
support this affinity/locality. Often this is a key optimization when dealing 
with large stateful jobs.

2. PassthroughJC + static partitions:

- In this model, it is possible to make stateful processing (including host 
affinity) work by carefully choosing how "id"s are assigned and computed.

Recommendation:

- Owing to the above subtleties, I would recommend that we give the 
ZkJobCoordinator + the built-in low-level Kafka integration a try.
- If we hit snags down this path, we can certainly explore the approach with 
PassthroughJC + static partitions.
- Using the PassthroughJC + consumer-managed distribution would be least 
preferable owing to the subtleties I outlined above.

Please let us know should you have more questions.

Best,
Jagdish

On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <tstump...@ntent.com> wrote:
Wow, what great timing, and what a great thread! I definitely have some good 
starters to go off of here.

If it is helpful for everyone, once I get the low-level API + ZkJobCoordinator 
+ Docker + K8s working, I'd be glad to formulate an additional sample for 
hello-samza.

One thing I'm still curious about, is what are the drawbacks or complexities of 
leveraging the Kafka High-level consumer + PassthroughJobCoordinator in a 
stand-alone setup like this? We do have Zookeeper (because of kafka) so I think 
either would work. The Kafka High-level consumer comes with other nice tools 
for monitoring offsets, lag, etc....

Thanks guys!
-Thunder

-----Original Message-----
From: Tom Davis [mailto:t...@recursivedream.com]
Sent: Wednesday, March 14, 2018 17:50
To: dev@samza.apache.org
Subject: Re: Old style "low level" Tasks with alternative deployment model(s)

Hey there!

You are correct that this is focused on the higher-level API but doesn't 
preclude using the lower-level API. I was at the same point you were not long 
ago, in fact, and had a very productive conversation on the list:
you should look for "Question about custom StreamJob/Factory" in the list 
archive for the past couple months.

I'll quote Jagadish Venkatraman from that thread:

> For the section on the low-level API, can you use
> LocalApplicationRunner#runTask()? It basically creates a new
> StreamProcessor and runs it. Remember to provide task.class and set it
> to your implementation of StreamTask or AsyncStreamTask. Please note
> that this is an evolving API and hence, subject to change.

I ended up just switching to the high-level API because I don't have any 
existing Tasks and the Kubernetes story is a little more straightforward there 
(there's only one container/configuration to deploy).

Best,

Tom

Thunder Stumpges <tstump...@ntent.com> writes:

> Hi all,
>
> We are using Samza (0.12.0) in about 2 dozen jobs implementing several
> processing pipelines. We have also begun a significant move of other
> services within our company to Docker/Kubernetes. Right now our
> Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce" jobs (many 
> reporting and other batch processing jobs). We would really like to move our 
> stream processing off of Hadoop/Yarn and onto Kubernetes.
>
> When I just read about some of the new progress in .13 and .14 I got
> really excited! We would love to have our jobs run as simple libraries
> in our own JVM, and use the Kafka High-Level-Consumer for partition 
> distribution and such. This would let us "dockerfy" our application and 
> run/scale in kubernetes.
>
> However, as I read it, this new deployment model is ONLY for the
> new(er) High Level API, correct? Is there a plan and/or resources for
> adapting this back to existing low-level tasks? How complicated a task is
> that? Do I have any other options to make this transition easier?
>
> Thanks in advance.
> Thunder



--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University



