Did you guys have a chance to review my "runner" class ? It's quite trivial, and I can port it to the samza codebase, give it a name of your choosing, and submit a PR if you like.
This is literally all of it: object StandAloneSamzaRunner extends App with LazyLogging { // parse command line args just like JobRunner. Main Options as emit by the parser: val cmdline = new ApplicationRunnerCommandLine val options = cmdline.parser.parse(args: _*) val config = cmdline.loadConfig(options) // run the task using LocalApplicationRunner (must be configured just like standard JobRunner) val runner = new LocalApplicationRunner(Util.rewriteConfig(config)) runner.runTask() runner.waitForFinish() } Thanks, Thunder -----Original Message----- From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com] Sent: Tuesday, March 20, 2018 11:17 To: Prateek Maheshwari <prateek...@gmail.com> Cc: dev@samza.apache.org; t...@recursivedream.com; yi...@linkedin.com; Yi Pan <nickpa...@gmail.com> Subject: Re: Old style "low level" Tasks with alternative deployment model(s) Hi Thunder, Thank you for the PR. Really nice work! Since, you have a working implementation on K8s, would you be willing to contribute a short tutorial / a post on this? We'll be sure to feature it in the official Samza web-site at http://samza.apache.org/. It'd be a great addition to the Samza community to have a section on K8s integration! There have been multiple prior asks on this, and your learnings would be super-helpful. Best, Jagdish On Tue, Mar 20, 2018 at 10:46 AM, Prateek Maheshwari <prateek...@gmail.com> wrote: > Glad you were able to figure it out, that was very confusing. Thanks > for the fix too. > > - Prateek > > On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges > <tstump...@ntent.com> > wrote: > >> And that last issue was mine. My setting override was not picked up >> and it was using GroupByContainerCount instead. >> -Thanks, >> Thunder >> >> >> -----Original Message----- >> From: Thunder Stumpges >> Sent: Monday, March 19, 2018 20:58 >> To: dev@samza.apache.org >> Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; >> t...@recursivedream.com; yi...@linkedin.com; Yi Pan >> <nickpa...@gmail.com> >> Subject: RE: Old style "low level" Tasks with alternative deployment >> model(s) >> >> Well I figured it out. My specific issue was due to a simple >> dependency problem where I had gotten an older version of the Jackson-mapper >> library. >> However the code was throwing NoSuchMethodError (an Error instead of >> Exception) and being silently dropped. I created a pull request to >> handle any Throwable in ScheduleAfterDebounceTime. >> https://github.com/apache/samza/pull/450 >> >> I'm now running into an issue with the generation of the JobModel and >> the ProcessorId. The ZkJobCoordinator has a ProcessorId that is a >> Guid, but when GroupByContainerIds class (my TaskNameGrouper) creates >> the ContainerModels, it is using the ContainerId (a numeric value, >> 0,1,2,etc) as the ProcessorId (~ line 105). This results in the >> JobModel that is generated and published immediately causing the >> processor to quit with this >> message: >> >> INFO o.apache.samza.zk.ZkJobCoordinator - New JobModel does not >> contain pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this processor. >> >> I was assuming I should be using GroupByContainerIds as my >> TaskNameGrouper. I don't see any other promising implementations. Am >> I just missing something? >> >> Thanks, >> Thunder >> >> JobModel >> { >> "config" : { >> ... >> }, >> "containers" : { >> "0" : { >> "tasks" : { >> "Partition 0" : { >> "task-name" : "Partition 0", >> "system-stream-partitions" : [ { >> "system" : "kafka", >> "partition" : 0, >> "stream" : "test_topic1" >> }, { >> "system" : "kafka", >> "partition" : 0, >> "stream" : "test_topic2" >> } ], >> "changelog-partition" : 0 >> }, >> "Partition 1" : { >> "task-name" : "Partition 1", >> "system-stream-partitions" : [ { >> "system" : "kafka", >> "partition" : 1, >> "stream" : "test_topic1" >> }, { >> "system" : "kafka", >> "partition" : 1, >> "stream" : "test_topic2" >> } ], >> "changelog-partition" : 1 >> } >> }, >> "container-id" : 0, >> "processor-id" : "0" >> } >> }, >> "max-change-log-stream-partitions" : 2, >> "all-container-locality" : { >> "0" : null >> } >> } >> >> -----Original Message----- >> From: Thunder Stumpges [mailto:tstump...@ntent.com] >> Sent: Friday, March 16, 2018 18:21 >> To: dev@samza.apache.org >> Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; >> t...@recursivedream.com; yi...@linkedin.com; Yi Pan >> <nickpa...@gmail.com> >> Subject: RE: Old style "low level" Tasks with alternative deployment >> model(s) >> >> Attached. I don't see any threads actually running this code which is odd. >> >> There's my main thread that's waiting for the whole thing to finish, >> the "debounce-thread-0" (which logged the other surrounding messages >> below) has >> this: >> >> "debounce-thread-0" #18 daemon prio=5 os_prio=0 >> tid=0x00007fa0fd719800 >> nid=0x21 waiting on condition [0x00007fa0d0d45000] >> java.lang.Thread.State: WAITING (parking) >> at sun.misc.Unsafe.park(Native Method) >> - parking to wait for <0x00000006f166e350> (a >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) >> at >> java.util.concurrent.locks.LockSupport.park(LockSupport.java >> :175) >> at >> java.util.concurrent.locks.AbstractQueuedSynchronizer$Condit >> ionObject.await(AbstractQueuedSynchronizer.java:2039) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWork >> Queue.take(ScheduledThreadPoolExecutor.java:1081) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWork >> Queue.take(ScheduledThreadPoolExecutor.java:809) >> at >> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolEx >> ecutor.java:1067) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool >> Executor.java:1127) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >> lExecutor.java:617) >> at java.lang.Thread.run(Thread.java:745) >> >> Locked ownable synchronizers: >> - None >> >> Thanks for having a look. >> Thunder >> >> >> -----Original Message----- >> From: Prateek Maheshwari [mailto:prateek...@gmail.com] >> Sent: Friday, March 16, 2018 17:02 >> To: dev@samza.apache.org >> Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; >> t...@recursivedream.com; yi...@linkedin.com; Yi Pan >> <nickpa...@gmail.com> >> Subject: Re: Old style "low level" Tasks with alternative deployment >> model(s) >> >> Hi Thunder, >> >> Can you please take and attach a thread dump with this? >> >> Thanks, >> Prateek >> >> On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges >> <tstump...@ntent.com> >> wrote: >> >> > It appears it IS hung while serializing the JobModel... very strange! >> > I added some debug statements around the calls: >> > >> > LOG.debug("Getting object mapper to serialize job model"); >> > // this IS printed >> > ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); >> > LOG.debug("Serializing job model"); // this IS printed >> > String jobModelStr = mmapper.writerWithDefaultPrettyPrinter >> > ().writeValueAsString(jobModel); >> > LOG.info("jobModelAsString=" + jobModelStr); // this is NOT >> printed! >> > >> > Another thing I noticed is that "getObjectMapper" actually creates >> > the object mapper twice! >> > >> > 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG >> > org.apache.samza.zk.ZkUtils - Getting object mapper to serialize >> > job model >> > 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Creating new object mapper and simple module >> > 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Adding SerDes and mixins >> > 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Adding custom ContainerModel deserializer >> > 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Setting up naming strategy and registering module >> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Done! >> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Creating new object mapper and simple module >> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Adding SerDes and mixins >> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Adding custom ContainerModel deserializer >> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Setting up naming strategy and registering module >> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >> > o.a.s.s.model.SamzaObjectMapper >> > - Done! >> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >> > org.apache.samza.zk.ZkUtils - Serializing job model >> > >> > Could this ObjectMapper be a singleton? I see there is a private >> > static instance, but getObjectMapper creates a new one every time... >> > >> > Anyway, then it takes off to serialize the job model and never >> > comes back... >> > >> > Hoping someone has some idea here... the implementation for this >> > mostly comes from Jackson-mapper-asl, and I have the version that >> > is linked in the >> > 0.14.0 tag: >> > | | | +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13 >> > | | | | \--- org.codehaus.jackson:jackson-core-asl:1.9.13 >> > >> > Thanks! >> > Thunder >> > >> > -----Original Message----- >> > From: Thunder Stumpges [mailto:tstump...@ntent.com] >> > Sent: Friday, March 16, 2018 15:29 >> > To: dev@samza.apache.org; Jagadish Venkatraman >> > <jagadish1...@gmail.com> >> > Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan < >> > nickpa...@gmail.com> >> > Subject: RE: Old style "low level" Tasks with alternative >> > deployment >> > model(s) >> > >> > So, my investigation starts at StreamProcessor.java. Line 294 in >> > method >> > onNewJobModel() logs an INFO message that it's starting a container. >> > This message never appears. >> > >> > I see that ZkJobCoordinator calls onNewJobModel from its >> > onNewJobModelConfirmed method which also logs an info message >> > stating "version X of the job model got confirmed". I never see >> > this message either, so I go up the chain some more. >> > >> > I DO see: >> > >> > 2018-03-16 21:43:58 logback 20498 >> > [ZkClient-EventThread-13-10.0.127.114:2181] >> > INFO o.apache.samza.zk.ZkJobCoordinator - >> > ZkJobCoordinator::onBecomeLeader >> > - I became the leader! >> > And >> > 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO >> > o.apache.samza.zk.ZkJobCoordinator - >> > pid=91e07d20-ae33-4156-a5f3-534a95642133Generated >> > new Job Model. Version = 1 >> > >> > Which led me to method onDoProcessorChange line 210. I see that >> > line, but not the line below " Published new Job Model. Version =" >> > so something in here is not completing: >> > >> > LOG.info("pid=" + processorId + "Generated new Job Model. >> > Version = >> " >> > + nextJMVersion); >> > >> > // Publish the new job model >> > zkUtils.publishJobModel(nextJMVersion, jobModel); >> > >> > // Start the barrier for the job model update >> > barrier.create(nextJMVersion, currentProcessorIds); >> > >> > // Notify all processors about the new JobModel by updating >> > JobModel Version number >> > zkUtils.publishJobModelVersion(currentJMVersion, >> > nextJMVersion); >> > >> > LOG.info("pid=" + processorId + "Published new Job Model. >> > Version = >> " >> > + nextJMVersion); >> > >> > As I mentioned, after the line "Generated new Job Model. Version = 1" >> > I just get repeated zk ping responses.. no more application logging. >> > >> > The very next thing that's run is zkUtils.publishJobModel() which >> > only has two lines before another log statement (which I don't see): >> > >> > public void publishJobModel(String jobModelVersion, JobModel >> jobModel) { >> > try { >> > ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); >> > String jobModelStr = mmapper.writerWithDefaultPrettyPrinter >> > ().writeValueAsString(jobModel); >> > LOG.info("jobModelAsString=" + jobModelStr); >> > ... >> > >> > Could it really be getting hung up on one of these two lines? >> > (seems like it must be, but I don't see anything there that seems >> > like it would just hang). I'll keep troubleshooting, maybe add some >> > more debug logging and try again. >> > >> > Thanks for any guidance you all might have. >> > -Thunder >> > >> > >> > -----Original Message----- >> > From: Thunder Stumpges [mailto:tstump...@ntent.com] >> > Sent: Friday, March 16, 2018 14:43 >> > To: dev@samza.apache.org; Jagadish Venkatraman >> > <jagadish1...@gmail.com> >> > Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan < >> > nickpa...@gmail.com> >> > Subject: RE: Old style "low level" Tasks with alternative >> > deployment >> > model(s) >> > >> > Well I have my stand-alone application in docker and running in >> > kubernetes. I think something isn't wired up all the way though, >> > because my task never actually gets invoked. I see no errors, >> > however I'm not getting the usual startup logs (checking existing >> > offsets, "entering run loop"...) My logs look like this: >> > >> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO >> > kafka.utils.VerifiableProperties >> > - Verifying properties >> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO >> > kafka.utils.VerifiableProperties >> > - Property client.id is overridden to >> > samza_admin-test_stream_task-1 >> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO >> > kafka.utils.VerifiableProperties >> > - Property metadata.broker.list is overridden to >> > test-kafka-kafka.test-svc:9092 >> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO >> > kafka.utils.VerifiableProperties >> > - Property request.timeout.ms is overridden to 30000 >> > 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO >> > kafka.client.ClientUtils$ - Fetching metadata from broker >> > BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation >> > id 0 for 1 topic(s) Set(dev_k8s.samza.test.topic) >> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG >> > kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = >> > 30000 (requested 30000), SO_RCVBUF = 179680 (requested -1), >> > SO_SNDBUF = >> > 102400 (requested 102400), connectTimeoutMs = 30000. >> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO >> > kafka.producer.SyncProducer - Connected to >> > test-kafka-kafka.test-svc:9092 for producing >> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO >> > kafka.producer.SyncProducer - Disconnecting from >> > test-kafka-kafka.test-svc:9092 >> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG >> > kafka.client.ClientUtils$ - Successfully fetched metadata for 1 >> > topic(s) >> > Set(dev_k8s.samza.test.topic) >> > 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper >> > org.apache.samza.container.grouper.stream.GroupByPartition@1a7158cc >> > has grouped the SystemStreamPartitions into 10 tasks with the >> > following >> > taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, >> > Partition 5, Partition 4, Partition 7, Partition 6, Partition 9, >> > Partition 8] >> > 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being >> > assigned changelog partition 0. >> > 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being >> > assigned changelog partition 1. >> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being >> > assigned changelog partition 2. >> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being >> > assigned changelog partition 3. >> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being >> > assigned changelog partition 4. >> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being >> > assigned changelog partition 5. >> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being >> > assigned changelog partition 6. >> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being >> > assigned changelog partition 7. >> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being >> > assigned changelog partition 8. >> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >> > o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being >> > assigned changelog partition 9. >> > 2018-03-16 21:05:55 logback 50838 >> > [main-SendThread(10.0.127.114:2181)] >> > DEBUG org.apache.zookeeper.ClientCnxn - Reading reply >> > sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null >> > serverPath:null finished:false header:: 23,4 replyHeader:: 23,14024,0 >> > request:: >> > '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/ >> > JobModelGeneration/jobModelVersion,T response:: >> > ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878} >> > 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO >> > o.apache.samza.zk.ZkJobCoordinator - >> > pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated >> > new Job Model. Version = 1 >> > 2018-03-16 21:06:05 logback 60848 >> > [main-SendThread(10.0.127.114:2181)] >> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for >> sessionid: >> > 0x1622c8b5fc01ac7 after 2ms >> > 2018-03-16 21:06:15 logback 70856 >> > [main-SendThread(10.0.127.114:2181)] >> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for >> sessionid: >> > 0x1622c8b5fc01ac7 after 1ms >> > 2018-03-16 21:06:25 logback 80865 >> > [main-SendThread(10.0.127.114:2181)] >> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for >> sessionid: >> > 0x1622c8b5fc01ac7 after 2ms ... >> > >> > The zk ping responses continue every 10 seconds, but no other >> > activity or messages occur. >> > It looks like it gets as far as confirming the JobModel and >> > grouping the partitions, but nothing actually starts up. >> > >> > Any ideas? >> > Thanks in advance! >> > Thunder >> > >> > >> > -----Original Message----- >> > From: Thunder Stumpges [mailto:tstump...@ntent.com] >> > Sent: Thursday, March 15, 2018 16:35 >> > To: Jagadish Venkatraman <jagadish1...@gmail.com> >> > Cc: dev@samza.apache.org; t...@recursivedream.com; >> > yi...@linkedin.com; Yi Pan <nickpa...@gmail.com> >> > Subject: RE: Old style "low level" Tasks with alternative >> > deployment >> > model(s) >> > >> > Thanks a lot for the info. I have something basically working at >> > this point! I have not integrated it with Docker nor Kubernetes >> > yet, but it does run from my local machine. >> > >> > I have determined that LocalApplicationRunner does NOT do config >> > rewriting. I had to write my own little “StandAloneApplicationRunner” >> > that handles the “main” entrypoint. It does command parsing using >> > CommandLine, load config from ConfigFactory, and perform rewriting >> > before creating the new instance of LocalApplicationRunner. This is >> > all my StandAloneApplicationRunner contains: >> > >> > >> > object StandAloneSamzaRunner extends App with LazyLogging { >> > >> > // parse command line args just like JobRunner. >> > val cmdline = new ApplicationRunnerCommandLine >> > val options = cmdline.parser.parse(args: _*) >> > val config = cmdline.loadConfig(options) >> > >> > val runner = new LocalApplicationRunner(Util.rewriteConfig(config)) >> > runner.runTask() >> > runner.waitForFinish() >> > } >> > >> > The only config settings I needed to make to use this runner were >> > (easily configured due to our central Consul config system and our >> rewriter) : >> > >> > # use the ZK based job coordinator >> > job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory >> > # need to use GroupByContainerIds instead of GroupByContainerCount >> > task.name.grouper.factory=org.apache.samza.container.grouper.task. >> > GroupByContainerIdsFactory >> > # ZKJC config >> > job.coordinator.zk.connect=<our_zk_connection> >> > >> > I did run into one potential problem; as you see above, I have >> > started the task using runTask() and then to prevent my main method >> > from returning, I have called waitForFinish(). The first time I ran >> > it, the job itself failed because I had forgotten to override the >> > task grouper, and container count was pulled from our staging environment. >> > There are some failures logged and it appears the JobCoordinator >> > fails, but it never returns from waitForFinish. Stack trace and >> continuation of log is below: >> > >> > 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR >> > o.a.s.zk.ScheduleAfterDebounceTime >> > - Execution of action: OnProcessorChange failed. >> > java.lang.IllegalArgumentException: Your container count (4) is >> > larger than your task count (2). Can't have containers with nothing >> > to do, so aborting. >> > at >> > org.apache.samza.container.grouper.task.GroupByContainerCoun >> t. >> > validateTasks(GroupByContainerCount.java:212) >> > at org.apache.samza.container.grouper.task. >> > GroupByContainerCount.group(GroupByContainerCount.java:62) >> > at org.apache.samza.container.grouper.task.TaskNameGrouper. >> > group(TaskNameGrouper.java:56) >> > at >> > org.apache.samza.coordinator.JobModelManager$.readJobModel( >> > JobModelManager.scala:266) >> > at >> > org.apache.samza.coordinator.JobModelManager.readJobModel( >> > JobModelManager.scala) >> > at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel( >> > ZkJobCoordinator.java:306) >> > at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange( >> > ZkJobCoordinator.java:197) >> > at >> > org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerIm >> pl. >> > lambda$onBecomingLeader$0(ZkJobCoordinator.java:318) >> > at org.apache.samza.zk.ScheduleAfterDebounceTime. >> > lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134) >> > at java.util.concurrent.Executors$RunnableAdapter. >> > call$$$capture(Executors.java:511) >> > at java.util.concurrent.Executors$RunnableAdapter. >> > call(Executors.java) >> > at java.util.concurrent.FutureTask.run$$$capture( >> > FutureTask.java:266) >> > at java.util.concurrent.FutureTask.run(FutureTask.java) >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ >> > ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ >> > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >> > at java.util.concurrent.ThreadPoolExecutor.runWorker( >> > ThreadPoolExecutor.java:1142) >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run( >> > ThreadPoolExecutor.java:617) >> > at java.lang.Thread.run(Thread.java:745) >> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG >> > o.a.samza.processor.StreamProcessor - Container is not instantiated >> yet. >> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG >> > org.I0Itec.zkclient.ZkClient - Closing ZkClient... >> > 2018-03-15 22:34:32 logback 77789 >> > [ZkClient-EventThread-15-10.0.127.114:2181] >> > INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event >> thread. >> > >> > And then the application continues on with metric reporters, and >> > other debug logging (not actually running the task though) >> > >> > Thanks in advance for the guidance, this has been easier than I >> imagined! >> > I’ll report back when I get more of the Dockerization/Kubernetes >> > running and test it a bit more. >> > Cheers, >> > Thunder >> > >> > >> > From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com] >> > Sent: Thursday, March 15, 2018 14:46 >> > To: Thunder Stumpges <tstump...@ntent.com> >> > Cc: dev@samza.apache.org; t...@recursivedream.com; >> > yi...@linkedin.com; Yi Pan <nickpa...@gmail.com> >> > Subject: Re: Old style "low level" Tasks with alternative >> > deployment >> > model(s) >> > >> > >> Thanks for the info on the tradeoffs. That makes a lot of >> > >> sense. I am >> > on-board with using ZkJobCoordinator, sounds like some good >> > benefits over just the Kafka high-level consumer. >> > >> > This certainly looks like the simplest alternative. >> > >> > For your other questions, please find my answers inline. >> > >> > >> Q1: If I use LocalApplicationRunner, It does not use >> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct? >> > >> > Your understanding is correct. It directly instantiates the >> > StreamProcessor, which in-turn creates and runs the SamzaContainer. >> > >> > >> Q2: If I use LocalApplicationRunner, I will need to code myself >> > >> the >> > loading and rewriting of the Config that is currently handled by >> > JobRunner, correct? >> > >> > I don't think you'll need to do this. IIUC, the >> > LocalApplicationRunner should automatically invoke rewriters and do the >> > right thing. >> > >> > >> Q3: Do I need to also handle coordinator stream(s) and storing >> > >> of >> > config that is done in JobRunner (I don’t think so as the ? >> > >> > I don't think this is necessary either. The creation of coordinator >> > stream and persisting configuration happens in the >> > LocalApplicationRunner (more specifically in >> StreamManager#createStreams). >> > >> > >> Q4: Where/How do I specify the Container ID for each instance? >> > >> Is there >> > a config setting that I can pass, (or pull from an env variable and >> > add to the config) ? I am assuming it is my responsibility to >> > ensure that each instance is started with a unique container ID..? >> > >> > Nope, If you are using the ZkJobCoordinator, you need not have to >> > worry about assigning IDs for each instance. The framework will >> > automatically take care of generating IDs and reaching consensus by >> > electing a leader. If you are curious please take a look at >> > implementations of the ProcessorIdGenerator interface. >> > >> > Please let us know should you have further questions! >> > >> > Best, >> > Jagdish >> > >> > On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges >> > <tstump...@ntent.com <mailto:tstump...@ntent.com>> wrote: >> > >> > Thanks for the info on the tradeoffs. That makes a lot of sense. I >> > am on-board with using ZkJobCoordinator, sounds like some good >> > benefits over just the Kafka high-level consumer. >> > >> > >> > >> > To that end, I have made some notes on possible approaches based on >> > the previous thread, and from my look into the code. I’d love to >> > get >> feedback. >> > >> > >> > >> > Approach 1. Configure jobs to use “ProcessJobFactory” and run >> > instances of the job using run-job.sh or using JobRunner directly. >> > >> > I don’t think this makes sense from what I can see for a few reasons: >> > >> > * JobRunner is concerned with stuff I don't *think* we need: >> > >> > * coordinatorSystemProducer|Consumer, >> > * writing/reading the configuration to the coordinator streams >> > >> > * ProcessJobFactory hard-codes the ID to “0” so I don’t think that >> > will work for multiple instances. >> > >> > >> > >> > Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and >> > invoke >> > LocalApplicationRunner.runTask() >> > >> > >> > >> > Q1: If I use LocalApplicationRunner, It does not use >> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct? >> > >> > Q2: If I use LocalApplicationRunner, I will need to code myself >> > the loading and rewriting of the Config that is currently handled >> > by JobRunner, correct? >> > >> > Q3: Do I need to also handle coordinator stream(s) and storing >> > of config that is done in JobRunner (I don’t think so as the ? >> > >> > Q4: Where/How do I specify the Container ID for each instance? >> > Is there a config setting that I can pass, (or pull from an env >> > variable and add to the config) ? I am assuming it is my >> > responsibility to ensure that each instance is started with a unique >> > container ID..? >> > >> > I am getting started on the above (Approach 2.), and looking closer >> > at the code so I may have my own answers to my questions, but >> > figured I should go ahead and ask now anyway. Thanks! >> > >> > -Thunder >> > >> > >> > From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com<mailto: >> > jagadish1...@gmail.com>] >> > Sent: Thursday, March 15, 2018 1:41 >> > To: dev@samza.apache.org<mailto:dev@samza.apache.org>; Thunder >> > Stumpges < tstump...@ntent.com<mailto:tstump...@ntent.com>>; >> > t...@recursivedream.com <mailto:t...@recursivedream.com> >> > Cc: yi...@linkedin.com<mailto:yi...@linkedin.com>; Yi Pan < >> > nickpa...@gmail.com<mailto:nickpa...@gmail.com>> >> > >> > Subject: Re: Old style "low level" Tasks with alternative >> > deployment >> > model(s) >> > >> > >> You are correct that this is focused on the higher-level API but >> > >> doesn't >> > preclude using the lower-level API. I was at the same point you >> > were not long ago, in fact, and had a very productive conversation >> > on the list >> > >> > Thanks Tom for linking the thread, and I'm glad that you were able >> > to get Kubernetes integration working with Samza. >> > >> > >> If it is helpful for everyone, once I get the low-level API + >> > >> ZkJobCoordinator + Docker + >> > K8s working, I'd be glad to formulate an additional sample for >> hello-samza. >> > >> > @Thunder Stumpges: >> > We'd be thrilled to receive your contribution. Examples, demos, >> > tutorials etc. >> > contribute a great deal to improving the ease of use of Apache Samza. >> > I'm happy to shepherd design discussions/code-reviews in the >> > open-source including answering any questions you may have. >> > >> > >> > >> One thing I'm still curious about, is what are the drawbacks or >> > >> complexities of leveraging the Kafka High-level consumer + >> > >> PassthroughJobCoordinator in a stand-alone setup like this? We >> > >> do have Zookeeper (because of kafka) so I think either would >> > >> work. The Kafka High-level consumer comes with other nice tools >> > >> for monitoring offsets, lag, etc >> > >> > >> > @Thunder Stumpges: >> > >> > Samza uses a "Job-Coordinator" to assign your input-partitions >> > among the different instances of your application s.t. they don't >> > overlap. A typical way to solve this "partition distribution" >> > problem is to have a single instance elected as a "leader" and have >> > the leader assign partitions to the group. >> > The ZkJobCoordinator uses Zk primitives to achieve this, while the >> > YarnJC relies on Yarn's guarantee that there will be a >> > singleton-AppMaster to achieve this. >> > >> > A key difference that separates the PassthroughJC from the Yarn/Zk >> > variants is that it does _not_ attempt to solve the "partition >> > distribution" problem. As a result, there's no leader-election involved. >> > Instead, it pushes the problem of "partition distribution" to the >> > underlying consumer. >> > >> > The PassThroughJc supports these 2 scenarios: >> > >> > 1. Consumer-managed partition distribution: When using the Kafka >> > high-level consumer (or an AWS KinesisClientLibrary consumer) with >> > Samza, the consumer manages partitions internally. >> > >> > 2. Static partition distribution: Alternately, partitions can be >> > managed statically using configuration. You can achieve static >> > partition assignment by implementing a custom >> > SystemStreamPartitionGrouper<h >> > ttps://samza.apache.org/learn/documentation/0.8/api/ >> > javadocs/org/apache/samza/container/grouper/stream/ >> > SystemStreamPartitionGrouper.html> and TaskNameGrouper<https:// >> > github.com/apache/samza/blob/master/samza-core/src/main/ >> > java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>. >> > Solutions in this category will typically require you to >> > distinguish the various processors in the group by providing an "id" for >> > each. >> > Once the "id"s are decided, you can then statically compute >> > assignments using a function (eg: modulo N). >> > You can rely on the following mechanisms to provide this id: >> > - Configure each instance differently to have its own id >> > - Obtain the id from the cluster-manager. For instance, Kubernetes >> > will provide each POD an unique id in the range [0,N). AWS ECS >> > should expose similar capabilities via a REST end-point. >> > >> > >> One thing I'm still curious about, is what are the drawbacks or >> > complexities of leveraging the Kafka High-level consumer + >> > PassthroughJobCoordinator in a stand-alone setup like this? >> > >> > Leveraging the Kafka High-level consumer: >> > >> > The Kafka high-level consumer is not integrated into Samza just yet. >> > Instead, Samza's integration with Kafka uses the low-level consumer >> > because >> > i) It allows for greater control in fetching data from individual >> brokers. >> > It is simple and performant in-terms of the threading model to have >> > one-thread pull from each broker. >> > ii) It is efficient in memory utilization since it does not do >> > internal-buffering of messages. >> > iii) There's no overhead like Kafka-controller heart-beats that are >> > driven by consumer.poll >> > >> > Since there's no built-in integration, you will have to build a new >> > SystemConsumer if you need to integrate with the Kafka High-level >> consumer. >> > Further, there's more a fair bit of complexity to manage in >> checkpointing. >> > >> > >> The Kafka High-level consumer comes with other nice tools for >> > >> monitoring offsets, lag, etc >> > >> > Samza exposes<https://github.com/apache/samza/blob/master/ >> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/ >> > KafkaSystemConsumerMetrics.scala> the below metrics for lag-monitoring: >> > - The current log-end offset for each partition >> > - The last check-pointed offset for each partition >> > - The number of messages behind the highwatermark of the partition >> > >> > Please let us know if you need help discovering these or >> > integrating these with other systems/tools. >> > >> > >> > Leveraging the Passthrough JobCoordinator: >> > >> > It's helpful to split this discussion on tradeoffs with >> > PassthroughJC into >> > 2 parts: >> > >> > 1. PassthroughJC + consumer managed partitions: >> > >> > - In this model, Samza has no control over partition-assignment >> > since it's managed by the consumer. This means that stateful >> > operations like joins that rely on partitions being co-located on >> > the same task will >> not work. >> > Simple stateless operations (eg: map, filter, remote lookups) are fine. >> > >> > - A key differentiator between Samza and other frameworks is our >> > support for "host >> > affinity<https://samza.apache.org/learn/documentation/0.14/ >> > yarn/yarn-host-affinity.html>". Samza achieves this by assigning >> > partitions to hosts taking data-locality into account. If the >> > consumer can arbitrarily shuffle partitions, it'd be hard to >> > support this affinity/locality. Often this is a key optimization >> > when dealing with large stateful jobs. >> > >> > 2. PassthroughJC + static partitions: >> > >> > - In this model, it is possible to make stateful processing >> > (including host affinity) work by carefully choosing how "id"s are >> > assigned and computed. >> > >> > Recommendation: >> > >> > - Owing to the above subtleties, I would recommend that we give the >> > ZkJobCoordinator + the built-in low-level Kafka integration a try. >> > - If we hit snags down this path, we can certainly explore the >> > approach with PassthroughJC + static partitions. >> > - Using the PassthroughJC + consumer-managed distribution would be >> > least preferable owing to the subtleties I outlined above. >> > >> > Please let us know should you have more questions. >> > >> > Best, >> > Jagdish >> > >> > On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges >> > <tstump...@ntent.com <mailto:tstump...@ntent.com>> wrote: >> > Wow, what great timing, and what a great thread! I definitely have >> > some good starters to go off of here. >> > >> > If it is helpful for everyone, once I get the low-level API + >> > ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate >> > an additional sample for hello-samza. >> > >> > One thing I'm still curious about, is what are the drawbacks or >> > complexities of leveraging the Kafka High-level consumer + >> > PassthroughJobCoordinator in a stand-alone setup like this? We do >> > have Zookeeper (because of kafka) so I think either would work. The >> > Kafka High-level consumer comes with other nice tools for >> > monitoring offsets, lag, etc.... >> > >> > Thanks guys! >> > -Thunder >> > >> > -----Original Message----- >> > From: Tom Davis [mailto:t...@recursivedream.com<mailto: >> > t...@recursivedream.com>] >> > Sent: Wednesday, March 14, 2018 17:50 >> > To: dev@samza.apache.org<mailto:dev@samza.apache.org> >> > Subject: Re: Old style "low level" Tasks with alternative >> > deployment >> > model(s) >> > >> > Hey there! >> > >> > You are correct that this is focused on the higher-level API but >> > doesn't preclude using the lower-level API. I was at the same point >> > you were not long ago, in fact, and had a very productive >> > conversation >> on the list: >> > you should look for "Question about custom StreamJob/Factory" in >> > the list archive for the past couple months. >> > >> > I'll quote Jagadish Venkatraman from that thread: >> > >> > > For the section on the low-level API, can you use >> > > LocalApplicationRunner#runTask()? It basically creates a new >> > > StreamProcessor and runs it. Remember to provide task.class and >> > > set it to your implementation of StreamTask or AsyncStreamTask. >> > > Please note that this is an evolving API and hence, subject to change. >> > >> > I ended up just switching to the high-level API because I don't >> > have any existing Tasks and the Kubernetes story is a little more >> > straight forward there (there's only one container/configuration to >> > deploy). >> > >> > Best, >> > >> > Tom >> > >> > Thunder Stumpges <tstump...@ntent.com<mailto:tstump...@ntent.com>> >> writes: >> > >> > > Hi all, >> > > >> > > We are using Samza (0.12.0) in about 2 dozen jobs implementing >> > > several processing pipelines. We have also begun a significant >> > > move of other services within our company to Docker/Kubernetes. >> > > Right now our Hadoop/Yarn cluster has a mix of stream and batch "Map >> > > Reduce" >> > > jobs >> > (many reporting and other batch processing jobs). We would really >> > like to move our stream processing off of Hadoop/Yarn and onto Kubernetes. >> > > >> > > When I just read about some of the new progress in .13 and .14 I >> > > got really excited! We would love to have our jobs run as simple >> > > libraries in our own JVM, and use the Kafka High-Level-Consumer >> > > for partition >> > distribution and such. This would let us "dockerfy" our application >> > and run/scale in kubernetes. >> > > >> > > However as I read it, this new deployment model is ONLY for the >> > > new(er) High Level API, correct? Is there a plan and/or resources >> > > for adapting this back to existing low-level tasks ? How >> > > complicated of a >> > task is that? Do I have any other options to make this transition >> easier? >> > > >> > > Thanks in advance. >> > > Thunder >> > >> > >> > >> > -- >> > Jagadish V, >> > Graduate Student, >> > Department of Computer Science, >> > Stanford University >> > >> > >> > >> > -- >> > Jagadish V, >> > Graduate Student, >> > Department of Computer Science, >> > Stanford University >> > >> > > -- Jagadish V, Graduate Student, Department of Computer Science, Stanford University