Did you guys have a chance to review my "runner" class? It's quite trivial,
and I can port it to the Samza codebase, give it a name of your choosing, and
submit a PR if you like.

This is literally all of it:

object StandAloneSamzaRunner extends App with LazyLogging {

  // parse command line args just like JobRunner. Main options as emitted by the
  // parser:
  val cmdline = new ApplicationRunnerCommandLine
  val options = cmdline.parser.parse(args: _*)
  val config = cmdline.loadConfig(options)

  // run the task using LocalApplicationRunner (must be configured just like
  // standard JobRunner)
  val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
  runner.runTask()
  runner.waitForFinish()
}


Thanks,
Thunder


-----Original Message-----
From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com] 
Sent: Tuesday, March 20, 2018 11:17
To: Prateek Maheshwari <prateek...@gmail.com>
Cc: dev@samza.apache.org; t...@recursivedream.com; yi...@linkedin.com; Yi Pan 
<nickpa...@gmail.com>
Subject: Re: Old style "low level" Tasks with alternative deployment model(s)

Hi Thunder,

Thank you for the PR. Really nice work!

Since you have a working implementation on K8s, would you be willing to
contribute a short tutorial or a post on this? We'll be sure to feature it on
the official Samza website at http://samza.apache.org/.

It'd be a great addition to the Samza community to have a section on K8s 
integration! There have been multiple prior asks on this, and your learnings 
would be super-helpful.

Best,
Jagdish



On Tue, Mar 20, 2018 at 10:46 AM, Prateek Maheshwari <prateek...@gmail.com>
wrote:

> Glad you were able to figure it out, that was very confusing. Thanks 
> for the fix too.
>
> - Prateek
>
> On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges 
> <tstump...@ntent.com>
> wrote:
>
>> And that last issue was mine. My setting override was not picked up 
>> and it was using GroupByContainerCount instead.
>> -Thanks,
>> Thunder
>>
>>
>> -----Original Message-----
>> From: Thunder Stumpges
>> Sent: Monday, March 19, 2018 20:58
>> To: dev@samza.apache.org
>> Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; 
>> t...@recursivedream.com; yi...@linkedin.com; Yi Pan 
>> <nickpa...@gmail.com>
>> Subject: RE: Old style "low level" Tasks with alternative deployment
>> model(s)
>>
>> Well I figured it out. My specific issue was due to a simple 
>> dependency problem where I had gotten an older version of the Jackson-mapper 
>> library.
>> However the code was throwing NoSuchMethodError (an Error instead of
>> Exception) and being silently dropped. I created a pull request to 
>> handle any Throwable in ScheduleAfterDebounceTime.
>> https://github.com/apache/samza/pull/450
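
To illustrate the point about Error versus Exception, here is a minimal, self-contained Java sketch. It is not the actual ScheduleAfterDebounceTime code; the class name DebounceErrorSketch and the method runAndCapture are hypothetical. It only assumes that the action runs on a scheduled executor whose returned future nobody inspects, in which case a wrapper that catches Exception never sees a NoSuchMethodError, while one that catches Throwable does.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; not the actual ScheduleAfterDebounceTime code.
final class DebounceErrorSketch {

  // Schedules the action once and returns whatever the wrapper's catch block
  // saw, or null if the throwable escaped the wrapper unnoticed.
  static Throwable runAndCapture(Runnable action, boolean catchThrowable) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicReference<Throwable> seen = new AtomicReference<>();
    Runnable wrapped = () -> {
      if (catchThrowable) {
        try {
          action.run();
        } catch (Throwable t) { // also catches Errors such as NoSuchMethodError
          seen.set(t);
        }
      } else {
        try {
          action.run();
        } catch (Exception e) { // an Error is not an Exception, so it escapes
          seen.set(e);
        }
      }
    };
    // The returned future is deliberately ignored, so an escaped Error is
    // stored inside it and never reported anywhere.
    scheduler.schedule(wrapped, 10, TimeUnit.MILLISECONDS);
    scheduler.shutdown(); // already-scheduled one-shot tasks still run by default
    try {
      scheduler.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return seen.get();
  }
}
```

With an action that throws NoSuchMethodError, runAndCapture(action, false) returns null (nothing was logged or recorded), whereas runAndCapture(action, true) returns the Error so it can be logged.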
>>
>> I'm now running into an issue with the generation of the JobModel and
>> the ProcessorId. The ZkJobCoordinator has a ProcessorId that is a
>> GUID, but when the GroupByContainerIds class (my TaskNameGrouper) creates
>> the ContainerModels, it uses the ContainerId (a numeric value:
>> 0, 1, 2, etc.) as the ProcessorId (~ line 105). As a result, the
>> generated JobModel does not contain this processor's GUID, and as soon
>> as it is published the processor quits with this message:
>>
>> INFO  o.apache.samza.zk.ZkJobCoordinator - New JobModel does not 
>> contain pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this processor.
>>
>> I was assuming I should be using GroupByContainerIds as my 
>> TaskNameGrouper. I don't see any other promising implementations. Am 
>> I just missing something?
>>
>> Thanks,
>> Thunder
>>
>> JobModel
>> {
>>   "config" : {
>>   ...
>>   },
>>   "containers" : {
>>     "0" : {
>>       "tasks" : {
>>         "Partition 0" : {
>>           "task-name" : "Partition 0",
>>           "system-stream-partitions" : [ {
>>             "system" : "kafka",
>>             "partition" : 0,
>>             "stream" : "test_topic1"
>>           }, {
>>             "system" : "kafka",
>>             "partition" : 0,
>>             "stream" : "test_topic2"
>>           } ],
>>           "changelog-partition" : 0
>>         },
>>         "Partition 1" : {
>>           "task-name" : "Partition 1",
>>           "system-stream-partitions" : [ {
>>             "system" : "kafka",
>>             "partition" : 1,
>>             "stream" : "test_topic1"
>>           }, {
>>             "system" : "kafka",
>>             "partition" : 1,
>>             "stream" : "test_topic2"
>>           } ],
>>           "changelog-partition" : 1
>>         }
>>       },
>>       "container-id" : 0,
>>       "processor-id" : "0"
>>     }
>>   },
>>   "max-change-log-stream-partitions" : 2,
>>   "all-container-locality" : {
>>     "0" : null
>>   }
>> }
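
The mismatch described above can be sketched in a few lines of Java. This is only an illustration of the membership check implied by the log message, not ZkJobCoordinator's actual code; the class and method names are hypothetical.

```java
import java.util.Set;

// Hypothetical illustration of the check implied by
// "New JobModel does not contain pid=...".
final class ProcessorIdCheckSketch {

  // A processor keeps running only if its own id appears among the
  // container keys of the published JobModel.
  static boolean jobModelContains(Set<String> containerKeys, String processorId) {
    return containerKeys.contains(processorId);
  }
}
```

With the JobModel shown above the only container key is "0", so a processor whose id is the GUID 38c637bf-9c2b-4856-afc4-5b1562711cfb is not found and stops. If the containers were keyed by that GUID instead, the same check would pass.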
>>
>> -----Original Message-----
>> From: Thunder Stumpges [mailto:tstump...@ntent.com]
>> Sent: Friday, March 16, 2018 18:21
>> To: dev@samza.apache.org
>> Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; 
>> t...@recursivedream.com; yi...@linkedin.com; Yi Pan 
>> <nickpa...@gmail.com>
>> Subject: RE: Old style "low level" Tasks with alternative deployment
>> model(s)
>>
>> Attached. I don't see any threads actually running this code which is odd.
>>
>> There's my main thread that's waiting for the whole thing to finish, 
>> the "debounce-thread-0" (which logged the other surrounding messages 
>> below) has
>> this:
>>
>> "debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x00007fa0fd719800 nid=0x21 waiting on condition [0x00007fa0d0d45000]
>>    java.lang.Thread.State: WAITING (parking)
>>         at sun.misc.Unsafe.park(Native Method)
>>         - parking to wait for  <0x00000006f166e350> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>    Locked ownable synchronizers:
>>         - None
>>
>> Thanks for having a look.
>> Thunder
>>
>>
>> -----Original Message-----
>> From: Prateek Maheshwari [mailto:prateek...@gmail.com]
>> Sent: Friday, March 16, 2018 17:02
>> To: dev@samza.apache.org
>> Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; 
>> t...@recursivedream.com; yi...@linkedin.com; Yi Pan 
>> <nickpa...@gmail.com>
>> Subject: Re: Old style "low level" Tasks with alternative deployment
>> model(s)
>>
>> Hi Thunder,
>>
>> Can you please take and attach a thread dump with this?
>>
>> Thanks,
>> Prateek
>>
>> On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges 
>> <tstump...@ntent.com>
>> wrote:
>>
>> > It appears it IS hung while serializing the JobModel... very strange!
>> > I added some debug statements around the calls:
>> >
>> >       LOG.debug("Getting object mapper to serialize job model");  // this IS printed
>> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>> >       LOG.debug("Serializing job model"); // this IS printed
>> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>> >       LOG.info("jobModelAsString=" + jobModelStr); // this is NOT printed!
>> >
>> > Another thing I noticed is that "getObjectMapper" actually creates 
>> > the object mapper twice!
>> >
>> > 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job model
>> > 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
>> > 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
>> > 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
>> > 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Serializing job model
>> >
>> > Could this ObjectMapper be a singleton? I see there is a private 
>> > static instance, but getObjectMapper creates a new one every time...
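
For reference, one common way to get a lazily created, thread-safe singleton in Java is the initialization-on-demand holder idiom, sketched below. StandInMapper is a hypothetical placeholder for an expensive-to-build mapper, not the Jackson or Samza class, and the sketch assumes the shared instance is never reconfigured after construction.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; StandInMapper stands in for an expensive mapper.
final class MapperHolderSketch {

  // Counts how many times the stand-in mapper has been constructed.
  static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

  static final class StandInMapper {
    StandInMapper() {
      CONSTRUCTIONS.incrementAndGet();
    }
  }

  // The JVM initializes Holder exactly once, on first access, so no explicit
  // locking is needed.
  private static final class Holder {
    static final StandInMapper INSTANCE = new StandInMapper();
  }

  // Always returns the same instance instead of building a new one per call.
  static StandInMapper getMapper() {
    return Holder.INSTANCE;
  }
}
```

Calling getMapper() any number of times constructs the stand-in exactly once, in contrast to the two "Creating new object mapper" sequences in the log above.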
>> >
>> > Anyway, then it takes off to serialize the job model and never 
>> > comes back...
>> >
>> > Hoping someone has some idea here... the implementation for this 
>> > mostly comes from Jackson-mapper-asl, and I have the version that 
>> > is linked in the
>> > 0.14.0 tag:
>> > |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
>> > |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13
>> >
>> > Thanks!
>> > Thunder
>> >
>> > -----Original Message-----
>> > From: Thunder Stumpges [mailto:tstump...@ntent.com]
>> > Sent: Friday, March 16, 2018 15:29
>> > To: dev@samza.apache.org; Jagadish Venkatraman 
>> > <jagadish1...@gmail.com>
>> > Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan < 
>> > nickpa...@gmail.com>
>> > Subject: RE: Old style "low level" Tasks with alternative 
>> > deployment
>> > model(s)
>> >
>> > So, my investigation starts at StreamProcessor.java.  Line 294 in 
>> > method
>> > onNewJobModel() logs an INFO message that it's starting a container.
>> > This message never appears.
>> >
>> > I see that ZkJobCoordinator calls onNewJobModel from its 
>> > onNewJobModelConfirmed method which also logs an info message 
>> > stating "version X of the job model got confirmed". I never see 
>> > this message either, so I go up the chain some more.
>> >
>> > I DO see:
>> >
>> > 2018-03-16 21:43:58 logback 20498 [ZkClient-EventThread-13-10.0.127.114:2181] INFO  o.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader - I became the leader!
>> > And
>> > 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=91e07d20-ae33-4156-a5f3-534a95642133Generated new Job Model. Version = 1
>> >
>> > Which led me to method doOnProcessorChange, line 210. I see that
>> > line, but not the line below ("Published new Job Model. Version ="),
>> > so something in here is not completing:
>> >
>> >     LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
>> >
>> >     // Publish the new job model
>> >     zkUtils.publishJobModel(nextJMVersion, jobModel);
>> >
>> >     // Start the barrier for the job model update
>> >     barrier.create(nextJMVersion, currentProcessorIds);
>> >
>> >     // Notify all processors about the new JobModel by updating JobModel Version number
>> >     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
>> >
>> >     LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
>> >
>> > As I mentioned, after the line "Generated new Job Model. Version = 1"
>> > I just get repeated zk ping responses.. no more application logging.
>> >
>> > The very next thing that's run is zkUtils.publishJobModel() which 
>> > only has two lines before another log statement (which I don't see):
>> >
>> >   public void publishJobModel(String jobModelVersion, JobModel jobModel) {
>> >     try {
>> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>> >       LOG.info("jobModelAsString=" + jobModelStr);
>> >       ...
>> >
>> > Could it really be getting hung up on one of these two lines? 
>> > (seems like it must be, but I don't see anything there that seems 
>> > like it would just hang). I'll keep troubleshooting, maybe add some 
>> > more debug logging and try again.
>> >
>> > Thanks for any guidance you all might have.
>> > -Thunder
>> >
>> >
>> > -----Original Message-----
>> > From: Thunder Stumpges [mailto:tstump...@ntent.com]
>> > Sent: Friday, March 16, 2018 14:43
>> > To: dev@samza.apache.org; Jagadish Venkatraman 
>> > <jagadish1...@gmail.com>
>> > Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan < 
>> > nickpa...@gmail.com>
>> > Subject: RE: Old style "low level" Tasks with alternative 
>> > deployment
>> > model(s)
>> >
>> > Well I have my stand-alone application in docker and running in 
>> > kubernetes. I think something isn't wired up all the way though, 
>> > because my task never actually gets invoked. I see no errors, 
>> > however I'm not getting the usual startup logs (checking existing 
>> > offsets, "entering run loop"...) My logs look like this:
>> >
>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Verifying properties
>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property client.id is overridden to samza_admin-test_stream_task-1
>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to test-kafka-kafka.test-svc:9092
>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
>> > 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO kafka.client.ClientUtils$ - Fetching metadata from broker BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation id 0 for 1 topic(s) Set(dev_k8s.samza.test.topic)
>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 179680 (requested -1), SO_SNDBUF = 102400 (requested 102400), connectTimeoutMs = 30000.
>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO kafka.producer.SyncProducer - Connected to test-kafka-kafka.test-svc:9092 for producing
>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO kafka.producer.SyncProducer - Disconnecting from test-kafka-kafka.test-svc:9092
>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG kafka.client.ClientUtils$ - Successfully fetched metadata for 1 topic(s) Set(dev_k8s.samza.test.topic)
>> > 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@1a7158cc has grouped the SystemStreamPartitions into 10 tasks with the following taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, Partition 5, Partition 4, Partition 7, Partition 6, Partition 9, Partition 8]
>> > 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being assigned changelog partition 0.
>> > 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being assigned changelog partition 1.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being assigned changelog partition 2.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being assigned changelog partition 3.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being assigned changelog partition 4.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being assigned changelog partition 5.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being assigned changelog partition 6.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being assigned changelog partition 7.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being assigned changelog partition 8.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being assigned changelog partition 9.
>> > 2018-03-16 21:05:55 logback 50838 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null serverPath:null finished:false header:: 23,4  replyHeader:: 23,14024,0  request:: '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/JobModelGeneration/jobModelVersion,T  response:: ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878}
>> > 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated new Job Model. Version = 1
>> > 2018-03-16 21:06:05 logback 60848 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms
>> > 2018-03-16 21:06:15 logback 70856 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 1ms
>> > 2018-03-16 21:06:25 logback 80865 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms
>> > ...
>> >
>> > The zk ping responses continue every 10 seconds, but no other 
>> > activity or messages occur.
>> > It looks like it gets as far as confirming the JobModel and 
>> > grouping the partitions, but nothing actually starts up.
>> >
>> > Any ideas?
>> > Thanks in advance!
>> > Thunder
>> >
>> >
>> > -----Original Message-----
>> > From: Thunder Stumpges [mailto:tstump...@ntent.com]
>> > Sent: Thursday, March 15, 2018 16:35
>> > To: Jagadish Venkatraman <jagadish1...@gmail.com>
>> > Cc: dev@samza.apache.org; t...@recursivedream.com; 
>> > yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
>> > Subject: RE: Old style "low level" Tasks with alternative 
>> > deployment
>> > model(s)
>> >
>> > Thanks a lot for the info. I have something basically working at 
>> > this point! I have not integrated it with Docker nor Kubernetes 
>> > yet, but it does run from my local machine.
>> >
>> > I have determined that LocalApplicationRunner does NOT do config
>> > rewriting. I had to write my own little “StandAloneApplicationRunner”
>> > that handles the “main” entrypoint. It parses the command line using
>> > CommandLine, loads the config from ConfigFactory, and performs rewriting
>> > before creating the new instance of LocalApplicationRunner. This is
>> > all my StandAloneApplicationRunner contains:
>> >
>> >
>> > object StandAloneSamzaRunner extends App with LazyLogging {
>> >
>> >   // parse command line args just like JobRunner.
>> >   val cmdline = new ApplicationRunnerCommandLine
>> >   val options = cmdline.parser.parse(args: _*)
>> >   val config = cmdline.loadConfig(options)
>> >
>> >   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
>> >   runner.runTask()
>> >   runner.waitForFinish()
>> > }
>> >
>> > The only config settings I needed to make to use this runner were
>> > (easily configured due to our central Consul config system and our
>> > rewriter):
>> >
>> > # use the ZK based job coordinator
>> > job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
>> > # need to use GroupByContainerIds instead of GroupByContainerCount
>> > task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
>> > # ZKJC config
>> > job.coordinator.zk.connect=<our_zk_connection>
>> >
>> > I did run into one potential problem; as you see above, I have 
>> > started the task using runTask() and then to prevent my main method 
>> > from returning, I have called waitForFinish(). The first time I ran 
>> > it, the job itself failed because I had forgotten to override the 
>> > task grouper, and container count was pulled from our staging environment.
>> > There are some failures logged and it appears the JobCoordinator
>> > fails, but it never returns from waitForFinish. The stack trace and
>> > continuation of the log are below:
>> >
>> > 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
>> > java.lang.IllegalArgumentException: Your container count (4) is larger than your task count (2). Can't have containers with nothing to do, so aborting.
>> >        at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
>> >        at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
>> >        at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
>> >        at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
>> >        at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>> >        at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
>> >        at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
>> >        at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>> >        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
>> >        at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>> >        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>> >        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>> >        at java.util.concurrent.FutureTask.run(FutureTask.java)
>> >        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> >        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> >        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> >        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> >        at java.lang.Thread.run(Thread.java:745)
>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG org.I0Itec.zkclient.ZkClient - Closing ZkClient...
>> > 2018-03-15 22:34:32 logback 77789 [ZkClient-EventThread-15-10.0.127.114:2181] INFO  org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
>> >
>> > And then the application continues on with metric reporters, and 
>> > other debug logging (not actually running the task though)
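
A general pattern that avoids this kind of hang is to release the waiter on the failure path as well as on normal completion, and to wait with a timeout. The Java sketch below is only an illustration of that pattern under stated assumptions; it says nothing about how LocalApplicationRunner.waitForFinish is actually implemented, and the class ShutdownLatchSketch and its methods are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration; not LocalApplicationRunner's implementation.
final class ShutdownLatchSketch {

  private final CountDownLatch done = new CountDownLatch(1);
  private final AtomicReference<Throwable> failure = new AtomicReference<>();

  // Called when the job finishes normally.
  void onSuccess() {
    done.countDown();
  }

  // Called when coordination fails: record the cause AND release the waiter.
  // Forgetting the countDown() here is what would leave main() blocked forever.
  void onFailure(Throwable cause) {
    failure.set(cause);
    done.countDown();
  }

  // Returns true if the job finished (successfully or not) within the timeout.
  boolean waitForFinish(long timeout, TimeUnit unit) {
    try {
      return done.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // The recorded failure cause, or null if none was reported.
  Throwable failure() {
    return failure.get();
  }
}
```

A main method built on this would call waitForFinish in a loop or with a generous timeout, then check failure() and exit with a non-zero status so that a supervisor such as Kubernetes can restart the process.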
>> >
>> > Thanks in advance for the guidance, this has been easier than I
>> > imagined! I’ll report back when I get more of the Dockerization/Kubernetes
>> > running and test it a bit more.
>> > Cheers,
>> > Thunder
>> >
>> >
>> > From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
>> > Sent: Thursday, March 15, 2018 14:46
>> > To: Thunder Stumpges <tstump...@ntent.com>
>> > Cc: dev@samza.apache.org; t...@recursivedream.com; 
>> > yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
>> > Subject: Re: Old style "low level" Tasks with alternative 
>> > deployment
>> > model(s)
>> >
>> > >>  Thanks for the info on the tradeoffs. That makes a lot of 
>> > >> sense. I am
>> > on-board with using ZkJobCoordinator, sounds like some good 
>> > benefits over just the Kafka high-level consumer.
>> >
>> > This certainly looks like the simplest alternative.
>> >
>> > For your other questions, please find my answers inline.
>> >
>> > >> Q1: If I use LocalApplicationRunner, It does not use
>> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
>> >
>> > Your understanding is correct. It directly instantiates the 
>> > StreamProcessor, which in-turn creates and runs the SamzaContainer.
>> >
>> > >> Q2: If I use LocalApplicationRunner, I will need to code myself 
>> > >> the
>> > loading and rewriting of the Config that is currently handled by 
>> > JobRunner, correct?
>> >
>> > I don't think you'll need to do this. IIUC, the 
>> > LocalApplicationRunner should automatically invoke rewriters and do the 
>> > right thing.
>> >
>> > >>  Q3: Do I need to also handle coordinator stream(s) and storing 
>> > >> of
>> > config that is done in JobRunner (I don’t think so as the ?
>> >
>> > I don't think this is necessary either. The creation of coordinator
>> > stream and persisting configuration happens in the
>> > LocalApplicationRunner (more specifically in
>> > StreamManager#createStreams).
>> >
>> > >> Q4: Where/How do I specify the Container ID for each instance? 
>> > >> Is there
>> > a config setting that I can pass, (or pull from an env variable and 
>> > add to the config) ? I am assuming it is my responsibility to 
>> > ensure that each instance is started with a unique container ID..?
>> >
>> > Nope. If you are using the ZkJobCoordinator, you need not
>> > worry about assigning IDs for each instance. The framework will
>> > automatically take care of generating IDs and reaching consensus by
>> > electing a leader. If you are curious, please take a look at
>> > implementations of the ProcessorIdGenerator interface.
>> >
>> > Please let us know should you have further questions!
>> >
>> > Best,
>> > Jagdish
>> >
>> > On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges
>> > <tstump...@ntent.com> wrote:
>> >
>> > Thanks for the info on the tradeoffs. That makes a lot of sense. I 
>> > am on-board with using ZkJobCoordinator, sounds like some good 
>> > benefits over just the Kafka high-level consumer.
>> >
>> >
>> >
>> > To that end, I have made some notes on possible approaches based on
>> > the previous thread, and from my look into the code. I’d love to
>> > get feedback.
>> >
>> >
>> >
>> > Approach 1. Configure jobs to use “ProcessJobFactory” and run 
>> > instances of the job using run-job.sh or using JobRunner directly.
>> >
>> > I don’t think this makes sense from what I can see for a few reasons:
>> >
>> >   *   JobRunner is concerned with stuff I don't *think* we need:
>> >
>> >      *   coordinatorSystemProducer|Consumer,
>> >      *   writing/reading the configuration to the coordinator streams
>> >
>> >   *   ProcessJobFactory hard-codes the ID to “0” so I don’t think that
>> > will work for multiple instances.
>> >
>> >
>> >
>> > Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and 
>> > invoke
>> > LocalApplicationRunner.runTask()
>> >
>> >
>> >
>> >     Q1: If I use LocalApplicationRunner, It does not use 
>> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
>> >
>> >     Q2: If I use LocalApplicationRunner, I will need to code myself 
>> > the loading and rewriting of the Config that is currently handled 
>> > by JobRunner, correct?
>> >
>> >     Q3: Do I need to also handle coordinator stream(s) and storing 
>> > of config that is done in JobRunner (I don’t think so as the ?
>> >
>> >     Q4: Where/How do I specify the Container ID for each instance? 
>> > Is there a config setting that I can pass, (or pull from an env 
>> > variable and add to the config) ? I am assuming it is my 
>> > responsibility to ensure that each instance is started with a unique 
>> > container ID..?
>> >
>> > I am getting started on the above (Approach 2.), and looking closer 
>> > at the code so I may have my own answers to my questions, but 
>> > figured I should go ahead and ask now anyway. Thanks!
>> >
>> > -Thunder
>> >
>> >
>> > From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
>> > Sent: Thursday, March 15, 2018 1:41
>> > To: dev@samza.apache.org; Thunder Stumpges <tstump...@ntent.com>;
>> > t...@recursivedream.com
>> > Cc: yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
>> >
>> > Subject: Re: Old style "low level" Tasks with alternative 
>> > deployment
>> > model(s)
>> >
>> > >> You are correct that this is focused on the higher-level API but 
>> > >> doesn't
>> > preclude using the lower-level API. I was at the same point you 
>> > were not long ago, in fact, and had a very productive conversation 
>> > on the list
>> >
>> > Thanks Tom for linking the thread, and I'm glad that you were able 
>> > to get Kubernetes integration working with Samza.
>> >
>> > >> If it is helpful for everyone, once I get the low-level API + 
>> > >> ZkJobCoordinator + Docker +
>> > K8s working, I'd be glad to formulate an additional sample for
>> hello-samza.
>> >
>> > @Thunder Stumpges:
>> > We'd be thrilled to receive your contribution. Examples, demos, 
>> > tutorials etc.
>> > contribute a great deal to improving the ease of use of Apache Samza.
>> > I'm happy to shepherd design discussions/code-reviews in the 
>> > open-source including answering any questions you may have.
>> >
>> >
>> > >> One thing I'm still curious about, is what are the drawbacks or 
>> > >> complexities of leveraging the Kafka High-level consumer + 
>> > >> PassthroughJobCoordinator in a stand-alone setup like this? We 
>> > >> do have Zookeeper (because of kafka) so I think either would 
>> > >> work. The Kafka High-level consumer comes with other nice tools 
>> > >> for monitoring offsets, lag, etc
>> >
>> >
>> > @Thunder Stumpges:
>> >
>> > Samza uses a "Job-Coordinator" to assign your input partitions
>> > among the different instances of your application such that they
>> > don't overlap. A typical way to solve this "partition distribution"
>> > problem is to have a single instance elected as a "leader" and have
>> > the leader assign partitions to the group.
>> > The ZkJobCoordinator uses ZooKeeper primitives to achieve this, while
>> > the YarnJC relies on YARN's guarantee that there will be a
>> > singleton AppMaster.
>> >
>> > A key difference that separates the PassthroughJC from the Yarn/Zk 
>> > variants is that it does _not_ attempt to solve the "partition 
>> > distribution" problem. As a result, there's no leader-election involved.
>> > Instead, it pushes the problem of "partition distribution" to the 
>> > underlying consumer.
>> >
>> > The PassthroughJC supports these two scenarios:
>> >
>> > 1. Consumer-managed partition distribution: When using the Kafka 
>> > high-level consumer (or an AWS KinesisClientLibrary consumer) with 
>> > Samza, the consumer manages partitions internally.
>> >
>> > 2. Static partition distribution: Alternately, partitions can be 
>> > managed statically using configuration. You can achieve static 
>> > partition assignment by implementing a custom
>> > SystemStreamPartitionGrouper
>> > <https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>
>> > and TaskNameGrouper
>> > <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
>> > Solutions in this category will typically require you to
>> > distinguish the various processors in the group by providing an "id"
>> > for each.
>> > Once the "id"s are decided, you can then statically compute
>> > assignments using a function (e.g., modulo N).
>> > You can rely on the following mechanisms to provide this id:
>> >  - Configure each instance differently to have its own id
>> >  - Obtain the id from the cluster manager. For instance, Kubernetes
>> > can provide each pod a unique id in the range [0,N) (for pods managed
>> > by a StatefulSet). AWS ECS should expose similar capabilities via a
>> > REST endpoint.
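To make the static scheme above concrete, here is a minimal sketch of the "modulo N" assignment. It is plain Scala and does not touch any Samza API: `partitionsFor` and `idFromName` are hypothetical helpers, and reading the id from a numeric suffix of the instance name (as in "samza-app-2") is an assumption, not something the thread specifies.

```scala
import scala.util.Try

object StaticAssignmentSketch {
  // Hypothetical helper (not a Samza API): the partitions owned by the
  // processor with the given id when partitions are dealt out modulo N.
  def partitionsFor(processorId: Int, numProcessors: Int, numPartitions: Int): Seq[Int] = {
    require(numProcessors > 0, "numProcessors must be positive")
    require(processorId >= 0 && processorId < numProcessors,
      "processorId must be in [0, numProcessors)")
    (0 until numPartitions).filter(_ % numProcessors == processorId)
  }

  // Hypothetical helper: assumes the instance name ends in "-<number>",
  // and returns None when no numeric suffix can be parsed.
  def idFromName(name: String): Option[Int] =
    Try(name.substring(name.lastIndexOf('-') + 1).toInt).toOption
}
```

With 3 processors and 8 partitions, processor 1 would own partitions 1, 4 and 7, and every partition ends up with exactly one owner. In a real job the same arithmetic would presumably live inside the custom groupers mentioned above.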
>> >
>> > >> One thing I'm still curious about, is what are the drawbacks or
>> > >> complexities of leveraging the Kafka High-level consumer +
>> > >> PassthroughJobCoordinator in a stand-alone setup like this?
>> >
>> > Leveraging the Kafka High-level consumer:
>> >
>> > The Kafka high-level consumer is not integrated into Samza just yet.
>> > Instead, Samza's integration with Kafka uses the low-level consumer
>> > because:
>> > i) It allows for greater control in fetching data from individual
>> > brokers. It is simple and performant in terms of the threading model
>> > to have one thread pull from each broker.
>> > ii) It is efficient in memory utilization since it does not do
>> > internal buffering of messages.
>> > iii) There's no overhead like the consumer-group heartbeats that are
>> > driven by consumer.poll
>> >
>> > Since there's no built-in integration, you will have to build a new
>> > SystemConsumer if you need to integrate with the Kafka high-level
>> > consumer. Further, there's a fair bit more complexity to manage in
>> > checkpointing.
>> >
>> > >> The Kafka High-level consumer comes with other nice tools for 
>> > >> monitoring offsets, lag, etc
>> >
>> > Samza exposes
>> > <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
>> > the below metrics for lag-monitoring:
>> > - The current log-end offset for each partition
>> > - The last checkpointed offset for each partition
>> > - The number of messages behind the high watermark of the partition
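As a rough illustration of how the first two metrics above could be combined into a per-partition lag figure, here is a small sketch. It is plain Scala over in-memory maps: `lagByPartition` and its parameter names are hypothetical and are not Samza metric names, and it assumes both offsets are expressed in the same units for the same partition ids.

```scala
object LagSketch {
  // Hypothetical helper: lag = log-end offset minus checkpointed offset,
  // clamped at zero. Partitions without a checkpoint are skipped rather
  // than guessed at.
  def lagByPartition(logEnd: Map[Int, Long], checkpointed: Map[Int, Long]): Map[Int, Long] =
    logEnd.collect {
      case (partition, end) if checkpointed.contains(partition) =>
        partition -> math.max(0L, end - checkpointed(partition))
    }
}
```

A monitoring job could feed the values scraped from the metrics reporter into a function like this and alert when any partition's lag keeps growing.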
>> >
>> > Please let us know if you need help discovering these or 
>> > integrating these with other systems/tools.
>> >
>> >
>> > Leveraging the Passthrough JobCoordinator:
>> >
>> > It's helpful to split this discussion on tradeoffs with
>> > PassthroughJC into two parts:
>> >
>> > 1. PassthroughJC + consumer managed partitions:
>> >
>> > - In this model, Samza has no control over partition assignment
>> > since it's managed by the consumer. This means that stateful
>> > operations like joins that rely on partitions being co-located on
>> > the same task will not work. Simple stateless operations (e.g., map,
>> > filter, remote lookups) are fine.
>> >
>> > - A key differentiator between Samza and other frameworks is our
>> > support for "host affinity"
>> > <https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>.
>> > Samza achieves this by assigning
>> > partitions to hosts taking data-locality into account. If the 
>> > consumer can arbitrarily shuffle partitions, it'd be hard to 
>> > support this affinity/locality. Often this is a key optimization 
>> > when dealing with large stateful jobs.
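The co-location requirement for joins in the first point above can be sketched in a few lines. This is an illustration only: `partitionOf` is a stand-in hash (real producers may partition differently), `owner` is a hypothetical description of which instance consumes which (stream, partition) pair, and none of it is Samza or Kafka API.

```scala
object CoPartitionSketch {
  // Stand-in partitioner (an assumption): what matters is that every input
  // stream uses the same function and the same partition count.
  def partitionOf(key: String, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)

  // `owner` maps a (stream, partition) pair to the instance consuming it.
  // A keyed join can run locally only if a single instance owns the key's
  // partition in every input stream.
  def canJoinLocally(key: String, streams: Seq[String], numPartitions: Int,
                     owner: (String, Int) => Int): Boolean = {
    val partition = partitionOf(key, numPartitions)
    streams.map(stream => owner(stream, partition)).distinct.size == 1
  }
}
```

If partition p of every stream is always handed to the same instance, the check holds for any key. If the consumer assigns each stream's partitions independently, the two sides of a join for the same key can land on different instances, which is the failure mode described above.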
>> >
>> > 2. PassthroughJC + static partitions:
>> >
>> > - In this model, it is possible to make stateful processing 
>> > (including host affinity) work by carefully choosing how "id"s are 
>> > assigned and computed.
>> >
>> > Recommendation:
>> >
>> > - Owing to the above subtleties, I would recommend that we give the 
>> > ZkJobCoordinator + the built-in low-level Kafka integration a try.
>> > - If we hit snags down this path, we can certainly explore the 
>> > approach with PassthroughJC + static partitions.
>> > - Using the PassthroughJC + consumer-managed distribution would be
>> > the least preferable option owing to the subtleties I outlined above.
>> >
>> > Please let us know should you have more questions.
>> >
>> > Best,
>> > Jagdish
>> >
>> > On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges
>> > <tstump...@ntent.com> wrote:
>> > Wow, what great timing, and what a great thread! I definitely have 
>> > some good starters to go off of here.
>> >
>> > If it is helpful for everyone, once I get the low-level API + 
>> > ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate 
>> > an additional sample for hello-samza.
>> >
>> > One thing I'm still curious about, is what are the drawbacks or 
>> > complexities of leveraging the Kafka High-level consumer + 
>> > PassthroughJobCoordinator in a stand-alone setup like this? We do 
>> > have Zookeeper (because of kafka) so I think either would work. The 
>> > Kafka High-level consumer comes with other nice tools for 
>> > monitoring offsets, lag, etc....
>> >
>> > Thanks guys!
>> > -Thunder
>> >
>> > -----Original Message-----
>> > From: Tom Davis [mailto:t...@recursivedream.com]
>> > Sent: Wednesday, March 14, 2018 17:50
>> > To: dev@samza.apache.org
>> > Subject: Re: Old style "low level" Tasks with alternative deployment
>> > model(s)
>> >
>> > Hey there!
>> >
>> > You are correct that this is focused on the higher-level API but
>> > doesn't preclude using the lower-level API. I was at the same point
>> > you were not long ago, in fact, and had a very productive
>> > conversation on the list: you should look for "Question about custom
>> > StreamJob/Factory" in the list archive for the past couple months.
>> >
>> > I'll quote Jagadish Venkatraman from that thread:
>> >
>> > > For the section on the low-level API, can you use 
>> > > LocalApplicationRunner#runTask()? It basically creates a new 
>> > > StreamProcessor and runs it. Remember to provide task.class and 
>> > > set it to your implementation of StreamTask or AsyncStreamTask. 
>> > > Please note that this is an evolving API and hence, subject to change.
>> >
>> > I ended up just switching to the high-level API because I don't
>> > have any existing Tasks and the Kubernetes story is a little more
>> > straightforward there (there's only one container/configuration to
>> > deploy).
>> >
>> > Best,
>> >
>> > Tom
>> >
>> > Thunder Stumpges <tstump...@ntent.com> writes:
>> >
>> > > Hi all,
>> > >
>> > > We are using Samza (0.12.0) in about 2 dozen jobs implementing
>> > > several processing pipelines. We have also begun a significant
>> > > move of other services within our company to Docker/Kubernetes.
>> > > Right now our Hadoop/Yarn cluster has a mix of stream and batch
>> > > "Map Reduce" jobs (many reporting and other batch processing jobs).
>> > > We would really like to move our stream processing off of
>> > > Hadoop/Yarn and onto Kubernetes.
>> > >
>> > > When I just read about some of the new progress in .13 and .14 I
>> > > got really excited! We would love to have our jobs run as simple
>> > > libraries in our own JVM, and use the Kafka High-Level-Consumer
>> > > for partition distribution and such. This would let us "dockerfy"
>> > > our application and run/scale in kubernetes.
>> > >
>> > > However as I read it, this new deployment model is ONLY for the
>> > > new(er) High Level API, correct? Is there a plan and/or resources
>> > > for adapting this back to existing low-level tasks? How
>> > > complicated of a task is that? Do I have any other options to make
>> > > this transition easier?
>> > >
>> > > Thanks in advance.
>> > > Thunder
>> >
>> >
>> >
>> > --
>> > Jagadish V,
>> > Graduate Student,
>> > Department of Computer Science,
>> > Stanford University
>> >
>>
>
>


--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
