Sounds good to me. I'll reply when I have a PR up and hand it off to you!

Thunder Stumpges <tstump...@ntent.com> writes:

I'm out this weekend, but would be glad to collaborate with Tom to get some documentation together. I'll let him take the lead on it, and then maybe I can contribute my information on "running low-level API jobs in standalone mode" and tips on Kubernetes (which is really just like any other Java application deployment once you get the job running stand-alone).

Thanks everyone, I have my job(s) running successfully in K8s at this time!

-Thunder

-----Original Message-----
From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
Sent: Tuesday, March 20, 2018 11:36
To: Tom Davis <t...@recursivedream.com>
Cc: Prateek Maheshwari <prateek...@gmail.com>; dev@samza.apache.org; 
yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
Subject: Re: Old style "low level" Tasks with alternative deployment model(s)

Hi Tom,

Happy to put something together this weekend as well.

Great, can't wait!!

What format would that be best in?

You can open a PR in markdown format.

Here's an example PR for Kinesis:
https://github.com/apache/samza/pull/384/files/
Here's how it looks when rendered on our web page:
https://samza.apache.org/learn/documentation/0.14/aws/kinesis.html

Best,
Jagdish

On Tue, Mar 20, 2018 at 11:24 AM, Tom Davis <t...@recursivedream.com> wrote:

What format would that be best in? Happy to put something together
this weekend as well.

Jagadish Venkatraman <jagadish1...@gmail.com> writes:

Hi Thunder,

Thank you for the PR. Really nice work!

Since you have a working implementation on K8s, would you be willing
to contribute a short tutorial / post on this? We'll be sure to
feature it on the official Samza website at http://samza.apache.org/.

It'd be a great addition to the Samza community to have a section on
K8s integration! There have been multiple prior asks on this, and
your learnings would be super-helpful.

Best,
Jagdish



On Tue, Mar 20, 2018 at 10:46 AM, Prateek Maheshwari <prateek...@gmail.com> wrote:

Glad you were able to figure it out; that was very confusing. Thanks
for the fix too.

- Prateek

On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges <tstump...@ntent.com> wrote:

And that last issue was mine. My setting override was not picked up
and it was using GroupByContainerCount instead.
-Thanks,
Thunder


-----Original Message-----
From: Thunder Stumpges
Sent: Monday, March 19, 2018 20:58
To: dev@samza.apache.org
Cc: Jagadish Venkatraman <jagadish1...@gmail.com>;
t...@recursivedream.com; yi...@linkedin.com; Yi Pan
<nickpa...@gmail.com>
Subject: RE: Old style "low level" Tasks with alternative deployment model(s)

Well, I figured it out. My specific issue was due to a simple
dependency problem where I had pulled in an older version of the
jackson-mapper library. However, the code was throwing a
NoSuchMethodError (an Error instead of an Exception), which was
being silently dropped. I created a pull request to handle any
Throwable in ScheduleAfterDebounceTime.
https://github.com/apache/samza/pull/450
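
The gist of the change (a simplified sketch of the idea, not the exact
diff in the PR) is to catch Throwable instead of Exception around the
scheduled action, so Errors like NoSuchMethodError at least get logged
instead of vanishing inside the executor:

    // Sketch only -- see the PR above for the actual change.
    // Wrap the scheduled action so that any Throwable (including
    // Errors such as NoSuchMethodError) is logged instead of being
    // silently swallowed by the ScheduledExecutorService.
    Runnable wrapAction(String actionName, Runnable action) {
      return () -> {
        try {
          action.run();
        } catch (Throwable t) {
          LOG.error("Execution of action: " + actionName + " failed.", t);
        }
      };
    }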

I'm now running into an issue with the generation of the JobModel
and the ProcessorId. The ZkJobCoordinator has a ProcessorId that is
a GUID, but when the GroupByContainerIds class (my TaskNameGrouper)
creates the ContainerModels, it uses the ContainerId (a numeric
value: 0, 1, 2, etc.) as the ProcessorId (~ line 105). As a result,
as soon as the JobModel is generated and published, the processor
quits with this message:

INFO  o.apache.samza.zk.ZkJobCoordinator - New JobModel does not contain pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this processor.

I was assuming I should be using GroupByContainerIds as my
TaskNameGrouper. I don't see any other promising implementations.
Am I just missing something?

Thanks,
Thunder

JobModel
{
  "config" : {
  ...
  },
  "containers" : {
    "0" : {
      "tasks" : {
        "Partition 0" : {
          "task-name" : "Partition 0",
          "system-stream-partitions" : [ {
            "system" : "kafka",
            "partition" : 0,
            "stream" : "test_topic1"
          }, {
            "system" : "kafka",
            "partition" : 0,
            "stream" : "test_topic2"
          } ],
          "changelog-partition" : 0
        },
        "Partition 1" : {
          "task-name" : "Partition 1",
          "system-stream-partitions" : [ {
            "system" : "kafka",
            "partition" : 1,
            "stream" : "test_topic1"
          }, {
            "system" : "kafka",
            "partition" : 1,
            "stream" : "test_topic2"
          } ],
          "changelog-partition" : 1
        }
      },
      "container-id" : 0,
      "processor-id" : "0"
    }
  },
  "max-change-log-stream-partitions" : 2,
  "all-container-locality" : {
    "0" : null
  }
}

-----Original Message-----
From: Thunder Stumpges [mailto:tstump...@ntent.com]
Sent: Friday, March 16, 2018 18:21
To: dev@samza.apache.org
Cc: Jagadish Venkatraman <jagadish1...@gmail.com>;
t...@recursivedream.com; yi...@linkedin.com; Yi Pan
<nickpa...@gmail.com>
Subject: RE: Old style "low level" Tasks with alternative
deployment
model(s)

Attached. I don't see any threads actually running this code, which
is odd.

My main thread is waiting for the whole thing to finish; the
"debounce-thread-0" thread (which logged the other surrounding
messages below) has this:

"debounce-thread-0" #18 daemon prio=5 os_prio=0
tid=0x00007fa0fd719800
nid=0x21 waiting on condition [0x00007fa0d0d45000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006f166e350> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at
java.util.concurrent.locks.LockSupport.park(LockSupport.java
:175)
        at
java.util.concurrent.locks.AbstractQueuedSynchronizer$Condit
ionObject.await(AbstractQueuedSynchronizer.java:2039)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWork
Queue.take(ScheduledThreadPoolExecutor.java:1081)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWork
Queue.take(ScheduledThreadPoolExecutor.java:809)
        at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolEx
ecutor.java:1067)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
Executor.java:1127)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
lExecutor.java:617)

        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

Thanks for having a look.
Thunder


-----Original Message-----
From: Prateek Maheshwari [mailto:prateek...@gmail.com]
Sent: Friday, March 16, 2018 17:02
To: dev@samza.apache.org
Cc: Jagadish Venkatraman <jagadish1...@gmail.com>;
t...@recursivedream.com; yi...@linkedin.com; Yi Pan
<nickpa...@gmail.com>
Subject: Re: Old style "low level" Tasks with alternative
deployment
model(s)

Hi Thunder,

Can you please take a thread dump and attach it?

Thanks,
Prateek

On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges <tstump...@ntent.com> wrote:

> It appears it IS hung while serializing the JobModel... very strange!
> I added some debug statements around the calls:
>
>       LOG.debug("Getting object mapper to serialize job model"); // this IS printed
>       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>       LOG.debug("Serializing job model"); // this IS printed
>       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>       LOG.info("jobModelAsString=" + jobModelStr); // this is NOT printed!
>
> Another thing I noticed is that "getObjectMapper" actually
> creates the object mapper twice!
>
> 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job model
> 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
> 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
> 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
> 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Serializing job model
>
> Could this ObjectMapper be a singleton? I see there is a private
> static instance, but getObjectMapper creates a new one every time...
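>
> A double-checked lazy init along these lines would avoid the duplicate
> construction (just a sketch, assuming the fully-configured ObjectMapper
> is safe to share across threads; this is not the actual Samza code):
>
>   // Using org.codehaus.jackson.map.ObjectMapper, as elsewhere in this thread.
>   public class SamzaObjectMapper {
>     // Sketch: memoize the configured mapper instead of rebuilding it.
>     private static volatile ObjectMapper instance;
>
>     public static ObjectMapper getObjectMapper() {
>       if (instance == null) {
>         synchronized (SamzaObjectMapper.class) {
>           if (instance == null) {
>             instance = buildObjectMapper(); // hypothetical: the existing expensive setup
>           }
>         }
>       }
>       return instance;
>     }
>   }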
>
> Anyway, then it takes off to serialize the job model and never
> comes back...
>
> Hoping someone has some idea here... the implementation for this
> mostly comes from jackson-mapper-asl, and I have the version that
> is linked in the
> 0.14.0 tag:
> |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
> |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13
>
> Thanks!
> Thunder
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:tstump...@ntent.com]
> Sent: Friday, March 16, 2018 15:29
> To: dev@samza.apache.org; Jagadish Venkatraman
> <jagadish1...@gmail.com>
> Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan <
> nickpa...@gmail.com>
> Subject: RE: Old style "low level" Tasks with alternative
> deployment
> model(s)
>
> So, my investigation starts at StreamProcessor.java.  Line 294 in
> method
> onNewJobModel() logs an INFO message that it's starting a container.
> This message never appears.
>
> I see that ZkJobCoordinator calls onNewJobModel from its
> onNewJobModelConfirmed method, which also logs an info message
> stating "version X of the job model got confirmed". I never see
> this message either, so I go up the chain some more.
>
> I DO see:
>
> 2018-03-16 21:43:58 logback 20498 [ZkClient-EventThread-13-10.0.127.114:2181] INFO  o.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader - I became the leader!
> And
> 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO  o.apache.samza.zk.ZkJobCoordinator - pid=91e07d20-ae33-4156-a5f3-534a95642133Generated new Job Model. Version = 1
>
> That led me to the method doOnProcessorChange, line 210. I see that
> line logged, but not the line below it ("Published new Job Model.
> Version ="), so something in between is not completing:
>
>     LOG.info("pid=" + processorId + "Generated new Job Model.
> Version
=
"
> + nextJMVersion);
>
>     // Publish the new job model
>     zkUtils.publishJobModel(nextJMVersion, jobModel);
>
>     // Start the barrier for the job model update
>     barrier.create(nextJMVersion, currentProcessorIds);
>
>     // Notify all processors about the new JobModel by updating
> JobModel Version number
>     zkUtils.publishJobModelVersion(currentJMVersion,
> nextJMVersion);
>
>     LOG.info("pid=" + processorId + "Published new Job Model.
> Version
=
"
> + nextJMVersion);
>
> As I mentioned, after the line "Generated new Job Model. Version = 1"
> I just get repeated zk ping responses... no more application logging.
>
> The very next thing that runs is zkUtils.publishJobModel(), which
> only has two lines before another log statement (which I don't see):
>
>   public void publishJobModel(String jobModelVersion, JobModel jobModel) {
>     try {
>       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>       LOG.info("jobModelAsString=" + jobModelStr);
>       ...
>
> Could it really be getting hung up on one of these two lines? It
> seems like it must be, but I don't see anything there that looks
> like it would just hang. I'll keep troubleshooting, maybe add some
> more debug logging and try again.
>
> Thanks for any guidance you all might have.
> -Thunder
>
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:tstump...@ntent.com]
> Sent: Friday, March 16, 2018 14:43
> To: dev@samza.apache.org; Jagadish Venkatraman
> <jagadish1...@gmail.com>
> Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan <
> nickpa...@gmail.com>
> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>
> Well, I have my stand-alone application in Docker and running in
> Kubernetes. I think something isn't wired up all the way, though,
> because my task never actually gets invoked. I see no errors;
> however, I'm not getting the usual startup logs (checking existing
> offsets, "entering run loop", ...). My logs look like this:
>
> 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO  kafka.utils.VerifiableProperties - Verifying properties
> 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO  kafka.utils.VerifiableProperties - Property client.id is overridden to samza_admin-test_stream_task-1
> 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO  kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to test-kafka-kafka.test-svc:9092
> 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO  kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
> 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO  kafka.client.ClientUtils$ - Fetching metadata from broker BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation id 0 for 1 topic(s) Set(dev_k8s.samza.test.topic)
> 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 179680 (requested -1), SO_SNDBUF = 102400 (requested 102400), connectTimeoutMs = 30000.
> 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO  kafka.producer.SyncProducer - Connected to test-kafka-kafka.test-svc:9092 for producing
> 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO  kafka.producer.SyncProducer - Disconnecting from test-kafka-kafka.test-svc:9092
> 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG kafka.client.ClientUtils$ - Successfully fetched metadata for 1 topic(s) Set(dev_k8s.samza.test.topic)
> 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@1a7158cc has grouped the SystemStreamPartitions into 10 tasks with the following taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, Partition 5, Partition 4, Partition 7, Partition 6, Partition 9, Partition 8]
> 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being assigned changelog partition 0.
> 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being assigned changelog partition 1.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being assigned changelog partition 2.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being assigned changelog partition 3.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being assigned changelog partition 4.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being assigned changelog partition 5.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being assigned changelog partition 6.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being assigned changelog partition 7.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being assigned changelog partition 8.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO  o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being assigned changelog partition 9.
> 2018-03-16 21:05:55 logback 50838 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null serverPath:null finished:false header:: 23,4  replyHeader:: 23,14024,0  request:: '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/JobModelGeneration/jobModelVersion,T  response:: ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878}
> 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO  o.apache.samza.zk.ZkJobCoordinator - pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated new Job Model. Version = 1
> 2018-03-16 21:06:05 logback 60848 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms
> 2018-03-16 21:06:15 logback 70856 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 1ms
> 2018-03-16 21:06:25 logback 80865 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms ...
>
> The zk ping responses continue every 10 seconds, but no other
> activity or messages occur.
> It looks like it gets as far as confirming the JobModel and
> grouping the partitions, but nothing actually starts up.
>
> Any ideas?
> Thanks in advance!
> Thunder
>
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:tstump...@ntent.com]
> Sent: Thursday, March 15, 2018 16:35
> To: Jagadish Venkatraman <jagadish1...@gmail.com>
> Cc: dev@samza.apache.org; t...@recursivedream.com;
> yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>
> Thanks a lot for the info. I have something basically working at
> this point! I have not integrated it with Docker or Kubernetes
> yet, but it does run from my local machine.
>
> I have determined that LocalApplicationRunner does NOT do config
> rewriting. I had to write my own little “StandAloneSamzaRunner”
> that handles the “main” entrypoint. It parses the command line using
> CommandLine, loads the config from the ConfigFactory, and performs
> rewriting before creating the new instance of
> LocalApplicationRunner. This is all my StandAloneSamzaRunner contains:
>
>
> object StandAloneSamzaRunner extends App with LazyLogging {
>
>   // parse command line args just like JobRunner.
>   val cmdline = new ApplicationRunnerCommandLine
>   val options = cmdline.parser.parse(args: _*)
>   val config = cmdline.loadConfig(options)
>
>   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
>   runner.runTask()
>   runner.waitForFinish()
> }
>
> The only config settings I needed to change to use this runner were
> as follows (easily configured thanks to our central Consul config
> system and our rewriter):
>
> # use the ZK based job coordinator
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> # need to use GroupByContainerIds instead of GroupByContainerCount
> task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
> # ZKJC config
> job.coordinator.zk.connect=<our_zk_connection>
>
> I did run into one potential problem; as you see above, I have
> started the task using runTask() and then, to prevent my main
> method from returning, I have called waitForFinish(). The first
> time I ran it, the job itself failed because I had forgotten to
> override the task grouper, and the container count was pulled from
> our staging environment.
> There are some failures logged and it appears the JobCoordinator
> fails, but it never returns from waitForFinish. The stack trace and
> continuation of the log are below:
>
> 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
> java.lang.IllegalArgumentException: Your container count (4) is larger than your task count (2). Can't have containers with nothing to do, so aborting.
>        at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
>        at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
>        at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
>        at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
>        at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>        at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
>        at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
>        at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
>        at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>        at java.util.concurrent.FutureTask.run(FutureTask.java)
>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>        at java.lang.Thread.run(Thread.java:745)
> 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
> 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG org.I0Itec.zkclient.ZkClient - Closing ZkClient...
> 2018-03-15 22:34:32 logback 77789 [ZkClient-EventThread-15-10.0.127.114:2181] INFO  org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
>
> And then the application continues on with metric reporters and
> other debug logging (not actually running the task, though).
>
> Thanks in advance for the guidance; this has been easier than I
> imagined!
> I’ll report back when I get more of the Dockerization/Kubernetes
> running and test it a bit more.
> Cheers,
> Thunder
>
>
> From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
> Sent: Thursday, March 15, 2018 14:46
> To: Thunder Stumpges <tstump...@ntent.com>
> Cc: dev@samza.apache.org; t...@recursivedream.com;
> yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>
> >>  Thanks for the info on the tradeoffs. That makes a lot of
> >> sense. I am
> >> on-board with using ZkJobCoordinator, sounds like some good
> >> benefits over just the Kafka high-level consumer.
>
> This certainly looks like the simplest alternative.
>
> For your other questions, please find my answers inline.
>
> >> Q1: If I use LocalApplicationRunner, It does not use
> "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
>
> Your understanding is correct. It directly instantiates the
> StreamProcessor, which in-turn creates and runs the SamzaContainer.
>
> >> Q2: If I use LocalApplicationRunner, I will need to code
> >> myself the
> >> loading and rewriting of the Config that is currently handled by
> >> JobRunner, correct?
>
> I don't think you'll need to do this. IIUC, the
> LocalApplicationRunner should automatically invoke rewriters and do
> the right thing.
>
> >>  Q3: Do I need to also handle coordinator stream(s) and
> >> storing of
> >> config that is done in JobRunner (I don't think so as the ?
>
> I don't think this is necessary either. The creation of
> coordinator stream and persisting configuration happens in the
> LocalApplicationRunner (more specifically in
> StreamManager#createStreams).
>
> >> Q4: Where/How do I specify the Container ID for each instance?
> >> Is there
> >> a config setting that I can pass (or pull from an env variable
> >> and add to the config)? I am assuming it is my responsibility to
> >> ensure that each instance is started with a unique container ID..?
>
> Nope. If you are using the ZkJobCoordinator, you don't have to
> worry about assigning IDs for each instance. The framework will
> automatically take care of generating IDs and reaching consensus
> by electing a leader. If you are curious, please take a look at
> implementations of the ProcessorIdGenerator interface.
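>
> For illustration, here's a minimal sketch of such an implementation
> (assuming the interface shape -- a single generateProcessorId(Config)
> method in org.apache.samza.runtime -- and a hypothetical environment
> variable; this is not code from Samza itself):
>
>   import org.apache.samza.config.Config;
>   import org.apache.samza.runtime.ProcessorIdGenerator;
>
>   // Sketch: derive the processor id from an env variable injected by
>   // the cluster manager, falling back to a random UUID.
>   public class EnvProcessorIdGenerator implements ProcessorIdGenerator {
>     @Override
>     public String generateProcessorId(Config config) {
>       String id = System.getenv("PROCESSOR_ID"); // hypothetical variable
>       return id != null ? id : java.util.UUID.randomUUID().toString();
>     }
>   }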
>
> Please let us know should you have further questions!
>
> Best,
> Jagdish
>
> On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges
> <tstump...@ntent.com> wrote:
>
> Thanks for the info on the tradeoffs. That makes a lot of sense.
> I am on-board with using ZkJobCoordinator, sounds like some good
> benefits over just the Kafka high-level consumer.
>
>
>
> To that end, I have made some notes on possible approaches based
> on the previous thread, and from my look into the code. I’d love
> to get feedback.
>
>
>
> Approach 1. Configure jobs to use “ProcessJobFactory” and run
> instances of the job using run-job.sh or using JobRunner directly.
>
> I don’t think this makes sense from what I can see for a few reasons:
>
>   *   JobRunner is concerned with stuff I don't *think* we need:
>
>      *   coordinatorSystemProducer|Consumer,
>      *   writing/reading the configuration to the coordinator streams
>
>   *   ProcessJobFactory hard-codes the ID to “0” so I don’t think that
> will work for multiple instances.
>
>
>
> Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and
> invoke
> LocalApplicationRunner.runTask()
>
>
>
>     Q1: If I use LocalApplicationRunner, It does not use
> "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
>
>     Q2: If I use LocalApplicationRunner, I will need to code
> myself the loading and rewriting of the Config that is currently
> handled by JobRunner, correct?
>
>     Q3: Do I need to also handle coordinator stream(s) and
> storing of config that is done in JobRunner (I don’t think so as the ?
>
>     Q4: Where/How do I specify the Container ID for each
> instance? Is there a config setting that I can pass, (or pull
> from an env variable and add to the config) ? I am assuming it is
> my responsibility to ensure that each instance is started with a
> unique container ID..?
>
> I am getting started on the above (Approach 2.), and looking
> closer at the code so I may have my own answers to my questions,
> but figured I should go ahead and ask now anyway. Thanks!
>
> -Thunder
>
>
> From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
> Sent: Thursday, March 15, 2018 1:41
> To: dev@samza.apache.org; Thunder Stumpges <tstump...@ntent.com>; t...@recursivedream.com
> Cc: yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
>
> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>
> >> You are correct that this is focused on the higher-level API
> >> but doesn't
> >> preclude using the lower-level API. I was at the same point you
> >> were not long ago, in fact, and had a very productive
> >> conversation on the list
>
> Thanks Tom for linking the thread, and I'm glad that you were
> able to get Kubernetes integration working with Samza.
>
> >> If it is helpful for everyone, once I get the low-level API +
> >> ZkJobCoordinator + Docker +
> >> K8s working, I'd be glad to formulate an additional sample for
> >> hello-samza.
>
> @Thunder Stumpges:
> We'd be thrilled to receive your contribution. Examples, demos,
> tutorials etc.
> contribute a great deal to improving the ease of use of Apache Samza.
> I'm happy to shepherd design discussions/code-reviews in the
> open-source including answering any questions you may have.
>
>
> >> One thing I'm still curious about, is what are the drawbacks
> >> or complexities of leveraging the Kafka High-level consumer +
> >> PassthroughJobCoordinator in a stand-alone setup like this? We
> >> do have Zookeeper (because of kafka) so I think either would
> >> work. The Kafka High-level consumer comes with other nice
> >> tools for monitoring offsets, lag, etc
>
>
> @Thunder Stumpges:
>
> Samza uses a "Job-Coordinator" to assign your input-partitions
> among the different instances of your application s.t. they don't
> overlap. A typical way to solve this "partition distribution"
> problem is to have a single instance elected as a "leader" and
> have the leader assign partitions to the group.
> The ZkJobCoordinator uses Zk primitives to achieve this, while
> the YarnJC relies on Yarn's guarantee that there will be a
> singleton-AppMaster to achieve this.
>
> A key difference that separates the PassthroughJC from the
> Yarn/Zk variants is that it does _not_ attempt to solve the
> "partition distribution" problem. As a result, there's no
> leader-election involved.
> Instead, it pushes the problem of "partition distribution" to the
> underlying consumer.
>
> The PassThroughJc supports these 2 scenarios:
>
> 1. Consumer-managed partition distribution: When using the Kafka
> high-level consumer (or an AWS KinesisClientLibrary consumer)
> with Samza, the consumer manages partitions internally.
>
> 2. Static partition distribution: Alternately, partitions can be
> managed statically using configuration. You can achieve static
> partition assignment by implementing a custom SystemStreamPartitionGrouper
> <https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>
> and TaskNameGrouper
> <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
> Solutions in this category will typically require you to
> distinguish the various processors in the group by providing an "id" for each.
> Once the "id"s are decided, you can then statically compute
> assignments using a function (e.g., modulo N; see the sketch below).
> You can rely on the following mechanisms to provide this id:
>  - Configure each instance differently to have its own id
>  - Obtain the id from the cluster-manager. For instance,
> Kubernetes will provide each pod a unique id in the range [0,N).
> AWS ECS should expose similar capabilities via a REST end-point.
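>
> As a sketch of that static scheme (illustrative only -- the helper
> below is hypothetical, not a Samza API): once each of the N
> processors knows its id, it can claim exactly the partitions that
> map to it:
>
>   // Sketch: static "modulo N" partition ownership.
>   // processorId is in [0, N); numProcessors is N.
>   static boolean ownsPartition(int partition, int processorId, int numProcessors) {
>     return partition % numProcessors == processorId;
>   }
>
> Every instance evaluates the same function, so the resulting
> assignment is disjoint and complete with no runtime coordination.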
>
> >> One thing I'm still curious about, is what are the drawbacks
> >> or complexities of leveraging the Kafka High-level consumer +
> >> PassthroughJobCoordinator in a stand-alone setup like this?
>
> Leveraging the Kafka High-level consumer:
>
> The Kafka high-level consumer is not integrated into Samza just yet.
> Instead, Samza's integration with Kafka uses the low-level consumer because:
> i) It allows for greater control in fetching data from individual brokers.
> It is simple and performant in terms of the threading model to have
> one thread pull from each broker.
> ii) It is efficient in memory utilization since it does not do
> internal buffering of messages.
> iii) There's no overhead like Kafka-controller heartbeats that are
> driven by consumer.poll.
>
> Since there's no built-in integration, you will have to build a
> new SystemConsumer if you need to integrate with the Kafka
> High-level consumer.
> Further, there's a fair bit of complexity to manage in checkpointing.
>
> >> The Kafka High-level consumer comes with other nice tools for
> >> monitoring offsets, lag, etc
>
> Samza exposes
> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
> the below metrics for lag-monitoring:
> - The current log-end offset for each partition
> - The last check-pointed offset for each partition
> - The number of messages behind the high watermark of the partition
>
> Please let us know if you need help discovering these or
> integrating these with other systems/tools.
>
>
> Leveraging the Passthrough JobCoordinator:
>
> It's helpful to split this discussion on tradeoffs with
> PassthroughJC into
> 2 parts:
>
> 1. PassthroughJC + consumer managed partitions:
>
> - In this model, Samza has no control over partition-assignment
> since it's managed by the consumer. This means that stateful
> operations like joins that rely on partitions being co-located on
> the same task will not work.
> Simple stateless operations (eg: map, filter, remote lookups) are fine.
>
> - A key differentiator between Samza and other frameworks is our
> support for "host affinity"
> <https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>.
> Samza achieves this by assigning
> partitions to hosts taking data-locality into account. If the
> consumer can arbitrarily shuffle partitions, it'd be hard to
> support this affinity/locality. Often this is a key optimization
> when dealing with large stateful jobs.
>
> 2. PassthroughJC + static partitions:
>
> - In this model, it is possible to make stateful processing
> (including host affinity) work by carefully choosing how "id"s
> are assigned and computed.
>
> Recommendation:
>
> - Owing to the above subtleties, I would recommend that we give
> the ZkJobCoordinator + the built-in low-level Kafka integration a try.
> - If we hit snags down this path, we can certainly explore the
> approach with PassthroughJC + static partitions.
> - Using the PassthroughJC + consumer-managed distribution would
> be least preferable owing to the subtleties I outlined above.
>
> Please let us know should you have more questions.
>
> Best,
> Jagdish
>
> On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <tstump...@ntent.com> wrote:
> Wow, what great timing, and what a great thread! I definitely
> have some good starters to go off of here.
>
> If it is helpful for everyone, once I get the low-level API +
> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate
> an additional sample for hello-samza.
>
> One thing I'm still curious about, is what are the drawbacks or
> complexities of leveraging the Kafka High-level consumer +
> PassthroughJobCoordinator in a stand-alone setup like this? We do
> have Zookeeper (because of kafka) so I think either would work.
> The Kafka High-level consumer comes with other nice tools for
> monitoring offsets, lag, etc....
>
> Thanks guys!
> -Thunder
>
> -----Original Message-----
> From: Tom Davis [mailto:t...@recursivedream.com]
> Sent: Wednesday, March 14, 2018 17:50
> To: dev@samza.apache.org
> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>
> Hey there!
>
> You are correct that this is focused on the higher-level API but
> doesn't preclude using the lower-level API. I was at the same
> point you were not long ago, in fact, and had a very productive
> conversation
> on the list:
> you should look for "Question about custom StreamJob/Factory" in
> the list archive for the past couple months.
>
> I'll quote Jagadish Venkatraman from that thread:
>
> > For the section on the low-level API, can you use
> > LocalApplicationRunner#runTask()? It basically creates a new
> > StreamProcessor and runs it. Remember to provide task.class and
> > set it to your implementation of StreamTask or AsyncStreamTask.
> > Please note that this is an evolving API and hence, subject to change.
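>
> For example, the relevant config line would look something like this
> (the class name here is hypothetical):
>
>   task.class=com.example.MyStreamTask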
>
> I ended up just switching to the high-level API because I don't
> have any existing Tasks and the Kubernetes story is a little more
> straightforward there (there's only one container/configuration to deploy).
>
> Best,
>
> Tom
>
> Thunder Stumpges <tstump...@ntent.com> writes:
>
> > Hi all,
> >
> > We are using Samza (0.12.0) in about 2 dozen jobs implementing
> > several processing pipelines. We have also begun a significant
> > move of other services within our company to Docker/Kubernetes.
> > Right now our Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce"
> > jobs
> > (many reporting and other batch processing jobs). We would really
> > like to move our stream processing off of Hadoop/Yarn and onto Kubernetes.
> >
> > When I just read about some of the new progress in .13 and .14
> > I got really excited! We would love to have our jobs run as
> > simple libraries in our own JVM, and use the Kafka
> > High-Level-Consumer for partition
> > distribution and such. This would let us "dockerize" our
> > application and run/scale in Kubernetes.
> >
> > However as I read it, this new deployment model is ONLY for the
> > new(er) High Level API, correct? Is there a plan and/or
> > resources for adapting this back to existing low-level tasks?
> > How complicated of a
> > task is that? Do I have any other options to make this transition
> > easier?
> >
> > Thanks in advance.
> > Thunder
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>




